Spiga

通信协议:MQTT

2026-04-04 13:00:10

一、MQTT通信

1. 工业互联数据交换协议

  • HTTP:短连接 长连接
  • CoAP (Constrained Application Protocol),受限应用协议,应用于无线传感网中协议。
  • MQTT (Message Queuing Telemetry Transport ),消息队列遥测传输,由 IBM 开发的即时通讯协议。
  • DDS(Data Distribution Service for Real-Time Systems),面向实时系统的数据分布服务。
  • AMQP(Advanced Message Queuing Protocol),先进消息队列协议 RabbitMQ
  • XMPP(Extensible Messaging and Presence Protocol)可扩展通讯和表示协议 XML
  • JMS (Java Message Service),即消息服务,这是JAVA平台中著名的消息队列协议

2. MQTT

  • 一种基于发布/订阅模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
  • 极少的数据完成远程通信

核心内容:

  • MQTT定义:轻量级发布/订阅消息协议,适用于低带宽、高延迟网络。
  • 核心特点:低开销、异步通信、支持QoS(0/1/2)。
  • 适用场景:物联网设备通信、工业设备监控(如MES中的设备状态上报)。

通信过程

  • 服务端/Broker
  • 客户端(App/Device)

通信概念:登录注册(Client ID)、订阅、主题、负载、发布、消息、服务质量

通信细节:

  • 基于TCP协议的应用层协议
  • 固定报头
  • 可变报头
  • 载荷内容

3. 固定报头

如:0010 0100

  • 消息类型(第一个字节的高4位)

  • 标志位(第一个字节的低4位)

  • 消息长度:长度扩展,最大长度256M数据

4. 可变报头

  • 连接标志

  • 报文标识符(2字节) ,发布、订阅、取消订阅

5. 通信载荷

6. MQTT主题过滤

  • 主题层级分隔符:“/”,用于将结构化引入主题名 A套/客厅/电视、A套/客厅/空调

  • 通配符: “#”、“+”、“$”,作用:订阅的主题过滤器可以包含特殊的通配符,允许你一次订阅多个主题。

    种类:

    • # 当前节点下的所有节点,包括父级

      例:A/# 包括A、A/B、A/B/C 异常:A/#/C无效

    • + 当前节点下的所有节点,单层匹配 (强制一级)

      例:A/+ 包括A/B、A/C、A/D 不匹配A、A/B/C、A/B/D/E

      有效:+、+/A/#、A/+/B、

      无效:A+

    • $ $SYS/ 一般应用层面不会使得,服务特定信息或控制接口主题前缀

​ 约定:由服务端使用

  • 主题规则
    • 所有的主题名和主题过滤器必须至少包含一个字符 。
    • 主题名和主题过滤器是区分大小写的。 A a 两个不同的主题
    • 主题名和主题过滤器可以包含空格。 Hello World
    • 主题名或主题过滤器以前置或后置斜杠 “/” 区分。 /A A/
    • 只包含斜杠 “/” 的主题名或主题过滤器是合法的。
    • 主题名和主题过滤器不能包含空字符 (Unicode U+0000)
    • 主题名和主题过滤器是UTF-8编码字符串,它们不能超过65535字节

7. MQTT通信问题整理

  • 如果收到非法的标志,接收者关闭网络连接。
  • SUBSCRIBE,UNSUBSCRIBE和PUBLISH(QoS大于0)控制报文必须包含一个非零的16位报文标识符(Packet Identifier)。QoS设置为0的PUBLISH报文不能包含报文标识符。
  • 客户端到服务端的网络连接建立后,客户端发送给服务端的第一个报文必须是CONNECT报文。在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接。
  • 服务端必须验证CONNECT控制报文的保留标志位(第0位)是否为0,如果不为0必须断开客户端连接。
  • 服务端必须允许1到23个字节长的UTF-8编码的客户端标识符,客户端标识符只能包含大写字母,小写字母和数字。

二、报文通信实例

1. 连接服务器

