mqttnet消息推送与接收

xiaoxiao2021-02-28  23

创建windows服务网上有很多,不多述;

服务端做好后一定要写bat安装卸载文件

install.bat

@echo.请稍等,MqttNetServiceAddUserAndPassword服务安装启动中............@echo off@title 安装windows服务:MqttNetServiceAddUserAndPassword@sc create MqttNetServiceAddUserAndPassword binPath="%~dp0\MqttNetServiceAddUserAndPassword.exe"@sc config MqttNetServiceAddUserAndPassword start= auto@sc start MqttNetServiceAddUserAndPassword@echo.MqttNetServiceAddUserAndPassword启动完毕pause

//binPath="%~dp0\MqttNetServiceAddUserAndPassword.exe"   当前路径,也可指定

delete.bat

@echo.服务MqttNetServiceAddUserAndPassword卸载中..........@echo off@sc stop MqttNetServiceAddUserAndPassword@sc delete MqttNetServiceAddUserAndPassword@echo off@echo.MqttNetServiceAddUserAndPassword卸载完毕@pause

服务端:

using MQTTnet;using MQTTnet.Protocol;using MQTTnet.Server;using Newtonsoft.Json;using System;using System.Collections.Generic;using System.ComponentModel;using System.Data;using System.Diagnostics;using System.IO;using System.Linq;using System.Net;using System.Net.Sockets;using System.ServiceProcess;using System.Text;using System.Threading;using System.Threading.Tasks;using System.Timers;namespace MqttNetServiceAddUserAndPassword{    public partial class Service1 : ServiceBase    {        private readonly static object locker = new object();        private MqttServer mqttServer = null;        private System.Timers.Timer timer = null;        private GodSharp.Sockets.SocketServer socketService = null;         //此集合用于判断写入日志在一段时间内不重,以客户端id为依据,最多2000个清零;        private List<string> subClientIDs = new List<string>();        public Service1()        {            InitializeComponent();            //创建一个定时器,检查5s内有多少客户端接入并将相关信息记录到日志中            timer = new System.Timers.Timer();            timer.AutoReset = true;            timer.Enabled = true;            timer.Interval = 5000;            timer.Elapsed += new ElapsedEventHandler(GetSubClientSAndSetShow);        }        protected override void OnStart(string[] args)        {            //开启服务            //CreateMQTTServer();            Task.Run(CreateMQTTServer);            if (timer.Enabled == false)            {                timer.Enabled = true;                timer.Start();            }            //创建socket服务端            //CreateServerSocket();        //    SocketServer.StartSocketService();        }        protected override void OnStop()        {            if (timer.Enabled == true)            {                timer.Enabled = false;                timer.Stop();            }        }        /// <summary>        /// 开启服务        /// </summary>        private async Task CreateMQTTServer()        {            if (mqttServer == null)            {                var optionsBuilder = new MqttServerOptionsBuilder();                optionsBuilder.WithConnectionValidator(c =>                {                    if (c.ClientId.Length < 5 || !c.ClientId.StartsWith("Eohi_"))                    {                        c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;                        return;                    }                    if (c.Username != "user" || c.Password != "123456")                    {                        c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;                        return;                    }                    c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;                });                //指定 ip地址,默认为本地,但此方法不能使用ipaddress报错,有哪位大神帮解答,感激。                //options.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(""))                //指定端口                optionsBuilder.WithDefaultEndpointPort(1884);                //连接记录数,默认 一般为2000                //optionsBuilder.WithConnectionBacklog(2000);                mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;                string msg = null;                             //将发送的消息加到日志                                      mqttServer.ApplicationMessageReceived += (s, e) =>                {                    msg = @"发送消息的客户端id:" + e.ClientId + "\r\n"                  + "发送时间:" + DateTime.Now + "\r\n"                  + "发送消息的主题:" + e.ApplicationMessage.Topic + "\r\n"                 + "发送的消息内容:" + Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]) + "\r\n"                 + "--------------------------------------------------\r\n"                 ;                    WriteMsgLog(msg);                };                await mqttServer.StartAsync(optionsBuilder.Build());            }        }        #region 记录日志          /// <summary>          /// 消息记录日志          /// </summary>          /// <param name="msg"></param>          private void WriteMsgLog(string msg)        {            //string path = @"C:\log.txt";              //该日志文件会存在windows服务程序目录下              string path = AppDomain.CurrentDomain.BaseDirectory + "\\Msglog.txt";            FileInfo file = new FileInfo(path);            if (!file.Exists)            {                FileStream fs;                fs = File.Create(path);                fs.Close();            }            using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))            {                using (StreamWriter sw = new StreamWriter(fs))                {                    sw.WriteLine(DateTime.Now.ToString() + "   " + msg);                }            }        }        private void PubMessage(string topic, string msg)        {            if (mqttServer != null)            {                lock (locker)                {                    var message = new MqttApplicationMessageBuilder();                    message.WithTopic(topic);                    message.WithPayload(msg);                    mqttServer.PublishAsync(message.Build());                }            }        }        /// <summary>        ///客户端链接日志           客户端接入        /// </summary>        /// <param name="msg"></param>        private void WriteClientLinkLog(string msg)        {            //string path = @"C:\log.txt";              //该日志文件会存在windows服务程序目录下              string path = AppDomain.CurrentDomain.BaseDirectory + "\\ClientLinklog.txt";            FileInfo file = new FileInfo(path);            if (!file.Exists)            {                FileStream fs;                fs = File.Create(path);                fs.Close();            }            using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))            {                using (StreamWriter sw = new StreamWriter(fs))                {                    sw.WriteLine(msg);                }            }        }        /// <summary>        /// 通过定时器将客户端链接信息写入日志              /// </summary>        /// <param name="sender"></param>        /// <param name="e"></param>        private void GetSubClientSAndSetShow(object sender, ElapsedEventArgs e)        {            // List<SetServiceM> dic = new List<SetServiceM>();               if (mqttServer != null)            {                List<ConnectedMqttClient> subclients = mqttServer.GetConnectedClientsAsync().Result.ToList();                if (subclients.Count > 0)                {                    string subclientcount = @"客户端接入的总数为:" + (subclients.Count - 1).ToString() + "\r\n"                        + "------------------------------------------------------- \r\n";                    WriteClientLinkLog(subclientcount);                    PubMessage("ClientsCount", (subclients.Count - 1).ToString());                    List<string> clientids = new List<string>();                    //连接客户端的个数                    //   dic.Add(SetServiceM.SetService( "ClientCount", subclients.Count.ToString()));                    //   var dicclientlink = new Dictionary<string, string>();                    foreach (var item in subclients)                    {                        if (!subClientIDs.Contains(item.ClientId))                        {                            subClientIDs.Add(item.ClientId);                            string msg = @"连接客户端ID:" + item.ClientId + "\r\n"                            + "连接时间:" + DateTime.Now + "\r\n"                            + "协议版本:" + item.ProtocolVersion + "\r\n"                            + "最后收到的非保持活包:" + item.LastNonKeepAlivePacketReceived + "\r\n"                            + "最后收到的包:" + item.LastPacketReceived + "\r\n"                            + "挂起的应用程序消息:" + item.PendingApplicationMessages + "\r\n"                            + "------------------------------------------------" + "\r\n";                            WriteClientLinkLog(msg);                            PubMessage("clientlink", msg);                            //    mqttServer.PublishAsync("clientlink", msg);                            //    dicclientlink.Add(item.ClientId, msg);                        }                        clientids.Add(item.ClientId);                    }                    if (subClientIDs.Count >= 2000)                    {                        subClientIDs.Clear();                    }                    var exceptlist = subClientIDs.Except(clientids).ToList();                    //  var dicclientoutline = new Dictionary<string, string>();                    if (exceptlist.Count > 0)                    {                        exceptlist.ForEach(u =>                        {                            string msgoutline = @"客户端下线ID:" + u + "\r\n"                       + "客户端下线时间:" + DateTime.Now.ToString() + "\r\n"                       + "------------------------------------------------------------ \r\n"                       ;                            WriteClientLinkLog(msgoutline);                            subClientIDs.Remove(u);                            PubMessage("clientlink", msgoutline);                        //     mqttServer.PublishAsync("clientlink", msgoutline);                        // dicclientoutline.Add("OutLineID_" + u, msgoutline);                    });                    }                    连接客户端的id                    //dic.Add(SetServiceM.SetService("clientlink", JsonConvert.SerializeObject(dicclientlink)));                    客户端下线的时间                    //dic.Add(SetServiceM.SetService("clientoutline", JsonConvert.SerializeObject(dicclientoutline)));                    //SocketServer.connection.Send(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(dic)));                }                else                {                    string subclientcount = @"暂无客户端接入!" + "\r\n"                     + "-------------------------------------------------------- \r\n";                    WriteClientLinkLog(subclientcount);                }            }        }        /// <summary>        /// 客户端下线时间        /// </summary>        /// <param name="msg"></param>        public void WriteClientOutLineLog(string msg)        {            string path = AppDomain.CurrentDomain.BaseDirectory + "\\ClientOutLineLog.txt";            FileInfo file = new FileInfo(path);            if (!file.Exists)            {                FileStream fs = File.Create(path);                fs.Close();            }            using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))            {                using (StreamWriter sw = new StreamWriter(fs))                {                    sw.WriteLine(msg);                }            }        }        //windows服务里的服务端        private void CreateServerSocket()        {            if (socketService == null)            {                // IPEndPoint ipep = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9001);                socketService = new GodSharp.Sockets.SocketServer("127.0.0.1", 9001, ProtocolType.Tcp);  //Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);                socketService.Start();                socketService.Listen(10);                Thread thread = new Thread(new ThreadStart(new Action(() =>                {                    while (true)                    {                    //  socketClient = socketService.Clients[0];                    // string data = "sql|" ; //在这里封装数据,通常是自己定义一种数据结构,如struct data{sql;result}                    // client.Send(Encoding.Default.GetBytes(msg));                }                })));            }            else            {                CreateServerSocket();            }        }        #endregion    }

}

服务端桌面显示程序:

using MQTTnet;using MQTTnet.Client;using MQTTnet.Protocol;using System;using System.Collections.Generic;using System.ComponentModel;using System.Data;using System.Diagnostics;using System.Drawing;using System.Linq;using System.Net;using System.Net.NetworkInformation;using System.Net.Sockets;using System.ServiceProcess;using System.Text;using System.Text.RegularExpressions;using System.Threading;using System.Threading.Tasks;using System.Windows.Forms;namespace MQTTNetFrm{    public partial class Form1 : Form    {        private ServiceController ServiceController = null;        private MqttClientOptions options = null;        private MqttClient mqttClient = null;        public Form1()        {            InitializeComponent();               new Thread(new ThreadStart(GetServiceStatus)).Start();          Task.Run(LinkClientService).Wait();        }        /// <summary>        /// 获取当前ip地址        /// </summary>        /// <returns></returns>        private  string GetLocalIP()        {            string ip = null;          var iplist = Dns.GetHostAddresses(Dns.GetHostName()).DefaultIfEmpty().ToList();            iplist.ForEach(u =>            {                if (u.AddressFamily == AddressFamily.InterNetwork)                    ip= u.ToString();            });            return ip;        }        private async Task LinkClientService()        {            var m = "Eohi_Frm_" + Guid.NewGuid().ToString();            options = new MqttClientOptions            {                ClientId = m,                CleanSession = true,                ChannelOptions = new MqttClientTcpOptions                {                    Server = GetLocalIP(),                    Port = 1884,                },                Credentials = new MqttClientCredentials()                {                    Username = "user",                    Password = "123456"                }            };            var factory = new MqttFactory();            mqttClient = factory.CreateMqttClient() as MqttClient;            try            {                await mqttClient.ConnectAsync(options);                but_submsg_Click();                this.Invoke(new Action(() => { lab_serverstatus.Text = "连接正常,服务运行中............"; }));            }            catch (Exception ex)            {            }        }        private async void but_submsg_Click()        {            if (mqttClient != null)            {                await mqttClient.SubscribeAsync(new TopicFilter("ClientsCount", MqttQualityOfServiceLevel.AtMostOnce));                await mqttClient.SubscribeAsync(new TopicFilter("clientlink", MqttQualityOfServiceLevel.AtMostOnce));                await mqttClient.SubscribeAsync(new TopicFilter("msglog", MqttQualityOfServiceLevel.AtMostOnce));                mqttClient.ApplicationMessageReceived += (s, e) =>                {                    this.Invoke(new Action(() =>                    {                        var msg = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);                        if (msg.Length<=5)                        {                            lab_clientcount.Text = msg;                        }                        if (msg.Length>10)                        {                            if (msg.StartsWith("连接")    )                                rtb_clientlog.AppendText(msg);                            rtb_msglog.AppendText(msg);                        }                       }));                };            }        }        private void GetServiceStatus()        {            ServiceController[] serviceControllers = ServiceController.GetServices();            if (serviceControllers.Length > 0)            {                serviceControllers.ToList().ForEach(u =>                {                    if (u.DisplayName == "MqttNetServiceAddUserAndPassword")                    {                        if (ServiceController == null)                        {                            ServiceController = u;                        }                        if (u.Status == ServiceControllerStatus.Running)                        {                            lab_serverstatus.Text = "服务运行中............";                        }                        else                        {                            lab_serverstatus.Text = "服务已停止............";                        }                    }                });            }        }        private void button2_Click(object sender, EventArgs e)        {            if (tabControl1.SelectedTab == tabPage1)            {                rtb_clientlog.Text = "";            }            else            {                rtb_msglog.Text = "";            }        }        private void Form1_Load(object sender, EventArgs e)        {        }    }

}

客户端:

using MQTTnet;using MQTTnet.Client;using MQTTnet.Protocol;using System;using System.Collections.Generic;using System.ComponentModel;using System.Data;using System.Drawing;using System.Linq;using System.Text;using System.Threading;using System.Threading.Tasks;using System.Timers;using System.Windows.Forms;namespace MqttClientTest01{    public partial class Form1 : Form    {        private MqttClient mqttClient = null;        private System.Timers.Timer timer = null;        private int CountLink = 0;        private MqttClientOptions options = null;        public Form1()        {            InitializeComponent();            创建一个定时器,检查5s内有多少客户端接入并将相关信息记录到日志中            //timer = new System.Timers.Timer();            //timer.AutoReset = true;            //timer.Interval = 1000;            //timer.Elapsed += new ElapsedEventHandler(LinkMqttNetService);        }        private void LinkMqttNetService(object sender, ElapsedEventArgs e)        {            if (mqttClient == null)            {                //   RunAsync();                CountLink++;            }            if (CountLink >= 5)            {                MessageBox.Show("连接多次失败,请确认各参数是否正确!");                CountLink = 0;                timer.Enabled = false;            }        }        private void but_linkserver_Click(object sender, EventArgs k)        {            LinkClientService();            //CountLink = 0;            //timer.Enabled = true;            //timer.Start();        }        /// <summary>        /// 链接客户端        /// </summary>        public async  void LinkClientService()        {            var m = "Eohi_" + Guid.NewGuid().ToString();            options = new MqttClientOptions            {                ClientId = m,                CleanSession = true,                ChannelOptions = new MqttClientTcpOptions                {                    Server = txtb_serverip.Text.Trim(),                    Port = Convert.ToInt32(txtb_serverport.Text.Trim()),                },                Credentials = new MqttClientCredentials()                {                    Username = tb_username.Text,                    Password = tb_userpwd.Text                }            };            var factory = new MqttFactory();            mqttClient =  factory.CreateMqttClient() as MqttClient;            try            {                await mqttClient.ConnectAsync(options);                this.Invoke(new Action(() =>                {                    lab_linkstatus.Text = "连接成功!";                    lab_linktimer.Text = DateTime.Now.ToString();                }));                mqttClient.Disconnected += async (s, e) =>                {                    if (e.ClientWasConnected==false)                    {                        try                        {                            await mqttClient.ConnectAsync(options);                            this.Invoke(new Action(() =>                            {                                lab_linkstatus.Text = "连接成功!";                                lab_linktimer.Text = DateTime.Now.ToString();                            }));                        }                        catch (Exception ex)                        {                            lab_linkstatus.Text = "连接失败!"+ex.Message;                            lab_linktimer.Text = DateTime.Now.ToString();                        }                    }                };            }            catch (Exception ex)            {                lab_linkstatus.Text = "连接失败!请检查ip/端口" ;                lab_linktimer.Text = DateTime.Now.ToString();            }         }        private void tb_username_TextChanged(object sender, EventArgs e)        {        }        private void but_clientsend_Click(object sender, EventArgs e)        {            if (mqttClient != null)            {                var message = new MqttApplicationMessageBuilder();                message.WithTopic(txtb_msgtopic.Text.Trim());                message.WithPayload(rtb_pubmsg.Text.Trim());                message.WithExactlyOnceQoS();                message.WithRetainFlag();                mqttClient.PublishAsync(message.Build());            }        }        private async void but_submsg_Click(object sender, EventArgs k)        {            if (mqttClient != null)            {                await mqttClient.SubscribeAsync(new TopicFilter(txtb_subtopic.Text.Trim(), MqttQualityOfServiceLevel.AtMostOnce));                mqttClient.ApplicationMessageReceived += (s, e) =>                {                    this.Invoke(new Action(() =>                    {                        rtb_submsgclient.AppendText("ClientID=" + e.ClientId + "\n");                        rtb_submsgclient.AppendText($"+ Topic = {e.ApplicationMessage.Topic}" + "\n");                        rtb_submsgclient.AppendText($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload) + "\n"}");                        rtb_submsgclient.AppendText($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}" + "\n");                        rtb_submsgclient.AppendText($"+ Retain = {e.ApplicationMessage.Retain}" + "\n");                    }));                };            }        }        private void button1_Click(object sender, EventArgs e)        {            rtb_submsgclient.Text = "";        }    }}

转载请注明原文地址: https://www.6miu.com/read-2630465.html

最新回复(0)