# paho-mqtt-client **Repository Path**: durcframework/paho-mqtt-client ## Basic Information - **Project Name**: paho-mqtt-client - **Description**: 一个MQTT客户端,基于paho,可实现topic订阅发布,掉线重连等功能 - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 5 - **Forks**: 7 - **Created**: 2023-04-11 - **Last Updated**: 2024-08-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: mqtt, IOT ## README # paho-mqtt-client 一个MQTT客户端,基于paho ## 使用方式 maven依赖: ```xml <dependency> <groupId>net.oschina.durcframework</groupId> <artifactId>paho-mqtt-client</artifactId> <version>1.0.0</version> </dependency> ``` 编写代码 ```java PahoMqttClient mqttClient = PahoMqttClient.create() .broker(broker) .auth(username, password) .clientId(clientId) .cleanSession(true) // 自动重连 .automaticReconnect(true) // 订阅消息 .subscribe(topic, 2) // 设置回调,处理消息 .callback(new MyMqttCallback()) .connect(); public static class MyMqttCallback extends PahoMqttCallback { @Override public void connectionLost(Throwable throwable) { System.out.println("[client]失去连接:" + throwable.getMessage()); } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { System.out.println("[client]收到消息: topic:" + topic + ", msg:" + mqttMessage.toString()); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } } ``` 发送消息 ```java mqttClient.publish(upLocationTopic, location.getBytes(), 1); ``` ## 测试用例 这里使用车联网作为测试场景,汽车终端设备每隔一段时间上报定位,IOT平台每隔一段时间下发OTA升级指令。 IOT平台实现功能如下: - 订阅定位上报topic,接收设备上报的定位数据 - 发送OTA消息到汽车终端 汽车终端设备实现功能如下: - 订阅点对点topic,接收平台端下发的指令 - 向定位上报topic发布消息,上报定位信息 ### IOT平台功能代码 ```java package com.gitee.mqttclient; import com.gitee.mqttclient.callback.PahoMqttCallback; import com.gitee.mqttclient.client.PahoMqttClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.junit.Test; import java.util.concurrent.TimeUnit; /** * 模拟平台端 * @author thc */ public
class ServerTest { protected static String broker = "tcp://1.1.1.1:1883"; protected static String username = "s001"; protected static String password = "123456"; static PahoMqttClient mqttClient; /** * 启动平台端 * @throws InterruptedException * @throws MqttException */ @Test public void server() throws InterruptedException, MqttException { String clientId = "server-node-1"; // 平台端监听所有车型定位topic String topic = "prod/+/+/base/location/#"; mqttClient = PahoMqttClient.create() .broker(broker) .auth(username, password) .clientId(clientId) .cleanSession(true) // 自动重连 .automaticReconnect(true) // 订阅消息 .subscribe(topic, 1) .callback(new ServerMqttCallback()) .connect(); // 另起一个线程,进行指令下发 new Thread(() -> { // 每隔20秒对A000001这辆车进行OTA升级 String downTopic = "o2o/A000001/ota"; while (true) { String otaContent = String.valueOf(System.currentTimeMillis()); System.out.println("发送ota指令:" + otaContent); byte[] otaFile = otaContent.getBytes(); try { mqttClient.publish(downTopic, otaFile, 2); } catch (MqttException e) { throw new RuntimeException(e); } try { TimeUnit.SECONDS.sleep(20); } catch (InterruptedException e) { throw new RuntimeException(e); } } }).start(); TimeUnit.DAYS.sleep(1); } /** * 服务端接收消息回调 */ public static class ServerMqttCallback extends PahoMqttCallback { @Override public void connectionLost(Throwable throwable) { System.out.println("[server]失去连接:" + throwable.getMessage()); } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { System.out.println("[server]收到消息: topic:" + topic + ", msg:" + mqttMessage.toString()); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } } } ``` ### 汽车终端功能代码 ```java package com.gitee.mqttclient; import com.gitee.mqttclient.callback.PahoMqttCallback; import com.gitee.mqttclient.client.PahoMqttClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import 
org.junit.Test; import java.util.Random; import java.util.concurrent.TimeUnit; /** * 模拟设备端,汽车终端 * @author thc */ public class ClientTest { protected static String broker = "tcp://1.1.1.1:1883"; protected static String username = "u001"; protected static String password = "123456"; // 汽车类型 static String carType = "suv"; // 车架号 static String vin = "A000001"; // 格式:client-用户名-车架号 static String clientId = "client-" + username + "-" + vin; private static PahoMqttClient mqttClient; private void init() throws MqttException { // 订阅点对点消息 String topic = "o2o/" + vin + "/+"; mqttClient = PahoMqttClient.create() .broker(broker) .auth(username, password) .clientId(clientId) .cleanSession(true) // 自动重连 .automaticReconnect(true) // 订阅消息 .subscribe(topic, 2) .callback(new MyMqttCallback()) .connect(); } @Test public void device() throws InterruptedException { try { init(); } catch (MqttException me) { System.err.println("MQTT连接失败"); System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); return; } // 另起一个线程,上报信息 new Thread(() -> { // 上报定位topic String upLocationTopic = String.format("prod/%s/%s/base/location", carType, vin); // 每隔10秒上报一次定位信息 while (true) { try { // 随机经纬度 String lon = "120.123" + new Random().nextInt(100); String lat = "30.123" + new Random().nextInt(100); String location = lon + "," + lat; System.out.println("上报定位信息:" + location); // 上报经纬度 mqttClient.publish(upLocationTopic, location.getBytes(), 1); } catch (MqttException e) { throw new RuntimeException(e); } try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { throw new RuntimeException(e); } } }).start(); // stop here TimeUnit.DAYS.sleep(1); } public static class MyMqttCallback extends PahoMqttCallback { @Override public void connectionLost(Throwable throwable) { System.out.println("[client]失去连接:" + 
throwable.getMessage()); } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { System.out.println("[client]收到消息: topic:" + topic + ", msg:" + mqttMessage.toString()); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } } } ``` broker, username, password改成自己的 先执行平台端测试用例,再执行设备端测试用例。 完整代码参见项目中test包