//发送连接请求
private static void Connect(Socket socket)
{
    // Connect握手-》发送ClientID   用户名  密码
    List<byte> bytes = new List<byte>();
    // 固定报头
    {
        byte byte1 = 1 << 4;// 直接写8?  发送      0011 1100
        bytes.Add(byte1);
        // 
        // byte byte2 = 0;// 暂时无法确定   一个字节最多表示255个长度
        // 特殊  超出255字节
        // 1000 0000    1000 0000    1000 0000       127        
        // 最高位  标记位  如果是1   表示后面一个字节也是长度值
        // 如果是123个字节    7B     200                        300  

        // 1Byte    0 -127     0x00-0x7F
        // 2Byte    128-16383  0x80 0x01  - 0xFF 0x7F   
    }
    List<byte> byte_list = new List<byte>();
    // 可变报头
    {
        // 协议信息
        string p_str = "MQTT";
        byte[] p_bytes = Encoding.ASCII.GetBytes(p_str);
        byte_list.Add((byte)(p_bytes.Length / 256));
        byte_list.Add((byte)(p_bytes.Length % 256));
        byte_list.AddRange(p_bytes);

        // 版本号  一个字节
        byte_list.Add(0x04);// 3.1.1版本协议,如果是5.0版本协议:0x05

        // 连接标志:一个字节
        {
            byte flag = 0;
            flag |= 128;// 将用户名标记置1
            flag |= 64;// 将密码标记置1

            // 遗嘱:
            // (场景:客户端A连接时,建立遗嘱;异常断线,遗嘱有效
            //         包含:主题名/数据   "offline"/"123")
            // 客户端B   订阅  offline
            // 客户端断线
            // 客户端B接收到A的遗嘱
            // 客户端C上线,并订阅了offline      
            flag |= 4;// 将使用遗嘱标记置1

            // 清理会话
            flag |= 2; // 将Clean Session标记置1

            byte_list.Add(flag);
        }
        // Keep Alive    秒:
        // 服务端如果要判断标准:
        // 1、指客户端两次通信间隔时间
        // 2、100 * 1.5 时间范围
        ushort second = 100;
        byte_list.Add((byte)(second / 256));
        byte_list.Add((byte)(second % 256));
    }
    // 负载信息
    {
        // Client ID
        string client_id = "liuyz";
        byte[] ci_bytes = Encoding.ASCII.GetBytes(client_id);
        byte_list.Add((byte)(ci_bytes.Length / 256));
        byte_list.Add((byte)(ci_bytes.Length % 256));
        byte_list.AddRange(ci_bytes);

        // 根据可变报头中的标记位,需要:
        // 添加遗嘱主题信息
        string will_topic_str = "offline";
        byte[] wt_bytes = Encoding.ASCII.GetBytes(will_topic_str);
        byte_list.Add((byte)(wt_bytes.Length / 256));
        byte_list.Add((byte)(wt_bytes.Length % 256));
        byte_list.AddRange(wt_bytes);
        // 添加遗嘱消息信息
        string will_msg_str = "AAA123";
        byte[] wm_bytes = Encoding.ASCII.GetBytes(will_msg_str);
        byte_list.Add((byte)(wm_bytes.Length / 256));
        byte_list.Add((byte)(wm_bytes.Length % 256));
        byte_list.AddRange(wm_bytes);

        // 添加用户名信息
        string user_str = "admin";
        byte[] u_bytes = Encoding.ASCII.GetBytes(user_str);
        byte_list.Add((byte)(u_bytes.Length / 256));
        byte_list.Add((byte)(u_bytes.Length % 256));
        byte_list.AddRange(u_bytes);
        // 添加密码信息
        string pwd_str = "123456";
        byte[] p_bytes = Encoding.ASCII.GetBytes(pwd_str);
        byte_list.Add((byte)(p_bytes.Length / 256));
        byte_list.Add((byte)(p_bytes.Length % 256));
        byte_list.AddRange(p_bytes);
    }

    // 拼接长度数据
    int len = byte_list.Count;
    byte[] len_bytes = new byte[] { (byte)byte_list.Count };
    bytes.AddRange(len_bytes);// 长度信息
    bytes.AddRange(byte_list);// 可变报头+载荷信息

    //socket.Send(bytes.ToArray());// 发送MQTT的请求连接报文 
    SendAndReceive(bytes.ToArray(), socket);
}
//辅助方法
static Random rnd = new Random();
Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.Connect("127.0.0.1", 1883);// 连接到服务器

