diff --git a/README.md b/README.md index 28785066cca0ff87bdf2f04fa243889d58dfe1c9..5a792292a370002cfc22c0789505b58285826d12 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,16 @@ ### 物联网网络中间件 iot-ucy是使用java语言且基于netty, spring boot, redis等开源项目开发来的物联网网络中间件, 支持udp, tcp, 串口(com)通讯(window、linux、mac)等底层协议和http, mqtt, websocket(默认实现和自定义协议头实现), modbus(tcp,rtu),plc,dtu(支持心跳,设备注册功能以及AT协议和自定义协议支持),dtu for modbus tcp,dtu for modbus rtu组件适配 等上层协议. 主打工业物联网底层网络交互、设备管理、数据存储、大数据处理. (其中plc包括西门子S7系列,欧姆龙Fins). 数据存储将使用taos数据库以及redis消息队列 #### 加入社区 -qq群:616124620 +QQ3群 - 272518000
+QQ2群 - 616124620 (已满)
+QQ1群 - 552167793 (已满)
+#### 商务合作(qq号:97235681) #### 如果您觉得的还可以点个star让更多开发者了解此项目 - [框架使用教程](http://doc.iteaj.com/) - [演示地址(支持DTU、PLC、MODBUS、串口等调试)](https://iot.iteaj.com/#/login) - [后端项目仓库(springboot)](https://gitee.com/iteaj/iboot) - [前端项目仓库(vue3+antdv3+vite2)](https://gitee.com/iteaj/ivzone) -#### 商务合作(qq号:97235681) + #### 已实现功能 1. 西门子和欧姆龙PLC原生协议适配 2. Modbus Tcp协议客户端实现 @@ -21,6 +24,14 @@ qq群:616124620 10. 支持在任意的java环境使用不强制依赖spring框架(V3.0.0+) 11. 支持动态启用和停用组件服务(V3.0.0+) 12. 支持串口+Modbus Rtu(V3.0.0+) +#### [基于此中间件实现的物联网网关](https://iot.iteaj.com/#/login)([源码](https://gitee.com/iteaj/iboot)) +![产品页面](https://iot.iteaj.com/show/product.png) +![联动页面](https://iot.iteaj.com/show/linkage.png) +![mqtt协议页面](https://iot.iteaj.com/show/mqtt.png) +![告警页面](https://iot.iteaj.com/show/warn.png) +![网络页面](https://iot.iteaj.com/show/network.png) +![地图页面](https://iot.iteaj.com/show/map.png) +![调试页面](https://iot.iteaj.com/show/debug.png) #### 接口主要特性 - 支持服务端启动监听多个端口, 统一所有协议可使用的api接口 - 包含一套代理客户端通信协议,支持调用:客户端 -> 服务端 -> 设备 -> 服务端 -> 客户端 diff --git a/iot-boot-starter/src/main/resources/META-INF/spring.factories b/iot-boot-starter/src/main/resources/META-INF/spring.factories deleted file mode 100644 index 26dd900750e3f355faa9391f6f5969102ea81b09..0000000000000000000000000000000000000000 --- a/iot-boot-starter/src/main/resources/META-INF/spring.factories +++ /dev/null @@ -1,13 +0,0 @@ -# application listener - -# Auto Configure -org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -com.iteaj.iot.boot.IotApplication, \ -com.iteaj.iot.boot.autoconfigure.IotClientConfiguration, \ -com.iteaj.iot.boot.autoconfigure.IotServerConfiguration, \ -com.iteaj.iot.boot.core.IotCoreConfiguration,\ -com.iteaj.iot.boot.autoconfigure.IotMqttAutoConfiguration,\ -com.iteaj.iot.boot.autoconfigure.IotTaosAutoConfiguration,\ -com.iteaj.iot.boot.autoconfigure.IotRedisAutoConfiguration,\ -com.iteaj.iot.boot.autoconfigure.IotRdbmsAutoConfiguration - diff --git a/iot-boot-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/iot-boot-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000000000000000000000000000000000000..25b1e37e00e1e2224972371db06697cd15c15843 --- /dev/null +++ b/iot-boot-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1,8 @@ +com.iteaj.iot.boot.IotApplication +com.iteaj.iot.boot.autoconfigure.IotClientConfiguration +com.iteaj.iot.boot.autoconfigure.IotServerConfiguration +com.iteaj.iot.boot.core.IotCoreConfiguration +com.iteaj.iot.boot.autoconfigure.IotMqttAutoConfiguration +com.iteaj.iot.boot.autoconfigure.IotTaosAutoConfiguration +com.iteaj.iot.boot.autoconfigure.IotRedisAutoConfiguration +com.iteaj.iot.boot.autoconfigure.IotRdbmsAutoConfiguration \ No newline at end of file diff --git a/iot-client/src/main/java/com/iteaj/iot/client/contants/HttpConstants.java b/iot-client/src/main/java/com/iteaj/iot/client/contants/HttpConstants.java new file mode 100644 index 0000000000000000000000000000000000000000..d8eeded5d4ac7511b165937b749a9812785880c6 --- /dev/null +++ b/iot-client/src/main/java/com/iteaj/iot/client/contants/HttpConstants.java @@ -0,0 +1,16 @@ +package com.iteaj.iot.client.contants; + +/** + * packageName com.iteaj.iot.client.contants + * + * @author whcodec + * @date 2024/12/7 + */ +public class HttpConstants { + + private HttpConstants() { + } + + public static final String HTTP_URL_NULL_ERROR = "请求url必填"; + +} diff --git a/iot-client/src/main/java/com/iteaj/iot/client/http/okhttp/OkHttpClient.java b/iot-client/src/main/java/com/iteaj/iot/client/http/okhttp/OkHttpClient.java index 53e59511dc75cd5fc8e7462cf34a20de4ba8ca7e..58f8a5edb338ddaf14d82108f1ae58153cd88f62 100644 --- a/iot-client/src/main/java/com/iteaj/iot/client/http/okhttp/OkHttpClient.java +++ b/iot-client/src/main/java/com/iteaj/iot/client/http/okhttp/OkHttpClient.java @@ -1,18 +1,21 @@ package com.iteaj.iot.client.http.okhttp; -import cn.hutool.core.collection.CollectionUtil; -import cn.hutool.core.util.StrUtil; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.text.CharSequenceUtil; +import com.iteaj.iot.client.contants.HttpConstants; import com.iteaj.iot.client.http.*; import okhttp3.*; import java.io.IOException; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** * http客户端 + * * @see okhttp3.OkHttpClient */ public class OkHttpClient extends HttpClient { @@ -23,51 +26,45 @@ public class OkHttpClient extends HttpClient { super(properties, component); } - @Override public void get(HttpClientMessage requestMessage) { String url = requestMessage.getUrl(); - if(StrUtil.isBlank(url)) { - throw new HttpProtocolException("请求url必填"); + if (CharSequenceUtil.isBlank(url)) { + throw new HttpProtocolException(HttpConstants.HTTP_URL_NULL_ERROR); } Response response = null; try { // 构建请求的url HttpUrl httpUrl = buildRequestUrl(url, requestMessage.getQueryParam()); - Request.Builder builder = new Request.Builder().get().url(httpUrl); + // 构建请求头 handleRequestHeader(requestMessage, builder); - response = httpInvoke(null, client.newCall(builder.build()), requestMessage); - ResponseBody responseBody = response.body(); - - requestMessage.build(response.code(), responseBody.string(), responseBody.contentType().type(), response.message()); + requestMessage.build(response.code(), getResponseBodyStr(response), getContentType(response), response.message()); } catch (IOException e) { throw new HttpProtocolException(e.getMessage(), e); } finally { - if(response != null) { - response.body().close(); + if (response != null) { + Objects.requireNonNull(response.body()).close(); } } } private void handleRequestHeader(HttpClientMessage requestMessage, Request.Builder builder) { Map heads = requestMessage.getHeads(); - if(CollectionUtil.isNotEmpty(heads)) { - heads.forEach((key, val) -> { - builder.addHeader(key, val); - }); + if (CollUtil.isNotEmpty(heads)) { + heads.forEach(builder::addHeader); } } @Override public void get(HttpClientMessage requestMessage, Consumer handle) { String url = requestMessage.getUrl(); - if(StrUtil.isBlank(url)) { - throw new HttpProtocolException("请求url必填"); + if (CharSequenceUtil.isBlank(url)) { + throw new HttpProtocolException(HttpConstants.HTTP_URL_NULL_ERROR); } - if(null == handle) { + if (null == handle) { throw new HttpProtocolException("未指定异步协议处理器: [ProtocolHandle]"); } @@ -88,10 +85,8 @@ public class OkHttpClient extends HttpClient { private HttpUrl buildRequestUrl(String url, Map queryParam) { HttpUrl.Builder builder = HttpUrl.get(url).newBuilder(); - if(queryParam != null) { - queryParam.forEach((key, value) -> { - builder.addQueryParameter(key, (String) value); - }); + if (queryParam != null) { + queryParam.forEach((key, value) -> builder.addQueryParameter(key, (String) value)); } return builder.build(); } @@ -99,25 +94,23 @@ public class OkHttpClient extends HttpClient { @Override public void post(HttpClientMessage requestMessage) { String url = requestMessage.getUrl(); - if(StrUtil.isBlank(url)) { - throw new HttpProtocolException("请求url必填"); + if (CharSequenceUtil.isBlank(url)) { + throw new HttpProtocolException(HttpConstants.HTTP_URL_NULL_ERROR); } - Response response = null; try { // 构建请求body HttpUrl httpUrl = buildRequestUrl(url, requestMessage.getQueryParam()); Request.Builder builder = new Request.Builder().post(requestMessage.getRequestBody()).url(httpUrl); + //构建请求头 handleRequestHeader(requestMessage, builder); - response = httpInvoke(null, client.newCall(builder.build()), requestMessage); - ResponseBody responseBody = response.body(); - requestMessage.build(response.code(), responseBody.string(), responseBody.contentType().type(), response.message()); + requestMessage.build(response.code(), getResponseBodyStr(response), getContentType(response), response.message()); } catch (IOException e) { throw new HttpProtocolException(e.getMessage(), e); } finally { - if(response != null) { - response.body().close(); + if (response != null) { + Objects.requireNonNull(response.body()).close(); } } } @@ -125,11 +118,11 @@ public class OkHttpClient extends HttpClient { @Override public void post(HttpClientMessage requestMessage, Consumer handle) { String url = requestMessage.getUrl(); - if(StrUtil.isBlank(url)) { - throw new HttpProtocolException("请求url必填"); + if (CharSequenceUtil.isBlank(url)) { + throw new HttpProtocolException(HttpConstants.HTTP_URL_NULL_ERROR); } - if(null == handle) { + if (null == handle) { throw new HttpProtocolException("未指定异步协议处理器: [ProtocolHandle]"); } @@ -149,7 +142,7 @@ public class OkHttpClient extends HttpClient { } protected Response httpInvoke(Consumer handle, Call call, HttpClientMessage message) throws IOException { - if(handle != null) { + if (handle != null) { call.enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { @@ -158,29 +151,44 @@ public class OkHttpClient extends HttpClient { @Override public void onResponse(Call call, Response response) throws IOException { - ResponseBody body = response.body(); - handle.accept(message.build(response.code(), body.string(), body.contentType().type(), response.message())); + handle.accept(message.build(response.code(), getResponseBodyStr(response), getContentType(response), response.message())); } }); - return null; } else { return call.execute(); } } + private String getResponseBodyStr(Response response) throws IOException { + String responseBodyStr = null; + if (response != null && response.body() != null) { + responseBodyStr = response.body().string(); + } + return responseBodyStr; + } + + private String getContentType(Response response) { + String contentType = null; + if (response != null && response.body() != null) { + MediaType mediaType = response.body().contentType(); + if (mediaType != null) { + contentType = mediaType.type(); + } + } + return contentType; + } + @Override public void init(Object config) { - client = new okhttp3.OkHttpClient().newBuilder() - .connectTimeout(getConfig().getConnectTimeout(), TimeUnit.MILLISECONDS) - .readTimeout(getConfig().getReaderIdleTime(), TimeUnit.SECONDS).build(); + client = new okhttp3.OkHttpClient().newBuilder().connectTimeout(getConfig().getConnectTimeout(), TimeUnit.MILLISECONDS).readTimeout(getConfig().getReaderIdleTime(), TimeUnit.SECONDS).build(); } @Override public Object close() { client.dispatcher().cancelAll(); ExecutorService executorService = client.dispatcher().executorService(); - if(!executorService.isShutdown()) { + if (!executorService.isShutdown()) { executorService.shutdown(); } diff --git a/iot-core/src/main/java/com/iteaj/iot/ProtocolHandle.java b/iot-core/src/main/java/com/iteaj/iot/ProtocolHandle.java index a72cd053255f0e6b9cd47da01c78a96054df2cd2..c55ca7a717e1fe74904e7b46ac97d536f0881fa5 100644 --- a/iot-core/src/main/java/com/iteaj/iot/ProtocolHandle.java +++ b/iot-core/src/main/java/com/iteaj/iot/ProtocolHandle.java @@ -3,16 +3,10 @@ package com.iteaj.iot; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.ClassUtil; import cn.hutool.core.util.ReflectUtil; -import cn.hutool.core.util.TypeUtil; -import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl; import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; -import java.lang.reflect.TypeVariable; -import java.util.Arrays; -import java.util.Map; -import java.util.Optional; /** *

