java实现MQTT协议客户端的接收

作者:子龙 发布时间: 2025-11-21 阅读量:29 评论数:0

一、MQtt简介

 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是IBM开发的一个物联网通讯协议,OASIS(结构化信息标准促进组织)已宣布MQTT协议作为其新兴的物联网消息传递协议的首选。在MQTT的官方网站上,定义MQTT是一种machine-to-machine (M2M)设备之间通信的物联网互联协议,是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

 MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是MQTT服务器,消息发布者可以同时是订阅者。

MQTT 的工作原理

MQTT 是基于发布-订阅模式的通信协议,由 MQTT 客户端通过主题(Topic)发布或订阅消息,通过 MQTT Broker 集中管理消息路由,并依据预设的服务质量等级(QoS)确保端到端消息传递可靠性。

MQTT 客户端

任何运行 MQTT 客户端库的应用或设备都是 MQTT 客户端。例如,使用 MQTT 的即时通讯应用是客户端,使用 MQTT 上报数据的各种传感器是客户端,各种 MQTT 测试工具也是客户端。

MQTT Broker

MQTT Broker 是负责处理客户端请求的关键组件,包括建立连接、断开连接、订阅和取消订阅等操作,同时还负责消息的转发。一个高效强大的 MQTT Broker 能够轻松应对海量连接和百万级消息吞吐量,从而帮助物联网服务提供商专注于业务发展,快速构建可靠的 MQTT 应用。

发布-订阅模式

发布-订阅模式与客户端-服务器模式的不同之处在于,它将发送消息的客户端(发布者)和接收消息的客户端(订阅者)进行了解耦。发布者和订阅者之间无需建立直接连接,而是通过 MQTT Broker 来负责消息的路由和分发。

下图展示了 MQTT 发布/订阅过程。温度传感器作为客户端连接到 MQTT Broker,并通过发布操作将温度数据发布到一个特定主题(例如 Temperature)。MQTT Broker 接收到该消息后会负责将其转发给订阅了相应主题(Temperature)的订阅者客户端。

MQTT 发布-订阅模式

主题

MQTT 协议根据主题来转发消息。主题通过 / 来区分层级,类似于 URL 路径,例如:

chat/room/1

sensor/10/temperature

sensor/+/temperature

MQTT 主题支持以下两种通配符:+#

  • +:表示单层通配符,例如 a/+ 匹配 a/xa/y
  • #:表示多层通配符,例如 a/# 匹配 a/xa/b/c/d

注意:通配符主题只能用于订阅,不能用于发布。

QoS

MQTT 提供了三种服务质量(QoS),在不同网络环境下保证消息的可靠性。

  • QoS 0:消息最多传送一次。如果当前客户端不可用,它将丢失这条消息。
  • QoS 1:消息至少传送一次。
  • QoS 2:消息只传送一次。

MQTT 的工作流程

在了解了 MQTT 的基本组件之后,让我们来看看它的一般工作流程:

  1. 客户端使用 TCP/IP 协议与 Broker 建立连接,可以选择使用 TLS/SSL 加密来实现安全通信。客户端提供认证信息,并指定会话类型(Clean Session 或 Persistent Session)。
  2. 客户端既可以向特定主题发布消息,也可以订阅主题以接收消息。当客户端发布消息时,它会将消息发送给 MQTT Broker;而当客户端订阅消息时,它会接收与订阅主题相关的消息。
  3. MQTT Broker 接收发布的消息,并将这些消息转发给订阅了对应主题的客户端。它根据 QoS 等级确保消息可靠传递,并根据会话类型为断开连接的客户端存储消息。

开始使用 MQTT

本次教程实现java订阅者这个角色。