static byte[] SendAndReceive(byte[] bytes, Socket socket)
{
    socket.Send(bytes);

    List<byte> byteList = new List<byte>();
    byte[] respBytes = new byte[2];// 固定报头
    socket.Receive(respBytes, 0, 2, SocketFlags.None);

    //byteList.AddRange(respBytes);
    //for (int i = 0; i < 3; i++)
    //{
    //    if ((byteList[byteList.Count - 1] & 0x80) != 0)
    //    {
    //        respBytes = new byte[1];// 固定报头
    //        socket.Receive(respBytes, 0, 1, SocketFlags.None);
    //        byteList.Add(respBytes[0]);
    //    }
    //    else
    //        break;
    //}
    //byte[] lenBytes = byteList.GetRange(1, byteList.Count - 1).ToArray();
    long len = respBytes[1];// LengthDecode(lenBytes);// 当前报文剩余字节的数量

    byte[] dataBytes = new byte[len];// 可变报头+有效载荷的字节
    socket.Receive(dataBytes, 0, dataBytes.Length, SocketFlags.None);
    byteList.AddRange(dataBytes);

    return byteList.ToArray();// 将完整响应报文返回 
}

// 从一个十进制数转换成字节
static byte[] LengthEncode(int len)
{
    List<byte> bytes = new List<byte>();
    ulong rc = 0;
    byte d;
    do
    {
        d = (byte)(len % 128);
        len /= 128;
        if (len > 0)
            d |= 128;
        bytes.Add(d);
    } while (len > 0);
    return bytes.ToArray();
}
// 根据字节转换成数字
static long LengthDecode(byte[] bytes)
{
    byte encodeByte;
    uint multiplier = 1;
    long rc = 0;
    int i = 0;
    do
    {
        encodeByte = bytes[i++];
        rc += (encodeByte & 127) * multiplier;
        if (multiplier > 128 * 128 * 128)
            break;
        else
            multiplier *= 128;
    } while ((encodeByte & 128) != 0);
    return rc;
}

//心跳
private static void Ping(Socket socket)
{
    Task.Run(async () =>
    {
        while (true)
        {
            await Task.Delay(1000);
            // 心跳请求,只有固定报头
            List<byte> ping_bytes = new List<byte>();
            byte byte1 = 12 << 4;// 消息类型  12
            ping_bytes.Add(byte1);
            ping_bytes.Add(0x00);

            socket.Send(ping_bytes.ToArray());
        }
    });
}

//断开连接
private static void Disconnect(Socket socket)
{
    List<byte> reqBytes = new List<byte>();
    byte byte1 = 14 << 4;
    reqBytes.Add(byte1);
    // 剩余字节长度
    reqBytes.Add(0);

    socket.Send(reqBytes.ToArray()); ;
}

2. 订阅

private static void Subscribte(Socket socket)
{
    List<byte> reqBytes = new List<byte>();

    // 固定报头
    byte byte1 = 8 << 4;
    // 位运算
    // 将8这个值左移4位?
    // 0000 1000   ->  1000 0000
    // 13 ->   1101 0000  
    byte1 |= 2;// 固定写法,字节的bit1置1  
    //  | 或运算    & 与运算
    // 1000 0000
    // 0000 0010
    // 1000 0010 -> 或运算结果
    reqBytes.Add(byte1);

    List<byte> bytes = new List<byte>();
    // 可变报头(报文标识符  2byte)
    ushort pid = (ushort)rnd.Next(100, 1000);
    bytes.Add((byte)(pid / 256));
    bytes.Add((byte)(pid % 256));

    // 载荷信息
    // 订阅主题名称
    string topic = "a/b";    // "a/#"
    byte[] t_bytes = Encoding.ASCII.GetBytes(topic);
    bytes.Add((byte)(t_bytes.Length / 256));
    bytes.Add((byte)(t_bytes.Length % 256));
    bytes.AddRange(t_bytes);
    // QoS  服务质量(0,1,2)   不能超过2
    // 0:最多发送成功一次
    // 1:最少发送成功一次
    // 2:确保发送成功一次
    bytes.Add(1);

    topic = "a/d";
    t_bytes = Encoding.ASCII.GetBytes(topic);
    bytes.Add((byte)(t_bytes.Length / 256));
    bytes.Add((byte)(t_bytes.Length % 256));
    bytes.AddRange(t_bytes);
    // QoS 2
    bytes.Add(2);

    topic = "a/f";
    t_bytes = Encoding.ASCII.GetBytes(topic);
    bytes.Add((byte)(t_bytes.Length / 256));
    bytes.Add((byte)(t_bytes.Length % 256));
    bytes.AddRange(t_bytes);
    // QoS 0
    bytes.Add(0);
    
    // 添加报文长度   最大256M
    reqBytes.AddRange(LengthEncode(bytes.Count));
    reqBytes.AddRange(bytes);

    //socket.Send(reqBytes.ToArray());
    SendAndReceive(reqBytes.ToArray(), socket);
}