协议对应的业务处理器

@@ -20,9 +14,9 @@ import java.util.Optional; * @author iteaj * @since 1.8 */ -public interface ProtocolHandle { +public interface ProtocolHandle< T extends Protocol > { - Method method = ReflectUtil.getMethod(ProtocolHandle.class, "handle", Protocol.class); + Method method = ReflectUtil.getMethod(ProtocolHandle.class , "handle" , Protocol.class); /** * 协议的业务处理 @@ -30,38 +24,38 @@ public interface ProtocolHandle { */ Object handle(T protocol); - default Class protocolClass() { - ParameterizedType parameterizedType = toParameterizedType(getClass(), ProtocolHandle.class); - if(parameterizedType != null) { - return (Class) parameterizedType.getActualTypeArguments()[0]; + default Class < T > protocolClass() { + ParameterizedType parameterizedType = toParameterizedType(getClass() , ProtocolHandle.class); + if (parameterizedType != null) { + return (Class < T >) parameterizedType.getActualTypeArguments()[0]; } - return (Class) ClassUtil.getTypeArgument(getClass()); + return (Class < T >) ClassUtil.getTypeArgument(getClass()); } - static ParameterizedType toParameterizedType(Type type, Class cla) { + static ParameterizedType toParameterizedType(Type type , Class cla) { ParameterizedType result = null; if (type instanceof ParameterizedType) { result = (ParameterizedType) type; } else if (type instanceof Class) { - final Class clazz = (Class) type; + final Class < ? > clazz = (Class < ? >) type; Type genericSuper = clazz.getGenericSuperclass(); if (null == genericSuper || Object.class.equals(genericSuper)) { // 如果类没有父类,而是实现一些定义好的泛型接口,则取接口的Type final Type[] genericInterfaces = clazz.getGenericInterfaces(); if (ArrayUtil.isNotEmpty(genericInterfaces)) { - for (int i = 0; i < genericInterfaces.length; i++) { + for (int i = 0 ; i < genericInterfaces.length ; i++) { genericSuper = genericInterfaces[i]; - if(genericSuper instanceof ParameterizedTypeImpl) { - Class rawType = ((ParameterizedTypeImpl) genericSuper).getRawType(); - if(cla.isAssignableFrom(rawType)) { + if (genericSuper instanceof ParameterizedType) { + Type rawType = ((ParameterizedType) genericSuper).getRawType(); + if (rawType instanceof Class && cla.isAssignableFrom((Class < ? >) rawType)) { break; } } } } } - result = toParameterizedType(genericSuper, cla); + result = toParameterizedType(genericSuper , cla); } return result; } diff --git a/iot-mqtt/src/main/java/com/iteaj/iot/client/mqtt/MqttClient.java b/iot-mqtt/src/main/java/com/iteaj/iot/client/mqtt/MqttClient.java index 32154078309b549764b21b11b76194e613dcf104..72a06180798616f30249aaa562069e3cd2337cfe 100644 --- a/iot-mqtt/src/main/java/com/iteaj/iot/client/mqtt/MqttClient.java +++ b/iot-mqtt/src/main/java/com/iteaj/iot/client/mqtt/MqttClient.java @@ -25,13 +25,13 @@ import static com.iteaj.iot.CoreConst.CLIENT_DECODER_HANDLER; /** * 基于mqtt协议的客户端 + * * @see MqttConnectProperties#getAllIdleTime() 作为keepAlive字段 单位毫秒 * 详情资料{@code https://blog.csdn.net/weixin_40973138/article/details/90036953} * @see MqttEncoder * @see MqttDecoder */ -public class MqttClient extends TcpSocketClient implements - ChannelInboundHandler, ChannelOutboundHandler, MultiStageConnect { +public class MqttClient extends TcpSocketClient implements ChannelInboundHandler, ChannelOutboundHandler, MultiStageConnect { /** * 用来标识mqtt客户端是否已经得到服务端的connAck @@ -66,6 +66,7 @@ public class MqttClient extends TcpSocketClient implements /** * 新增MqttMessage到MqttClientMessage的处理器 包括心跳处理器和解码处理器 + * * @param channel */ @Override @@ -76,6 +77,7 @@ public class MqttClient extends TcpSocketClient implements /** * 连接成功后发起 connect 请求 + * * @param future */ @Override @@ -103,8 +105,7 @@ public class MqttClient extends TcpSocketClient implements , getName(), MqttMessageType.CONNECT, remoteKey(), connectMessage); } } else { - logger.warn("mqtt({}) {} - 远程主机:{} - 状态:失败 - 报文:{}" - , getName(), MqttMessageType.CONNECT, remoteKey(), connectMessage, future1.cause()); + logger.warn("mqtt({}) {} - 远程主机:{} - 状态:失败 - 报文:{}", getName(), MqttMessageType.CONNECT, remoteKey(), connectMessage, future1.cause()); } }); } @@ -242,34 +243,53 @@ public class MqttClient extends TcpSocketClient implements @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if(msg instanceof MqttMessage) { + if (msg instanceof MqttMessage) { MqttMessage message = (MqttMessage) msg; - MqttMessageType type = message.fixedHeader().messageType(); - + MqttFixedHeader mqttFixedHeader = message.fixedHeader(); + logger.info("Mqtt Client接收到服务端消息:msg=[{}],mqttfixedHeader=[{}]",message,mqttFixedHeader); + MqttMessageType type = mqttFixedHeader.messageType(); switch (type) { // 连接确认 - case CONNACK: connAck(message, ctx); break; + case CONNACK: + connAck(message, ctx); + break; // 服务端 发布 - case PUBLISH: publish(message, ctx); break; + case PUBLISH: + publish(message, ctx); + break; // 客户端发布确认(QoS1) - case PUBACK: pubAck(message, ctx); break; + case PUBACK: + pubAck(message, ctx); + break; // 客户端发布 第一次确认(QoS2) - case PUBREC: pubRec(message, ctx); break; + case PUBREC: + pubRec(message, ctx); + break; // 服务端发布 发布第二次确认(QoS2) - case PUBREL: pubRel(message, ctx); break; + case PUBREL: + pubRel(message, ctx); + break; // 客户端发布 发布第三次确认(QoS2) - case PUBCOMP: pubComp(message, ctx); break; + case PUBCOMP: + pubComp(message, ctx); + break; // 订阅确认 - case SUBACK: subAck(message, ctx); break; + case SUBACK: + subAck(message, ctx); + break; // 取消订阅确认 - case UNSUBACK: unSubAck((MqttUnsubAckMessage) msg, ctx); break; + case UNSUBACK: + unSubAck((MqttUnsubAckMessage) msg, ctx); + break; // ping 响应 - case PINGRESP: pingResp(message, ctx); break; + case PINGRESP: + pingResp(message, ctx); + break; } } else { ctx.fireChannelRead(msg); @@ -327,13 +347,12 @@ public class MqttClient extends TcpSocketClient implements return this.stageConnect(getConfig().getConnectTimeout()); } - protected void decoderResultCall(MqttMessage message - , Consumer success, Consumer fail) { + protected void decoderResultCall(MqttMessage message, Consumer success, Consumer fail) { DecoderResult result = message.decoderResult(); - if(result.isSuccess()) { + if (result.isSuccess()) { success.accept(result); } else { - if(fail != null) { + if (fail != null) { fail.accept(result); } } @@ -341,6 +360,7 @@ public class MqttClient extends TcpSocketClient implements /** * 服务端的连接确认报文 确认之后我们将发起向服务端订阅的报文 + * * @param message * @param ctx */ @@ -349,11 +369,11 @@ public class MqttClient extends TcpSocketClient implements MqttConnAckMessage msg = (MqttConnAckMessage) message; MqttConnectReturnCode mqttConnectReturnCode = msg.variableHeader().connectReturnCode(); // 连接成功, 发送订阅报文 - if(mqttConnectReturnCode == MqttConnectReturnCode.CONNECTION_ACCEPTED) { + if (mqttConnectReturnCode == MqttConnectReturnCode.CONNECTION_ACCEPTED) { this.setSuccess(); // 标记已经成功 MqttConnectProperties client = (MqttConnectProperties) ctx.channel().attr(CoreConst.CLIENT_KEY).get(); List list = this.getClientComponent().doSubscribe(client);// 交由实现者新增订阅 - if(!CollectionUtil.isEmpty(list)) { + if (!CollectionUtil.isEmpty(list)) { int messageId = messageIdManager.nextId(); MqttMessageBuilders.SubscribeBuilder subscribe = MqttMessageBuilders.subscribe(); subscribe.messageId(messageId); @@ -362,9 +382,8 @@ public class MqttClient extends TcpSocketClient implements this.doWriteAndFlush(MqttMessageType.SUBSCRIBE, ctx.channel(), subscribe.build(), messageId); } - if(logger.isTraceEnabled()) { - logger.warn("mqtt({}) {} - 远程主机:{} - 状态:成功 - 报文:{}" - , getName(), MqttMessageType.CONNACK, remoteKey(), message); + if (logger.isTraceEnabled()) { + logger.warn("mqtt({}) {} - 远程主机:{} - 状态:成功 - 报文:{}", getName(), MqttMessageType.CONNACK, remoteKey(), message); } // 发布客户端上线事件 FrameworkManager.publishEvent(new StatusEvent(this, ClientStatus.online, getClientComponent())); @@ -377,6 +396,7 @@ public class MqttClient extends TcpSocketClient implements /** * 服务端发布报文, 收到服务端发布的报文后, 解析出报文 payload 并组装成MqttClientMessage + * * @param mqttMessage * @param ctx */ @@ -388,9 +408,8 @@ public class MqttClient extends TcpSocketClient implements final int packetId = msg.variableHeader().packetId(); ConnectProperties properties = getConfig(); - if(logger.isTraceEnabled()) { - logger.trace("mqtt({}) {}(SERVER {}) - PacketId:{} - 主机:{}" - , getName(), MqttMessageType.PUBLISH, fixedHeader.qosLevel(), packetId, properties); + if (logger.isTraceEnabled()) { + logger.trace("mqtt({}) {}(SERVER {}) - PacketId:{} - 主机:{}", getName(), MqttMessageType.PUBLISH, fixedHeader.qosLevel(), packetId, properties); } try { /** @@ -398,14 +417,12 @@ public class MqttClient extends TcpSocketClient implements * 1. 如果packetId的报文存在说明客户端已经收到了一次, 不能重复接受 * 2. 直接返回确认报文 */ - if(fixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE) { + if (fixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE) { final MessageMapper server = getMessageIdManager().getServer(packetId); - if(server != null) { + if (server != null) { final MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(packetId); - MqttMessageType type = fixedHeader.qosLevel() == MqttQoS.AT_LEAST_ONCE - ? MqttMessageType.PUBACK : MqttMessageType.PUBREC; - final MqttFixedHeader ackFixedHeader = new MqttFixedHeader(type - , false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageType type = fixedHeader.qosLevel() == MqttQoS.AT_LEAST_ONCE ? MqttMessageType.PUBACK : MqttMessageType.PUBREC; + final MqttFixedHeader ackFixedHeader = new MqttFixedHeader(type, false, MqttQoS.AT_MOST_ONCE, false, 0); getChannel().writeAndFlush(new MqttMessage(ackFixedHeader, variableHeader)); return; @@ -430,31 +447,27 @@ public class MqttClient extends TcpSocketClient implements getMessageIdManager().addServer(packetId, mapper); // 服务端发布的Qos = 1 客户端返回ACK确认 - if(fixedHeader.qosLevel() == MqttQoS.AT_LEAST_ONCE) { - final MqttFixedHeader ackFixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK - , false, MqttQoS.AT_MOST_ONCE, false, 0); + if (fixedHeader.qosLevel() == MqttQoS.AT_LEAST_ONCE) { + final MqttFixedHeader ackFixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); final MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(packetId); ctx.channel().writeAndFlush(new MqttMessage(ackFixedHeader, variableHeader)).addListener(future -> { // 如果ACK确认报文发送成功则移除对应的报文 - if(future.isSuccess()) { + if (future.isSuccess()) { getMessageIdManager().removeServer(packetId); - if(logger.isTraceEnabled()) { - logger.trace("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}" - , getName(), MqttMessageType.PUBACK, packetId, remoteKey()); + if (logger.isTraceEnabled()) { + logger.trace("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBACK, packetId, remoteKey()); } } else { - logger.warn("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}" - , getName(), MqttMessageType.PUBACK, packetId, remoteKey(), future.cause()); + logger.warn("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBACK, packetId, remoteKey(), future.cause()); } }); ctx.fireChannelRead(message); // 服务端发布的Qos = 2 客户端返回Rec确认 - } else if(fixedHeader.qosLevel() == MqttQoS.EXACTLY_ONCE) { - final MqttFixedHeader recFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC - , false, MqttQoS.AT_MOST_ONCE, false, 0); + } else if (fixedHeader.qosLevel() == MqttQoS.EXACTLY_ONCE) { + final MqttFixedHeader recFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0); final MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(packetId); /** @@ -462,19 +475,17 @@ public class MqttClient extends TcpSocketClient implements * @see #pubRel(MqttMessage, ChannelHandlerContext) */ ctx.channel().writeAndFlush(new MqttMessage(recFixedHeader, variableHeader)).addListener(future -> { - if(future.isSuccess()) { - if(logger.isTraceEnabled()) { - logger.trace("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}" - , getName(), MqttMessageType.PUBREC, packetId, remoteKey()); + if (future.isSuccess()) { + if (logger.isTraceEnabled()) { + logger.trace("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBREC, packetId, remoteKey()); } } else { - logger.warn("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}" - , getName(), MqttMessageType.PUBREC, packetId, remoteKey(), future.cause()); + logger.warn("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBREC, packetId, remoteKey(), future.cause()); } }); } } catch (Exception e) { - throw new MqttClientException("没有匹配的构造函数["+getClientComponent().getMessageClass().getSimpleName()+"(MqttMessage)]"); + throw new MqttClientException("没有匹配的构造函数[" + getClientComponent().getMessageClass().getSimpleName() + "(MqttMessage)]"); } }, null); } @@ -489,31 +500,32 @@ public class MqttClient extends TcpSocketClient implements */ protected MqttClientMessage buildPublishMessage(ChannelHandlerContext ctx, MqttPublishMessage message) throws Exception { SocketMessage proxy = getClientComponent().proxy(ctx, message.content()); - if(proxy instanceof MqttClientMessage) { + if (proxy instanceof MqttClientMessage) { return ((MqttClientMessage) proxy).setMqttMessage(message).readBuild(); } else { - throw new MqttClientException("mqtt报文类型必须是["+MqttClientMessage.class.getSimpleName()+"]"); + throw new MqttClientException("mqtt报文类型必须是[" + MqttClientMessage.class.getSimpleName() + "]"); } } /** * 服务端给客户端发送的PUBLISH确认报文(QoS1) - * @see io.netty.handler.codec.mqtt.MqttQoS#AT_LEAST_ONCE + * * @param message * @param ctx + * @see io.netty.handler.codec.mqtt.MqttQoS#AT_LEAST_ONCE */ protected void pubAck(MqttMessage message, ChannelHandlerContext ctx) { this.decoderResultCall(message, result -> { MqttPubAckMessage msg = (MqttPubAckMessage) message; final int messageId = msg.variableHeader().messageId(); final MessageMapper remove = getMessageIdManager().remove(messageId); - if(remove != null) { + if (remove != null) { getClientComponent().getPublishListener().success(this, remove); } }, result -> { MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader(); final MessageMapper remove = getMessageIdManager().remove(variableHeader.messageId()); - if(remove != null) { + if (remove != null) { getClientComponent().getPublishListener().remove(this, remove); } }); @@ -522,67 +534,62 @@ public class MqttClient extends TcpSocketClient implements /** * 服务端向客户端第一次确认(QoS2) - * @see io.netty.handler.codec.mqtt.MqttQoS#EXACTLY_ONCE + * * @param msg * @param ctx + * @see io.netty.handler.codec.mqtt.MqttQoS#EXACTLY_ONCE */ protected void pubRec(MqttMessage msg, ChannelHandlerContext ctx) { final MqttMessageIdVariableHeader idVariableHeader = (MqttMessageIdVariableHeader) msg.variableHeader(); final int packetId = idVariableHeader.messageId(); - if(logger.isTraceEnabled()) { - logger.trace("mqtt({}) {}(SERVER) - PacketId:{} - 远程主机:{}" - , getName(), MqttMessageType.PUBREC, packetId, remoteKey()); + if (logger.isTraceEnabled()) { + logger.trace("mqtt({}) {}(SERVER) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBREC, packetId, remoteKey()); } // 向服务端发送Rel报文, 告诉服务端可以删除对应的MsgId报文 - final MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL - , false, MqttQoS.AT_MOST_ONCE, false, 0); + final MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_MOST_ONCE, false, 0); ctx.channel().writeAndFlush(new MqttMessage(fixedHeader, msg.variableHeader())).addListener(future -> { - if(future.isSuccess()) { - if(logger.isTraceEnabled()) { - logger.trace("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}" - , getName(), MqttMessageType.PUBREL, packetId, remoteKey()); + if (future.isSuccess()) { + if (logger.isTraceEnabled()) { + logger.trace("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBREL, packetId, remoteKey()); } } else { - logger.warn("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}" - , getName(), MqttMessageType.PUBREL, packetId, remoteKey(), future.cause()); + logger.warn("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBREL, packetId, remoteKey(), future.cause()); } }); } /** * 服务端向客户端第二次确认(QoS2) 让客户端删除PacketId对应的Msg - * @see io.netty.handler.codec.mqtt.MqttQoS#EXACTLY_ONCE + * * @param msg * @param ctx + * @see io.netty.handler.codec.mqtt.MqttQoS#EXACTLY_ONCE */ protected void pubRel(MqttMessage msg, ChannelHandlerContext ctx) { final MqttMessageIdVariableHeader idVariableHeader = (MqttMessageIdVariableHeader) msg.variableHeader(); final int packetId = idVariableHeader.messageId(); final MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); - if(logger.isTraceEnabled()) { - logger.trace("mqtt({}) {}(SERVER) - PacketId:{} - 远程主机:{}" - , getName(), MqttMessageType.PUBREL, packetId, remoteKey()); + if (logger.isTraceEnabled()) { + logger.trace("mqtt({}) {}(SERVER) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBREL, packetId, remoteKey()); } getChannel().writeAndFlush(new MqttMessage(header, idVariableHeader)).addListener(future -> { // 发送PUBCOMP成功, 删除PacketId对应的包 - if(future.isSuccess()) { + if (future.isSuccess()) { final MessageMapper mapper = getMessageIdManager().removeServer(packetId); - if(logger.isTraceEnabled()) { - logger.trace("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}" - , getName(), MqttMessageType.PUBCOMP, packetId, remoteKey()); + if (logger.isTraceEnabled()) { + logger.trace("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBCOMP, packetId, remoteKey()); } // 将报文上报给业务层处理器处理 - if(mapper != null) { + if (mapper != null) { ctx.fireChannelRead(mapper.getMessage()); } } else { - logger.warn("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}" - , getName(), MqttMessageType.PUBCOMP, packetId, remoteKey(), future.cause()); + logger.warn("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBCOMP, packetId, remoteKey(), future.cause()); } }); } @@ -590,21 +597,21 @@ public class MqttClient extends TcpSocketClient implements /** * 服务端向客户端发布第二次确认(QoS2) * 服务端通知客户端可以删除PacketId对应的报文了 - * @see io.netty.handler.codec.mqtt.MqttQoS#EXACTLY_ONCE + * * @param msg * @param ctx + * @see io.netty.handler.codec.mqtt.MqttQoS#EXACTLY_ONCE */ protected void pubComp(MqttMessage msg, ChannelHandlerContext ctx) { - final MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader)msg.variableHeader(); + final MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) msg.variableHeader(); final int messageId = variableHeader.messageId(); - if(logger.isTraceEnabled()) { - logger.trace("mqtt({}) {}(SERVER) - PacketId:{} - 远程主机:{}" - , getName(), MqttMessageType.PUBCOMP, messageId, remoteKey()); + if (logger.isTraceEnabled()) { + logger.trace("mqtt({}) {}(SERVER) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBCOMP, messageId, remoteKey()); } // 服务端通知客户端可以删除MsgId对应的报文了 final MessageMapper remove = getMessageIdManager().remove(messageId); - if(remove != null) { + if (remove != null) { getClientComponent().getPublishListener().success(this, remove); } } @@ -617,12 +624,13 @@ public class MqttClient extends TcpSocketClient implements * SUBSCRIBE报文拥有固定报头、可变报头、有效载荷。 * 当服务器收到客户端发送的一个SUBSCRIBE报文时,必须向客户端发送一个SUBACK报文响应,同时SUBACK报文必须和等待确认的SUBSCRIBE报文有相同的报文标识符。 * 如果服务器收到一个SUBSCRIBE报文,报文的主题过滤器与一个现存订阅的主题过滤器相同,那么必须使用新的订阅彻底替换现存的订阅。新订阅的主题过滤器和之前订阅的相同,但是它的最大QoS值可以不同。与这个主题过滤器匹配的任何现存的保留消息必须被重发,但是发布流程不能中断。 + * * @param message * @param ctx */ protected void subAck(MqttMessage message, ChannelHandlerContext ctx) { this.decoderResultCall(message, result -> { - if(logger.isTraceEnabled()) { + if (logger.isTraceEnabled()) { MqttSubAckMessage msg = (MqttSubAckMessage) message; int i = msg.variableHeader().messageId(); @@ -630,13 +638,13 @@ public class MqttClient extends TcpSocketClient implements } }, result -> { final MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader(); - logger.warn("mqtt({}) {}(未完成) - PacketId: {} - 远程主机:{}" - , getName(), MqttMessageType.SUBACK, variableHeader.messageId(), getConfig(), result.cause()); + logger.warn("mqtt({}) {}(未完成) - PacketId: {} - 远程主机:{}", getName(), MqttMessageType.SUBACK, variableHeader.messageId(), getConfig(), result.cause()); }); } /** - * 取消订阅确认报文 + * 取消订阅确认报文 + * * @param message * @param ctx */ @@ -645,50 +653,46 @@ public class MqttClient extends TcpSocketClient implements MqttUnsubAckMessage msg = (MqttUnsubAckMessage) message; final int i = msg.variableHeader().messageId(); - if(logger.isTraceEnabled()) { - logger.trace("mqtt({}) {}(成功) - PacketId: {} - 远程主机:{} - msg:{}" - , getName(), MqttMessageType.UNSUBACK, i, getConfig(), message); + if (logger.isTraceEnabled()) { + logger.trace("mqtt({}) {}(成功) - PacketId: {} - 远程主机:{} - msg:{}", getName(), MqttMessageType.UNSUBACK, i, getConfig(), message); } }, result -> { final MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader(); int messageId = variableHeader.messageId(); - logger.error("mqtt({}) {}(未完成) - PacketId: {} - 远程主机:{} - msg:{}" - , getName(), MqttMessageType.UNSUBACK, messageId, remoteKey(), message, result.cause()); + logger.error("mqtt({}) {}(未完成) - PacketId: {} - 远程主机:{} - msg:{}", getName(), MqttMessageType.UNSUBACK, messageId, remoteKey(), message, result.cause()); }); } /** * 客户端ping响应报文 + * * @param msg * @param ctx */ protected void pingResp(MqttMessage msg, ChannelHandlerContext ctx) { - if(logger.isTraceEnabled()) { + if (logger.isTraceEnabled()) { logger.trace("mqtt({}) {} - 远程主机: {}", getName(), MqttMessageType.PINGRESP, remoteKey()); } } protected void doWriteAndFlush(MqttMessageType messageType, Channel channel, MqttMessage message, Integer messageId) { channel.writeAndFlush(message).addListener(future -> { - if(future.isSuccess()) { + if (future.isSuccess()) { - if(messageId != null) { - if(logger.isTraceEnabled()) { - logger.trace("mqtt({}) {} - PacketId: {} - 远程主机:{}" - , getName(), messageType, messageId, remoteKey()); + if (messageId != null) { + if (logger.isTraceEnabled()) { + logger.trace("mqtt({}) {} - PacketId: {} - 远程主机:{}", getName(), messageType, messageId, remoteKey()); } } else { - if(logger.isTraceEnabled()) { - logger.trace("mqtt({}) {} - 远程主机:{}" - , getName(), messageType, remoteKey()); + if (logger.isTraceEnabled()) { + logger.trace("mqtt({}) {} - 远程主机:{}", getName(), messageType, remoteKey()); } } - } else if(future.cause() != null) { - logger.error("mqtt({}) {} - 远程主机:{} - 报文:{}" - , getName(), messageType, remoteKey(), message, future.cause()); + } else if (future.cause() != null) { + logger.error("mqtt({}) {} - 远程主机:{} - 报文:{}", getName(), messageType, remoteKey(), message, future.cause()); } }); } @@ -725,18 +729,17 @@ public class MqttClient extends TcpSocketClient implements @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if(msg instanceof ClientSocketProtocol) { + if (msg instanceof ClientSocketProtocol) { ClientSocketProtocol clientProtocol = (ClientSocketProtocol) msg; ClientMessage clientMessage = clientProtocol.requestMessage(); - if(msg instanceof ProtocolPreservable) { - if(((ProtocolPreservable) msg).isRelation()) { - getClientComponent().protocolFactory().add((String) ((ProtocolPreservable) msg) - .relationKey(), (Protocol) msg, ((ProtocolPreservable) msg).getTimeout()); + if (msg instanceof ProtocolPreservable) { + if (((ProtocolPreservable) msg).isRelation()) { + getClientComponent().protocolFactory().add((String) ((ProtocolPreservable) msg).relationKey(), (Protocol) msg, ((ProtocolPreservable) msg).getTimeout()); } } // 如果是服务端主动的请求, 使用响应报文 - if(msg instanceof ServerInitiativeProtocol) { + if (msg instanceof ServerInitiativeProtocol) { clientMessage = clientProtocol.responseMessage(); } @@ -745,14 +748,12 @@ public class MqttClient extends TcpSocketClient implements * 只适用于发布类型 * @see MqttMessageType#PUBLISH */ - if(clientMessage instanceof MqttClientMessage) { + if (clientMessage instanceof MqttClientMessage) { final MqttClientMessage mqttClientMessage = (MqttClientMessage) clientMessage; MqttPublishMessage mqttMessage = buildPublishMqttMessage(mqttClientMessage); - if(logger.isTraceEnabled()) { + if (logger.isTraceEnabled()) { final int packetId = mqttMessage.variableHeader().packetId(); - logger.trace("mqtt({}) PUBLISH(CLIENT {}) - PacketId:{} - 报文:{}" - , getName(), mqttClientMessage.getQos(), packetId - , debugMessage(clientMessage, mqttMessage)); + logger.trace("mqtt({}) PUBLISH(CLIENT {}) - PacketId:{} - 报文:{}", getName(), mqttClientMessage.getQos(), packetId, debugMessage(clientMessage, mqttMessage)); } ctx.write(mqttMessage, promise).addListener(future -> { @@ -768,7 +769,7 @@ public class MqttClient extends TcpSocketClient implements * @see MqttClientComponent#pubRel(MqttMessage, ChannelHandlerContext, List) * @see MqttClientComponent#pubComp(MqttMessage, ChannelHandlerContext, List) */ - if(header.qosLevel() != MqttQoS.AT_MOST_ONCE) { + if (header.qosLevel() != MqttQoS.AT_MOST_ONCE) { final int packetId = mqttMessage.variableHeader().packetId(); final MessageMapper mapper = new MessageMapper(getConfig(), mqttClientMessage, packetId); getMessageIdManager().add(packetId, mapper); diff --git a/iot-test/pom.xml b/iot-test/pom.xml index e5071342efd8815d6858b72a24e03a0f00a5c067..8fe846e36de511e6dcfbfb29adbcb71d3758dfe6 100644 --- a/iot-test/pom.xml +++ b/iot-test/pom.xml @@ -3,89 +3,104 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - 2.7.2 org.springframework.boot spring-boot-starter-parent + 3.3.3 + 4.0.0 - 3.1.1 com.iteaj iot-test + 3.1.1 iot框架并发测试模块 true + 3.1.1 + 17 + 17 + 17 + UTF-8 + UTF-8 com.iteaj iot-core - ${project.version} + ${iot.version} com.iteaj iot-client - ${project.version} + ${iot.version} com.iteaj iot-server - ${project.version} + ${iot.version} com.iteaj iot-redis - ${project.version} + ${iot.version} com.iteaj iot-mqtt - ${project.version} + ${iot.version} com.iteaj iot-plc - ${project.version} + ${iot.version} com.iteaj iot-modbus - ${project.version} + ${iot.version} com.iteaj iot-serial - ${project.version} + ${iot.version} com.iteaj iot-simulator - ${project.version} + ${iot.version} com.iteaj - ${project.version} + ${iot.version} iot-boot-starter + + + + + + + - 5.7.18 - cn.hutool - hutool-json + com.google.code.gson + gson + 2.11.0 + org.springframework.boot spring-boot-starter @@ -100,6 +115,7 @@ mysql mysql-connector-java + 8.0.33 diff --git a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttClientTestHandle.java b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttClientTestHandle.java index 3593af517dd71b9bb3cc2fd4042f5a0a254b253d..e6b4e774aff9570b7e8ac19a70ba9b4caec4799e 100644 --- a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttClientTestHandle.java +++ b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttClientTestHandle.java @@ -1,17 +1,16 @@ package com.iteaj.iot.test.mqtt; -import cn.hutool.json.JSONObject; -import cn.hutool.json.JSONUtil; import com.iteaj.iot.IotThreadManager; import com.iteaj.iot.client.ClientProtocolHandle; import com.iteaj.iot.client.mqtt.MqttClient; import com.iteaj.iot.client.mqtt.MqttDecoderInterceptor; -import com.iteaj.iot.client.mqtt.impl.*; -import com.iteaj.iot.codec.filter.DecoderInterceptor; +import com.iteaj.iot.client.mqtt.impl.DefaultMqttComponent; +import com.iteaj.iot.client.mqtt.impl.DefaultMqttConnectProperties; +import com.iteaj.iot.client.mqtt.impl.DefaultMqttPublishProtocol; import com.iteaj.iot.test.IotTestHandle; import com.iteaj.iot.test.IotTestProperties; import com.iteaj.iot.test.TestConst; -import io.netty.handler.codec.mqtt.MqttMessage; +import com.iteaj.iot.test.util.GsonUtil; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import io.netty.handler.timeout.IdleState; @@ -22,6 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Map; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -144,7 +144,7 @@ public class MqttClientTestHandle implements ClientProtocolHandle { byte[] message = protocol.requestMessage().getMessage(); - JSONObject jsonObject = JSONUtil.parseObj(new String(message)); + Map jsonObject = GsonUtil.toMap(new String(message)); if(jsonObject.containsKey("retain")) { logger.info(TestConst.LOGGER_MQTT_PROTOCOL_DESC, defaultMqttComponent.getName() , "WillTopic", protocol.getTopic(), protocol.getEquipCode(), "-", "通过" ); diff --git a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttClientTestMessage.java b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttClientTestMessage.java index 29925294411ec086acc89bdf7ba53c7e29605a22..92bd3defbdbc4b98ebede8b29b915809e7f748c0 100644 --- a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttClientTestMessage.java +++ b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttClientTestMessage.java @@ -1,57 +1,58 @@ package com.iteaj.iot.test.mqtt; -import cn.hutool.json.JSONObject; -import cn.hutool.json.JSONUtil; +import com.alibaba.fastjson.JSONObject; import com.iteaj.iot.client.mqtt.message.MqttClientMessage; import com.iteaj.iot.client.mqtt.message.MqttMessageHead; import com.iteaj.iot.test.TestProtocolType; import com.iteaj.iot.test.TestStatus; import com.iteaj.iot.test.TestStatusHeader; +import com.iteaj.iot.test.util.GsonUtil; import com.iteaj.iot.utils.ByteUtil; -import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; +import java.util.Map; + public class MqttClientTestMessage extends MqttClientMessage { public MqttClientTestMessage(byte[] message) { super(message); } - public MqttClientTestMessage(MqttMessageHead head, String topic) { - super(head, topic); + public MqttClientTestMessage(MqttMessageHead head , String topic) { + super(head , topic); } - public MqttClientTestMessage(MqttMessageHead head, MqttQoS qos, String topic) { - super(head, qos, topic); + public MqttClientTestMessage(MqttMessageHead head , MqttQoS qos , String topic) { + super(head , qos , topic); } - public MqttClientTestMessage(MqttMessageHead head, MessageBody body, String topic) { - super(head, body, topic); + public MqttClientTestMessage(MqttMessageHead head , MessageBody body , String topic) { + super(head , body , topic); } @Override protected MqttMessageHead doBuild(byte[] payload) { - if(this.getTopic().contains("willTopic")) { + if (this.getTopic().contains("willTopic")) { String clientId = this.getTopic().split("/")[1]; - return new MqttMessageHead(clientId, clientId, TestProtocolType.WillTop); - } else if(this.getTopic().endsWith("Once")) { - JSONObject jsonObject = JSONUtil.parseObj(ByteUtil.bytesToString(payload)); - return new MqttMessageHead(jsonObject.getStr("equipCode") - , jsonObject.getStr("messageId"), TestProtocolType.PIReq); - } else if(this.getTopic().equals(MqttClientTestHandle.TOPIC_RESPONSE)) { - JSONObject jsonObject = JSONUtil.parseObj(ByteUtil.bytesToString(payload)); - return new MqttMessageHead(jsonObject.getStr("equipCode") - , jsonObject.getStr("messageId"), TestProtocolType.CIReq); + return new MqttMessageHead(clientId , clientId , TestProtocolType.WillTop); + } else if (this.getTopic().endsWith("Once")) { + Map jsonObject = GsonUtil.toMap(ByteUtil.bytesToString(payload)); + return new MqttMessageHead(jsonObject.get("equipCode") , jsonObject.get("messageId") , TestProtocolType.PIReq); + } else if (this.getTopic().equals(MqttClientTestHandle.TOPIC_RESPONSE)) { + Map jsonObject = GsonUtil.toMap(ByteUtil.bytesToString(payload)); + return new MqttMessageHead(jsonObject.get("equipCode") , jsonObject.get("messageId") , TestProtocolType.CIReq); } else { - JSONObject jsonObject = JSONUtil.parseObj(ByteUtil.bytesToString(payload)); - TestProtocolType type = jsonObject.get("type", TestProtocolType.class); - if(type == TestProtocolType.PIReq) { - return new TestStatusHeader(jsonObject.getStr("equipCode"), - jsonObject.getStr("messageId"), type, jsonObject.get("status", TestStatus.class)); - } else { - return new MqttMessageHead(jsonObject.getStr("equipCode"), jsonObject.getStr("messageId"), type); + Map jsonObject = GsonUtil.toMap(ByteUtil.bytesToString(payload)); + + //TODO 需要各自实现 +// TestProtocolType type = jsonObject.get("type" , TestProtocolType.class); +// if (type == TestProtocolType.PIReq) { +// return new TestStatusHeader(jsonObject.get("equipCode") , jsonObject.get("messageId") , type , jsonObject.get("status" , TestStatus.class)); +// } else { +// return new MqttMessageHead(jsonObject.get("equipCode") , jsonObject.get("messageId") , typetype); + return new MqttMessageHead(jsonObject.get("equipCode") , jsonObject.get("messageId") , null); } + } - } -} + } diff --git a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttPublishTestProtocol.java b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttPublishTestProtocol.java index 3faf10e9cfc5e8e7e04ad045dad0848af6a1456c..119bf58a2a2587b02097505431378e046deb1ea4 100644 --- a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttPublishTestProtocol.java +++ b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttPublishTestProtocol.java @@ -1,11 +1,10 @@ package com.iteaj.iot.test.mqtt; -import cn.hutool.json.JSONUtil; import com.iteaj.iot.client.mqtt.message.MqttMessageHead; import com.iteaj.iot.client.protocol.ClientInitiativeProtocol; import com.iteaj.iot.consts.ExecStatus; -import com.iteaj.iot.message.DefaultMessageHead; import com.iteaj.iot.test.*; +import com.iteaj.iot.test.util.GsonUtil; import io.netty.handler.codec.mqtt.MqttQoS; import java.nio.charset.StandardCharsets; @@ -16,17 +15,17 @@ import java.nio.charset.StandardCharsets; * @author iteaj * @since 1.0 */ -public class MqttPublishTestProtocol extends ClientInitiativeProtocol { +public class MqttPublishTestProtocol extends ClientInitiativeProtocol < MqttClientTestMessage > { private MqttQoS qoS; private String topic; private String deviceSn; public MqttPublishTestProtocol(String topic) { - this(MqttQoS.AT_MOST_ONCE, topic, "MqttTestSn"); + this(MqttQoS.AT_MOST_ONCE , topic , "MqttTestSn"); } - public MqttPublishTestProtocol(MqttQoS qoS, String topic, String deviceSn) { + public MqttPublishTestProtocol(MqttQoS qoS , String topic , String deviceSn) { this.qoS = qoS; this.topic = topic; this.deviceSn = deviceSn; @@ -34,21 +33,19 @@ public class MqttPublishTestProtocol extends ClientInitiativeProtocol jsonObject = GsonUtil.toMap(new String(mqttMessage.getMessage())); if(jsonObject.containsKey("equipCode")) { logger.info(TestConst.LOGGER_MQTT_PROTOCOL_DESC, component.getName() , "subscribe", protocol.getTopic(), protocol.getEquipCode(), "-", "通过" ); - String equipCode = jsonObject.getStr("equipCode"); - String messageId = jsonObject.getStr("messageId"); + String equipCode = jsonObject.get("equipCode"); + String messageId = jsonObject.get("messageId"); MqttMessageHead messageHead = new MqttMessageHead(equipCode, messageId, TestProtocolType.CIReq); - new DefaultMqttPublishProtocol(JSONUtil.toJsonStr(messageHead).getBytes(StandardCharsets.UTF_8) + new DefaultMqttPublishProtocol(GsonUtil.toJsonStr(messageHead).getBytes(StandardCharsets.UTF_8) , MqttClientTestHandle.TOPIC_RESPONSE+"/"+equipCode).request(); } } diff --git a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttSubscribeTestProtocol.java b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttSubscribeTestProtocol.java index 381719daf8f8b551d114d53edb2c6ec4ffe40187..3222a7815ff53ca6d639518645e2de0541db10ac 100644 --- a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttSubscribeTestProtocol.java +++ b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttSubscribeTestProtocol.java @@ -1,6 +1,5 @@ package com.iteaj.iot.test.mqtt; -import cn.hutool.json.JSONUtil; import com.iteaj.iot.Message; import com.iteaj.iot.ProtocolType; import com.iteaj.iot.client.mqtt.MqttClientException; diff --git a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttWillTopicTestListener.java b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttWillTopicTestListener.java index df38f704142b8e59d1d3419e1fed65e51895a608..d98f3f1f5fb5327c097189dbacee6bbabae21fb2 100644 --- a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttWillTopicTestListener.java +++ b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttWillTopicTestListener.java @@ -1,7 +1,5 @@ package com.iteaj.iot.test.mqtt; -import cn.hutool.json.JSONObject; -import cn.hutool.json.JSONUtil; import com.iteaj.iot.FrameworkManager; import com.iteaj.iot.client.ClientComponent; import com.iteaj.iot.client.mqtt.impl.DefaultMqttConnectProperties; @@ -9,6 +7,7 @@ import com.iteaj.iot.client.mqtt.impl.DefaultMqttMessage; import com.iteaj.iot.client.mqtt.impl.DefaultMqttSubscribeProtocol; import com.iteaj.iot.client.mqtt.impl.MqttSubscribeListener; import com.iteaj.iot.test.TestConst; +import com.iteaj.iot.test.util.GsonUtil; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import org.slf4j.Logger; @@ -16,6 +15,8 @@ import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; +import java.util.Map; + @Component @ConditionalOnExpression("${iot.test.client:false} and ${iot.test.mqtt.start:false}") public class MqttWillTopicTestListener implements MqttSubscribeListener { @@ -31,7 +32,7 @@ public class MqttWillTopicTestListener implements MqttSubscribeListener { public void onSubscribe(DefaultMqttSubscribeProtocol protocol) { byte[] message = protocol.requestMessage().getMessage(); ClientComponent component = FrameworkManager.getClientComponent(DefaultMqttMessage.class); - JSONObject jsonObject = JSONUtil.parseObj(new String(message)); + Map jsonObject = GsonUtil.toMap(new String(message)); if(jsonObject.containsKey("retain")) { logger.info(TestConst.LOGGER_MQTT_PROTOCOL_DESC, component.getName() , "WillTopic", protocol.getTopic(), protocol.getEquipCode(), "-", "通过" ); diff --git a/iot-test/src/main/java/com/iteaj/iot/test/util/GsonUtil.java b/iot-test/src/main/java/com/iteaj/iot/test/util/GsonUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..780bb8145af079095caf3924b6acd7a0b6a79f1a --- /dev/null +++ b/iot-test/src/main/java/com/iteaj/iot/test/util/GsonUtil.java @@ -0,0 +1,36 @@ +package com.iteaj.iot.test.util; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; + +import java.lang.reflect.Type; +import java.util.Map; + +/** + * package-name: com.iteaj.iot.test.util + * project-name: iot + * function : gson封装 + * @author daihui + * @since 2024/11/8 星期五 22:41 + */ +public class GsonUtil { + + private GsonUtil() { + } + + private static final Gson gson = new Gson(); + + public static < T > T fromJSON(String jsonString , Class < T > clazz) { + return gson.fromJson(jsonString , clazz); + } + + public static String toJsonStr(Object object) { + return gson.toJson(object); + } + + public static Map < String, String > toMap(String jsonString) { + Type mapType = new TypeToken < Map < String, String > >() {}.getType(); + return gson.fromJson(jsonString , mapType); + } + +} diff --git a/iot-test/src/main/resources/META-INF/spring.factories b/iot-test/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports similarity index 38% rename from iot-test/src/main/resources/META-INF/spring.factories rename to iot-test/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index de1c7258f7222811683fe56dba14c1f19743ab83..71dd8c6f6c73e2c6fd5f3053535afe7160fa4746 100644 --- a/iot-test/src/main/resources/META-INF/spring.factories +++ b/iot-test/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,6 +1,2 @@ -org.springframework.context.ApplicationListener=\ com.iteaj.iot.test.IotTestStartListener - -# Auto Configure -org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.iteaj.iot.test.IotTestAutoConfiguration diff --git a/pom.xml b/pom.xml index 8373d64300eb811c92ffa01e704f80b3cf991f57..e753e038fd525a09e24f1435ef51fdd3a4a1854a 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 2.7.2 + 3.3.3 org.springframework.boot spring-boot-starter-parent @@ -54,14 +54,14 @@ 3.1.1 - 1.8 3.0.3 - 5.7.18 + 5.8.33 1.2.78 - 2.7.2 + 3.3.3 2.9.2 - 1.8 - 1.8 + 17 + 17 + 17 UTF-8 UTF-8