引入依赖

 <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <grpc.version>1.6.1</grpc.version>
        <protobuf.version>3.21.12</protobuf.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version> <!-- 建议使用较新版本 -->
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty</artifactId>
            <version>${grpc.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>${grpc.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>${grpc.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java-util</artifactId>
            <version>3.21.12</version> <!-- 版本号应与你的 protobuf-java 保持一致 -->
        </dependency>
    </dependencies>


    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.5.0.Final</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.0</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

创建实体

开发者 MQTT 客户端在经过 proto buffer 反序列化步骤之后将得到完整的 JSON

所以我们要通过 proto文件创建java实体类

.proto文件语法高亮显示

需要安装Protobuf Support插件

依次点击Intellij中的“File”-->"Settings"-->"Plugins",如下所示:

4758831C-2490-4B27-B35E-E7789C0CB754.png

安装完后,重启Intellij IDEA,查看.proto文件,会发现已经支持语法高亮显示。

将.proto文件转成Java类

一般的做法,是执行protoc命令,依次将.proto文件转成Java类:

protoc.exe -I=d:/tmp --java_out=d:/tmp d:/tmp/monitor_data.proto

不过gRPC官方推荐了一种更优雅的使用姿势,可以通过maven轻松搞定。

使用maven的编译命令,即可在target中看到根据.proto文件生成的Java类。

需要引入上面的pom依赖。

新建一个包 proto,包里新建sensor.proto文件。

通过maven编译命令后,在target目录下就生成了java实体类,再把它移动到src目录下。

A90079FA-A441-40C6-AB16-7A21946DD2EB.png

订阅主题

public class MqttClientExample {

    // --- 请根据你的实际情况修改以下配置 ---
    private static final String BROKER_URL = "ssl://ip:2883"; 
    private static final String USERNAME = "xxxx";
    private static final String PASSWORD = "xxxx";
    private static final String CA_CERT_PATH = "C:\\Users\\0000\\Downloads\\caCert.pem"; // RisingHF提供的CA证书
    private static final String CLIENT_ID = "JavaClient-" + System.currentTimeMillis(); // 确保客户端ID唯一
    private static final String TOPIC_TO_SUBSCRIBE = "user/500/device/8cf95720001797f6/uplink";
    private static final String TOPIC_TO_PUBLISH = "actuator/command";
    // ------------------------------------

    private static MqttClient mqttClient;

    public static void main(String[] args) {
        try {
            // 1. 创建 MQTT 客户端实例
            mqttClient = new MqttClient(BROKER_URL, CLIENT_ID, new MemoryPersistence());

            // 2. 设置连接选项
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(USERNAME);
            connOpts.setPassword(PASSWORD.toCharArray());

            // 设置是否在断开连接后清除会话
            connOpts.setCleanSession(true);

            // 如果是 SSL/TLS 连接 (broker URL 以 ssl:// 开头)
            if (BROKER_URL.startsWith("ssl")) {
                // 加载 CA 证书
                SSLContext sslContext = createSslContext(CA_CERT_PATH);
                connOpts.setSocketFactory(sslContext.getSocketFactory());
                System.out.println("SSL/TLS 已配置");
            }

            // 3. 设置回调函数
            // 这个回调函数将处理接收到的消息、连接状态变化等
            mqttClient.setCallback(new MqttCallback() {

                @Override
                public void connectionLost(Throwable cause) {
                    // 连接丢失时调用
                    System.out.println("连接丢失: " + cause.getMessage());
                    cause.printStackTrace();
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    // 收到消息时调用
                    System.out.println("\n收到消息:");
                    System.out.println("  主题: " + topic);
                    byte[] payload = message.getPayload();
 
                    com.syjdly.Sensor.DeviceUplink sensorData = deserializeProtobuf(payload);

                    System.out.println("  内容: " + JsonFormat.printer().print(sensorData));

                    System.out.println("  QoS: " + message.getQos());
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // 消息发布完成且收到确认时调用
                    try {
                        System.out.println("\n消息发布成功: " + token.getMessage());
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            });

            // 4. 连接到 Broker
            System.out.println("正在连接到: " + BROKER_URL);
            mqttClient.connect(connOpts);
            System.out.println("连接成功!");

            // 5. 订阅主题
            System.out.println("正在订阅主题: " + TOPIC_TO_SUBSCRIBE);
            // QoS 0: 最多一次
            // QoS 1: 至少一次
            // QoS 2: 恰好一次
            mqttClient.subscribe(TOPIC_TO_SUBSCRIBE, 0);
            System.out.println("订阅成功!");

            // 保持主线程运行,以便接收消息
            // 在实际应用中,你可能会有一个更复杂的事件循环
            System.out.println("\n客户端正在运行,按 Enter 键退出...");
            System.in.read();

            // 7. 断开连接
            mqttClient.disconnect();
            System.out.println("已断开连接");

        } catch (MqttException me) {
            // 处理 MQTT 相关异常
            System.out.println("MQTT 异常:");
            System.out.println("  原因代码: " + me.getReasonCode());
            System.out.println("  消息: " + me.getMessage());
            System.out.println("  本地消息: " + me.getLocalizedMessage());
            System.out.println("  原因: " + me.getCause());
            me.printStackTrace();
        } catch (Exception e) {
            // 处理其他异常,如证书加载失败等
            e.printStackTrace();
        }
    }

    private static com.demo.Sensor.DeviceUplink deserializeProtobuf(byte[] payload) throws InvalidProtocolBufferException {

        return Sensor.DeviceUplink.parseFrom(payload); // 调用生成的 parseFrom 方法
    }

    /**
     * 创建并配置 SSLContext,用于加载 CA 证书
     */
    private static SSLContext createSslContext(String caCertPath) throws Exception {
        CertificateFactory cf = CertificateFactory.getInstance("X.509");
        X509Certificate caCert = (X509Certificate) cf.generateCertificate(new FileInputStream(caCertPath));

        KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
        keyStore.load(null);
        keyStore.setCertificateEntry("caCert", caCert);

        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(keyStore);

        SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
        sslContext.init(null, tmf.getTrustManagers(), null);

        return sslContext;
    }

这样就完成了对mqtt broker 的订阅。

评论