3. 取消订阅

private static void Unsubscribte(Socket socket)
{
    List<byte> reqBytes = new List<byte>();
    byte byte1 = 10 << 4;
    byte1 |= 2;// 固定写法,字节的bit1置1  
    reqBytes.Add(byte1);

    // 可变报头+有效载荷
    List<byte> bytes = new List<byte>();
    {
        // 可变报头(报文标识符  2byte)
        ushort pid = (ushort)rnd.Next(100, 1000);
        bytes.Add((byte)(pid / 256));
        bytes.Add((byte)(pid % 256));

        string topic = "a/b";
        byte[] t_bytes = Encoding.ASCII.GetBytes(topic);
        bytes.Add((byte)(t_bytes.Length / 256));
        bytes.Add((byte)(t_bytes.Length % 256));
        bytes.AddRange(t_bytes);

        topic = "z/x";
        t_bytes = Encoding.ASCII.GetBytes(topic);
        bytes.Add((byte)(t_bytes.Length / 256));
        bytes.Add((byte)(t_bytes.Length % 256));
        bytes.AddRange(t_bytes);
    }

    reqBytes.AddRange(LengthEncode(bytes.Count));
    reqBytes.AddRange(bytes);

    SendAndReceive(reqBytes.ToArray(), socket);
}

4. 发布

  • QoS-0(可能对方收不到,最多只收一次)

private static void Publish_0(Socket socket)
{
    List<byte> reqBytes = new List<byte>();
    byte byte1 = 3 << 4;// 功能码
    // 如里需要服务端进行消息的保存,将bit0置1   RETAIN标记
    //byte1 |= 1;
    // 如果需要设备发存消息的QoS等级,设置bit1 - 2两个位,最大为2
    int qos = 0;
    byte1 |= (byte)(qos << 1);
    // 如果消息是重发的,设备bit3为1

    reqBytes.Add(byte1);

    // 可变报头+有效载荷
    List<byte> bytes = new List<byte>();
    {
        // 添加主题
        string topic = "t001";
        byte[] t_bytes = Encoding.ASCII.GetBytes(topic);
        bytes.Add((byte)(t_bytes.Length / 256));
        bytes.Add((byte)(t_bytes.Length % 256));
        bytes.AddRange(t_bytes);

        // 添加消息内容
        string msg = "Hello MQTT - 0";
        byte[] m_bytes = Encoding.UTF8.GetBytes(msg);
        bytes.AddRange(m_bytes);
    }

    reqBytes.AddRange(LengthEncode(bytes.Count));
    reqBytes.AddRange(bytes);

    socket.Send(reqBytes.ToArray());
}
  • QoS-1(最少收一次)

private static void Publish_1(Socket socket)
{
    List<byte> reqBytes = new List<byte>();
    byte byte1 = 3 << 4;// 功能码
    // 如里需要服务端进行消息的保存,将bit0置1
    //byte1 |= 1;
    // 如果需要设备发存消息的QoS等级,设置bit1 - 2两个位,最大为2
    int qos = 1;
    byte1 |= (byte)(qos << 1);
    // 如果消息是重发的,设备bit3为1

    reqBytes.Add(byte1);

    // 可变报头+有效载荷
    List<byte> bytes = new List<byte>();
    {
        // 添加主题
        string topic = "t001";
        byte[] t_bytes = Encoding.ASCII.GetBytes(topic);
        bytes.Add((byte)(t_bytes.Length / 256));
        bytes.Add((byte)(t_bytes.Length % 256));
        bytes.AddRange(t_bytes);

        // 添加PID
        ushort pid = (ushort)rnd.Next(100, 1000);
        bytes.Add((byte)(pid / 256));
        bytes.Add((byte)(pid % 256));

        // 添加消息内容
        string msg = "Hello MQTT - 1";
        byte[] m_bytes = Encoding.UTF8.GetBytes(msg);
        bytes.AddRange(m_bytes);
    }

    reqBytes.AddRange(LengthEncode(bytes.Count));
    reqBytes.AddRange(bytes);

    SendAndReceive(reqBytes.ToArray(), socket); // 接收PubAck的响应 功能码:4
}
  • QoS-2(保证消息发送并只接收一次)

private static void Publish_2(Socket socket)
{
    List<byte> reqBytes = new List<byte>();
    byte byte1 = 3 << 4;// 功能码
    // 如里需要服务端进行消息的保存,将bit0置1
    //byte1 |= 1;
    // 如果需要设备发存消息的QoS等级,设置bit1 - 2两个位,最大为2
    int qos = 2;
    byte1 |= (byte)(qos << 1);
    // 如果消息是重发的,设备bit3为1

    reqBytes.Add(byte1);
    ushort pid = (ushort)rnd.Next(100, 1000);

    // 可变报头+有效载荷
    List<byte> bytes = new List<byte>();
    {
        // 添加主题
        string topic = "t001";
        byte[] t_bytes = Encoding.ASCII.GetBytes(topic);
        bytes.Add((byte)(t_bytes.Length / 256));
        bytes.Add((byte)(t_bytes.Length % 256));
        bytes.AddRange(t_bytes);

        // 添加PID
        bytes.Add((byte)(pid / 256));
        bytes.Add((byte)(pid % 256));

        // 添加消息内容
        string msg = "Hello MQTT - 2";
        byte[] m_bytes = Encoding.UTF8.GetBytes(msg);
        bytes.AddRange(m_bytes);
    }

    reqBytes.AddRange(LengthEncode(bytes.Count));
    reqBytes.AddRange(bytes);

    SendAndReceive(reqBytes.ToArray(), socket);// 接收PubRec响应报文   功能码:5

    // 发送PubRel报文 6
    {
        reqBytes = new List<byte>();
        byte1 = 6 << 4;// 功能码
        byte1 |= 2;
        reqBytes.Add(byte1);
        // 剩余字节长度  2
        reqBytes.Add(2);
        // 添加pid
        reqBytes.Add((byte)(pid / 256));
        reqBytes.Add((byte)(pid % 256));

        SendAndReceive(reqBytes.ToArray(), socket);// 接收PubComp响应报文   功能:7
    }
}
  • QoS及信息交互过程

    • QoS 0:最多一次的传输:定时监听点位(5秒获取)

    • QoS 1:至少一次的传输、至多无限次:停车场入场

    • QoS 2:有且仅有一次的传输:停车场出场 订单支付

三、第三方库(MQTTnet)的使用示例

1. 客户端

internal class Program
{
    static IMqttClient client = null;
    static void Main(string[] args)
    {
        Console.WriteLine("Hello, World!");

        ClientTest();

        Console.ReadLine();
    }

    static async void ClientTest()
    {
        // MQTT客户端
        client = new MqttClientFactory().CreateMqttClient();

        MqttClientOptions options = new MqttClientOptionsBuilder()
                .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)

                .WithTcpServer("127.0.0.1", 1883)
                .WithClientId("mqttnet_id")
                .WithCredentials("admin", "123456")

                .WithWillTopic("offline")
                .WithWillPayload("掉线喽")
                .WithWillQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)

                .Build();

        client.ConnectedAsync += Client_ConnectedAsync;
        client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;
        client.DisconnectedAsync += Client_DisconnectedAsync;

        MqttClientConnectResult result = await client.ConnectAsync(options);

        if (result.ResultCode == MqttClientConnectResultCode.Success)
        {
            Task.Run(async() =>
            {
                while (client.IsConnected)
                {
                    await Task.Delay(5000);
                    await client.TryPingAsync();
                }
            });

            // 订阅
            // QoS - 2
            // 第一种方式
            // topic01/#  :  topic01/a   topic01/a/b/c/d/e/f
            await client.SubscribeAsync("topic01", MqttQualityOfServiceLevel.ExactlyOnce);

            // 第二种方式
            //MqttTopicFilter filter = new MqttTopicFilterBuilder()
            //    .WithTopic("topic11")
            //    .WithAtLeastOnceQoS()
            //    .Build();
            //await client.SubscribeAsync(filter);

            // 第三种方式
            //MqttClientSubscribeOptions options1 = new MqttClientSubscribeOptionsBuilder()
            //    .WithTopicFilter(filter)
            //    .Build();
            //await client.SubscribeAsync(options1);

            // 取消订阅
            //await client.UnsubscribeAsync("topic11");

            // 发布
            // QoS - 1
            MqttApplicationMessage message = new MqttApplicationMessageBuilder()
                .WithTopic("topic02")
                .WithPayload("Hello MQTTnet")
                .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                .Build();
            await client.PublishAsync(message);

            // 断开连接
            //await client.DisconnectAsync();
        }
    }

    private static Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
    {
        return Task.CompletedTask;
    }

    private static Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
    {
        // 获取相关的主题  、负载信息
        var id = arg.ClientId;
        var topic = arg.ApplicationMessage.Topic;
        var msg = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload.ToArray());

        return Task.CompletedTask;
    }

    private static Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg)
    {
        if (client == null) return Task.CompletedTask;

        // QoS - 2
        //MqttClientSubscribeResult result = client.SubscribeAsync("topic02", MqttQualityOfServiceLevel.ExactlyOnce).GetAwaiter().GetResult();

        return Task.CompletedTask;
    }
}

2. 服务器端

internal class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Hello, World!");

        Server();

        Console.ReadLine();
    }

    static async void Server()
    {
        MqttServerOptions options = new MqttServerOptionsBuilder()
            .WithDefaultEndpoint()// 127.0.0.1:1883
            .Build();

        MqttServer server = new MqttServerFactory().CreateMqttServer(options);
        // 服务器启动状态
        server.StartedAsync += Server_StartedAsync;
        // 登录验证
        server.ValidatingConnectionAsync += Server_ValidatingConnectionAsync;
        // 检查哪个客户端接入
        server.ClientConnectedAsync += Server_ClientConnectedAsync;
        server.ClientDisconnectedAsync += Server_ClientDisconnectedAsync;

        // 检查订阅信息
        server.ClientSubscribedTopicAsync += Server_ClientSubscribedTopicAsync;
        server.ClientUnsubscribedTopicAsync += Server_ClientUnsubscribedTopicAsync;

        // 检查消息
        server.ApplicationMessageEnqueuedOrDroppedAsync += Server_ApplicationMessageEnqueuedOrDroppedAsync;

        // 拦截
        server.InterceptingPublishAsync += Server_InterceptingPublishAsync;

        await server.StartAsync();

        Console.ReadLine();

        // 基于服务对象发布消息
        //server.InjectApplicationMessage("", "");
        MqttApplicationMessage msg = new MqttApplicationMessageBuilder()
            .WithTopic("topic02")
            .WithPayload("Hello  server!")
            .Build();
        await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(msg)
        {
            SenderClientId = "Server"
        });

        Console.ReadLine();

        // 强制客户端退出
        var client_list = (await server.GetClientsAsync());
        MqttClientStatus client = client_list.FirstOrDefault(c => c.Id == "aaaa");
        if (client != null)
            await client.DisconnectAsync();

        Console.ReadLine();
        // 停止服务
        await server.StopAsync();
    }

    private static Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
    {
        //arg.ApplicationMessage.Topic += "/sub002";

        return Task.CompletedTask;
    }

    private static Task Server_ApplicationMessageEnqueuedOrDroppedAsync(ApplicationMessageEnqueuedEventArgs arg)
    {
        Console.WriteLine($"转发消息:{arg.SenderClientId} - {arg.ReceiverClientId}");
        Console.WriteLine($"\t:{arg.ApplicationMessage.Topic} - {Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)} - {arg.ApplicationMessage.QualityOfServiceLevel}");
        //arg.SenderClientId
        //arg.ReceiverClientId
        //arg.ApplicationMessage.Topic
        //arg.ApplicationMessage.Payload  -- byte[] -> string
        return Task.CompletedTask;
    }

    private static Task Server_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
    {
        Console.WriteLine($"客户端取消订阅:{arg.ClientId} - {arg.TopicFilter}");
        //arg.TopicFilter
        return Task.CompletedTask;
    }

    private static Task Server_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
    {
        Console.WriteLine($"客户端订阅主题:{arg.ClientId} - {arg.TopicFilter.Topic}");
        //arg.TopicFilter.Topic
        return Task.CompletedTask;
    }

    private static Task Server_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
    {
        Console.WriteLine("客户端断开连接:" + arg.ClientId);
        //arg.ClientId
        return Task.CompletedTask;
    }

    private static Task Server_ClientConnectedAsync(ClientConnectedEventArgs arg)
    {
        Console.WriteLine("客户端连接成功:" + arg.ClientId);
        //arg.ClientId
        //arg.UserName
        //arg.Password

        return Task.CompletedTask;
    }

    private static Task Server_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
    {
        Console.WriteLine("正在验证客户端登录:" + arg.ClientId);
        //arg.ClientId
        //arg.ClientCertificate
        //arg.UserName
        //arg.Password
        // 检查用户名、密码的正确性,ClientId    
        if (arg.UserName != "admin" || arg.Password != "123456")
        {
            arg.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.BadUserNameOrPassword;
            Console.WriteLine("登录失败!");
        }

        return Task.CompletedTask;
    }

    private static Task Server_StartedAsync(EventArgs arg)
    {
        Console.WriteLine("服务启动成功!正在监听....");
        return Task.CompletedTask;
    }
}

3. WebSocket

internal class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Hello, World!");

        WebServer();

        Console.ReadLine();
    }

    static void WebServer()
    {
        var server = new SuperWebSocket.WebSocketServer();
        server.Setup(8989);

        server.Start();
        Console.WriteLine("WebSocket服务已启动!");

        server.NewMessageReceived += Server_NewMessageReceived;
        server.NewDataReceived += Server_NewDataReceived;
        //server.NewSessionConnected += Server_NewSessionConnected;
    }

    private static void Server_NewDataReceived(SuperWebSocket.WebSocketSession session, byte[] value)
    {

    }

    private static void Server_NewMessageReceived(SuperWebSocket.WebSocketSession session, string value)
    {
        session.Send("消息回复");
    }

    private static void Server_NewSessionConnected(SuperWebSocket.WebSocketSession session)
    {

        Console.WriteLine("有客户端接入:" + session.SessionID);
    }
}
  • SuperWebSocketNETServer 0.8.0
  • System.Configuration.ConfigurationManager 9.0.2

4. html

<div class="wrap" style="padding-top: 20px">
    <div>
        <div class="contact">
            <div class="contact-form">
                <div class="content">
                    <h2>MQTT功能测试</h2>
                </div>
                <div>
                    <span><label>服务器地址</label></span>
                    <span><input name="url" type="text" class="textbox" id="url"></span>
                </div>
                <div>
                    <span><label>客户端ID</label></span>
                    <span><input name="clientId" type="text" class="textbox" id="clientId"></span>
                </div>
                <div>
                    <span><label>用户名</label></span>
                    <span><input name="userName" type="text" class="textbox" id="userName"></span>
                </div>
                <div>
                    <span><label>密码</label></span>
                    <span><input name="password" type="text" class="textbox" id="password"> </input></span>
                </div>
                <div>
                    <span><input type="submit" class="" value="登录到服务器" id="btn_connnect"></span>
                </div>
            </div>
            <div class="contact-form">
                <div class="content">
                    <h2>订阅</h2>
                </div>
                <div>
                    <span><label>主题</label></span>
                    <span><input name="topic" type="text" class="textbox" id="topic"></span>
                </div>
                <div>
                    <span><label>QoS</label></span>
                    <span>
                        <select name="qos" class="select" id="qos">
                            <option value="0">0</option>
                            <option value="1">1</option>
                            <option value="2">2</option>
                        </select>

                    </span>
                </div>
                <div>
                    <span><input type="submit" class="" value="订阅" id="btn_sub"></span>
                </div>
            </div>
            <div class="contact-form">
                <div class="content">
                    <h2>消息发布</h2>
                </div>
                <div>
                    <span><label>主题</label></span>
                    <span><input name="pub_topic" type="text" class="textbox" id="pub_topic"></span>
                </div>
                <div>
                    <span><label>QoS</label></span>
                    <span>
                        <select name="qos" class="select" id="pub_qos">
                            <option value="0">0</option>
                            <option value="1">1</option>
                            <option value="2">2</option>
                        </select>

                    </span>
                </div>
                <div>
                    <span><label>Pyload</label></span>
                    <span><input name="pyload" type="text" class="textbox" id="pyload"></span>
                </div>
                <div>
                    <span><input type="submit" class="" value="发布" id="btn_pub"></span>
                </div>
            </div>
            <div class="clear"> </div>
        </div>
    </div>
</div>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script type="text/javascript">
    console.log(mqtt);

    let client = null;

    const btnConnect = document.getElementById("btn_connnect");
    btnConnect.addEventListener("click", connect);

    function connect() {
        const ele_url = document.getElementById("url");
        const ele_client_id = document.getElementById("clientId");
        const ele_username = document.getElementById("userName");
        const ele_password = document.getElementById("password");

        client = mqtt.connect(`ws://${ele_url.value}/`, {
            clientId: ele_client_id.value,
            username: ele_username.value,
            password: ele_password.value,
            clean: true
        });

        client.on("connect", () => {
            console.log("连接成功");
        });

        client.on("message", (topic, msg, packet) => {
            console.log("获取到的数据:", msg)
            console.log("数据对应订阅主题:", topic)
            console.log("获取到的数据包:", packet)
        });
    }

    const btnSubscript = document.getElementById("btn_sub");
    btnSubscript.addEventListener("click", subscribe);
    function subscribe() {
        var ele_topic = document.getElementById("topic");
        var ele_qos = document.getElementById("qos");

        // 单订阅
        client.subscribe(ele_topic.value, { qos: parseInt(ele_qos.value) }, (error, granted) => {
            if (error)
                console.log('订阅失败!', error);
            else
                console.log('订阅成功!', granted[0].topic);
        });

        // 多订阅
        //client.subscribe([{ topic: "aaa", qos=1 }, { topic: "bbb", qos: 0 }], (err, gra) => {

        //});
    }

    const btnPublish = document.getElementById("btn_pub");
    btnPublish.addEventListener("click", publish);
    function publish() {
        const ele_topic = document.getElementById("pub_topic");
        const ele_qos = document.getElementById("pub_qos");
        const ele_pyload = document.getElementById("pyload");

        client.publish(ele_topic.value, ele_pyload.value, { qos: parseInt(ele_qos.value) });
    }

</script>

四. MQTT5.0

1. 从3.1.1到5.0

  • 报文格式:属性字段
  • 通信模式:订阅/发布,请求/响应
  • 服务重定向
  • 认证:首先客户端在连接的时候带认证信息 添加认证过程 (0x0F) CONNACK
  • 用户属性:自定义数据格式 “prop:value”
  • 过期:Session过期(4字节 秒)、消息过期 Clean Session =1
  • 流量控制:接收最大值、最大报文长度 QoS>0

2. MQTT 5.0属性

3. MQTT 5.0报文格式

  • 连接请求部分报文格式

  • 订阅请求部分报文格式

  • 断开连接原因码