diff --git a/README.md b/README.md
index 28785066cca0ff87bdf2f04fa243889d58dfe1c9..5a792292a370002cfc22c0789505b58285826d12 100644
--- a/README.md
+++ b/README.md
@@ -1,13 +1,16 @@
### 物联网网络中间件
iot-ucy是使用java语言且基于netty, spring boot, redis等开源项目开发来的物联网网络中间件, 支持udp, tcp, 串口(com)通讯(window、linux、mac)等底层协议和http, mqtt, websocket(默认实现和自定义协议头实现), modbus(tcp,rtu),plc,dtu(支持心跳,设备注册功能以及AT协议和自定义协议支持),dtu for modbus tcp,dtu for modbus rtu组件适配 等上层协议. 主打工业物联网底层网络交互、设备管理、数据存储、大数据处理. (其中plc包括西门子S7系列,欧姆龙Fins). 数据存储将使用taos数据库以及redis消息队列
#### 加入社区
-qq群:616124620
+QQ3群 - 272518000
+QQ2群 - 616124620 (已满)
+QQ1群 - 552167793 (已满)
+#### 商务合作(qq号:97235681)
#### 如果您觉得的还可以点个star让更多开发者了解此项目
- [框架使用教程](http://doc.iteaj.com/)
- [演示地址(支持DTU、PLC、MODBUS、串口等调试)](https://iot.iteaj.com/#/login)
- [后端项目仓库(springboot)](https://gitee.com/iteaj/iboot)
- [前端项目仓库(vue3+antdv3+vite2)](https://gitee.com/iteaj/ivzone)
-#### 商务合作(qq号:97235681)
+
#### 已实现功能
1. 西门子和欧姆龙PLC原生协议适配
2. Modbus Tcp协议客户端实现
@@ -21,6 +24,14 @@ qq群:616124620
10. 支持在任意的java环境使用不强制依赖spring框架(V3.0.0+)
11. 支持动态启用和停用组件服务(V3.0.0+)
12. 支持串口+Modbus Rtu(V3.0.0+)
+#### [基于此中间件实现的物联网网关](https://iot.iteaj.com/#/login)([源码](https://gitee.com/iteaj/iboot))
+
+
+
+
+
+
+
#### 接口主要特性
- 支持服务端启动监听多个端口, 统一所有协议可使用的api接口
- 包含一套代理客户端通信协议,支持调用:客户端 -> 服务端 -> 设备 -> 服务端 -> 客户端
diff --git a/iot-boot-starter/src/main/resources/META-INF/spring.factories b/iot-boot-starter/src/main/resources/META-INF/spring.factories
deleted file mode 100644
index 26dd900750e3f355faa9391f6f5969102ea81b09..0000000000000000000000000000000000000000
--- a/iot-boot-starter/src/main/resources/META-INF/spring.factories
+++ /dev/null
@@ -1,13 +0,0 @@
-# application listener
-
-# Auto Configure
-org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
-com.iteaj.iot.boot.IotApplication, \
-com.iteaj.iot.boot.autoconfigure.IotClientConfiguration, \
-com.iteaj.iot.boot.autoconfigure.IotServerConfiguration, \
-com.iteaj.iot.boot.core.IotCoreConfiguration,\
-com.iteaj.iot.boot.autoconfigure.IotMqttAutoConfiguration,\
-com.iteaj.iot.boot.autoconfigure.IotTaosAutoConfiguration,\
-com.iteaj.iot.boot.autoconfigure.IotRedisAutoConfiguration,\
-com.iteaj.iot.boot.autoconfigure.IotRdbmsAutoConfiguration
-
diff --git a/iot-boot-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/iot-boot-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 0000000000000000000000000000000000000000..25b1e37e00e1e2224972371db06697cd15c15843
--- /dev/null
+++ b/iot-boot-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1,8 @@
+com.iteaj.iot.boot.IotApplication
+com.iteaj.iot.boot.autoconfigure.IotClientConfiguration
+com.iteaj.iot.boot.autoconfigure.IotServerConfiguration
+com.iteaj.iot.boot.core.IotCoreConfiguration
+com.iteaj.iot.boot.autoconfigure.IotMqttAutoConfiguration
+com.iteaj.iot.boot.autoconfigure.IotTaosAutoConfiguration
+com.iteaj.iot.boot.autoconfigure.IotRedisAutoConfiguration
+com.iteaj.iot.boot.autoconfigure.IotRdbmsAutoConfiguration
\ No newline at end of file
diff --git a/iot-client/src/main/java/com/iteaj/iot/client/contants/HttpConstants.java b/iot-client/src/main/java/com/iteaj/iot/client/contants/HttpConstants.java
new file mode 100644
index 0000000000000000000000000000000000000000..d8eeded5d4ac7511b165937b749a9812785880c6
--- /dev/null
+++ b/iot-client/src/main/java/com/iteaj/iot/client/contants/HttpConstants.java
@@ -0,0 +1,16 @@
+package com.iteaj.iot.client.contants;
+
+/**
+ * packageName com.iteaj.iot.client.contants
+ *
+ * @author whcodec
+ * @date 2024/12/7
+ */
+public class HttpConstants {
+
+ private HttpConstants() {
+ }
+
+ public static final String HTTP_URL_NULL_ERROR = "请求url必填";
+
+}
diff --git a/iot-client/src/main/java/com/iteaj/iot/client/http/okhttp/OkHttpClient.java b/iot-client/src/main/java/com/iteaj/iot/client/http/okhttp/OkHttpClient.java
index 53e59511dc75cd5fc8e7462cf34a20de4ba8ca7e..58f8a5edb338ddaf14d82108f1ae58153cd88f62 100644
--- a/iot-client/src/main/java/com/iteaj/iot/client/http/okhttp/OkHttpClient.java
+++ b/iot-client/src/main/java/com/iteaj/iot/client/http/okhttp/OkHttpClient.java
@@ -1,18 +1,21 @@
package com.iteaj.iot.client.http.okhttp;
-import cn.hutool.core.collection.CollectionUtil;
-import cn.hutool.core.util.StrUtil;
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.text.CharSequenceUtil;
+import com.iteaj.iot.client.contants.HttpConstants;
import com.iteaj.iot.client.http.*;
import okhttp3.*;
import java.io.IOException;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* http客户端
+ *
* @see okhttp3.OkHttpClient
*/
public class OkHttpClient extends HttpClient {
@@ -23,51 +26,45 @@ public class OkHttpClient extends HttpClient {
super(properties, component);
}
-
@Override
public void get(HttpClientMessage requestMessage) {
String url = requestMessage.getUrl();
- if(StrUtil.isBlank(url)) {
- throw new HttpProtocolException("请求url必填");
+ if (CharSequenceUtil.isBlank(url)) {
+ throw new HttpProtocolException(HttpConstants.HTTP_URL_NULL_ERROR);
}
Response response = null;
try {
// 构建请求的url
HttpUrl httpUrl = buildRequestUrl(url, requestMessage.getQueryParam());
-
Request.Builder builder = new Request.Builder().get().url(httpUrl);
+ // 构建请求头
handleRequestHeader(requestMessage, builder);
-
response = httpInvoke(null, client.newCall(builder.build()), requestMessage);
- ResponseBody responseBody = response.body();
-
- requestMessage.build(response.code(), responseBody.string(), responseBody.contentType().type(), response.message());
+ requestMessage.build(response.code(), getResponseBodyStr(response), getContentType(response), response.message());
} catch (IOException e) {
throw new HttpProtocolException(e.getMessage(), e);
} finally {
- if(response != null) {
- response.body().close();
+ if (response != null) {
+ Objects.requireNonNull(response.body()).close();
}
}
}
private void handleRequestHeader(HttpClientMessage requestMessage, Request.Builder builder) {
Map heads = requestMessage.getHeads();
- if(CollectionUtil.isNotEmpty(heads)) {
- heads.forEach((key, val) -> {
- builder.addHeader(key, val);
- });
+ if (CollUtil.isNotEmpty(heads)) {
+ heads.forEach(builder::addHeader);
}
}
@Override
public void get(HttpClientMessage requestMessage, Consumer handle) {
String url = requestMessage.getUrl();
- if(StrUtil.isBlank(url)) {
- throw new HttpProtocolException("请求url必填");
+ if (CharSequenceUtil.isBlank(url)) {
+ throw new HttpProtocolException(HttpConstants.HTTP_URL_NULL_ERROR);
}
- if(null == handle) {
+ if (null == handle) {
throw new HttpProtocolException("未指定异步协议处理器: [ProtocolHandle]");
}
@@ -88,10 +85,8 @@ public class OkHttpClient extends HttpClient {
private HttpUrl buildRequestUrl(String url, Map queryParam) {
HttpUrl.Builder builder = HttpUrl.get(url).newBuilder();
- if(queryParam != null) {
- queryParam.forEach((key, value) -> {
- builder.addQueryParameter(key, (String) value);
- });
+ if (queryParam != null) {
+ queryParam.forEach((key, value) -> builder.addQueryParameter(key, (String) value));
}
return builder.build();
}
@@ -99,25 +94,23 @@ public class OkHttpClient extends HttpClient {
@Override
public void post(HttpClientMessage requestMessage) {
String url = requestMessage.getUrl();
- if(StrUtil.isBlank(url)) {
- throw new HttpProtocolException("请求url必填");
+ if (CharSequenceUtil.isBlank(url)) {
+ throw new HttpProtocolException(HttpConstants.HTTP_URL_NULL_ERROR);
}
-
Response response = null;
try {
// 构建请求body
HttpUrl httpUrl = buildRequestUrl(url, requestMessage.getQueryParam());
Request.Builder builder = new Request.Builder().post(requestMessage.getRequestBody()).url(httpUrl);
+ //构建请求头
handleRequestHeader(requestMessage, builder);
-
response = httpInvoke(null, client.newCall(builder.build()), requestMessage);
- ResponseBody responseBody = response.body();
- requestMessage.build(response.code(), responseBody.string(), responseBody.contentType().type(), response.message());
+ requestMessage.build(response.code(), getResponseBodyStr(response), getContentType(response), response.message());
} catch (IOException e) {
throw new HttpProtocolException(e.getMessage(), e);
} finally {
- if(response != null) {
- response.body().close();
+ if (response != null) {
+ Objects.requireNonNull(response.body()).close();
}
}
}
@@ -125,11 +118,11 @@ public class OkHttpClient extends HttpClient {
@Override
public void post(HttpClientMessage requestMessage, Consumer handle) {
String url = requestMessage.getUrl();
- if(StrUtil.isBlank(url)) {
- throw new HttpProtocolException("请求url必填");
+ if (CharSequenceUtil.isBlank(url)) {
+ throw new HttpProtocolException(HttpConstants.HTTP_URL_NULL_ERROR);
}
- if(null == handle) {
+ if (null == handle) {
throw new HttpProtocolException("未指定异步协议处理器: [ProtocolHandle]");
}
@@ -149,7 +142,7 @@ public class OkHttpClient extends HttpClient {
}
protected Response httpInvoke(Consumer handle, Call call, HttpClientMessage message) throws IOException {
- if(handle != null) {
+ if (handle != null) {
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
@@ -158,29 +151,44 @@ public class OkHttpClient extends HttpClient {
@Override
public void onResponse(Call call, Response response) throws IOException {
- ResponseBody body = response.body();
- handle.accept(message.build(response.code(), body.string(), body.contentType().type(), response.message()));
+ handle.accept(message.build(response.code(), getResponseBodyStr(response), getContentType(response), response.message()));
}
});
-
return null;
} else {
return call.execute();
}
}
+ private String getResponseBodyStr(Response response) throws IOException {
+ String responseBodyStr = null;
+ if (response != null && response.body() != null) {
+ responseBodyStr = response.body().string();
+ }
+ return responseBodyStr;
+ }
+
+ private String getContentType(Response response) {
+ String contentType = null;
+ if (response != null && response.body() != null) {
+ MediaType mediaType = response.body().contentType();
+ if (mediaType != null) {
+ contentType = mediaType.type();
+ }
+ }
+ return contentType;
+ }
+
@Override
public void init(Object config) {
- client = new okhttp3.OkHttpClient().newBuilder()
- .connectTimeout(getConfig().getConnectTimeout(), TimeUnit.MILLISECONDS)
- .readTimeout(getConfig().getReaderIdleTime(), TimeUnit.SECONDS).build();
+ client = new okhttp3.OkHttpClient().newBuilder().connectTimeout(getConfig().getConnectTimeout(), TimeUnit.MILLISECONDS).readTimeout(getConfig().getReaderIdleTime(), TimeUnit.SECONDS).build();
}
@Override
public Object close() {
client.dispatcher().cancelAll();
ExecutorService executorService = client.dispatcher().executorService();
- if(!executorService.isShutdown()) {
+ if (!executorService.isShutdown()) {
executorService.shutdown();
}
diff --git a/iot-core/src/main/java/com/iteaj/iot/ProtocolHandle.java b/iot-core/src/main/java/com/iteaj/iot/ProtocolHandle.java
index a72cd053255f0e6b9cd47da01c78a96054df2cd2..c55ca7a717e1fe74904e7b46ac97d536f0881fa5 100644
--- a/iot-core/src/main/java/com/iteaj/iot/ProtocolHandle.java
+++ b/iot-core/src/main/java/com/iteaj/iot/ProtocolHandle.java
@@ -3,16 +3,10 @@ package com.iteaj.iot;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.core.util.ReflectUtil;
-import cn.hutool.core.util.TypeUtil;
-import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
-import java.lang.reflect.TypeVariable;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Optional;
/**
* 协议对应的业务处理器
@@ -20,9 +14,9 @@ import java.util.Optional;
* @author iteaj
* @since 1.8
*/
-public interface ProtocolHandle {
+public interface ProtocolHandle< T extends Protocol > {
- Method method = ReflectUtil.getMethod(ProtocolHandle.class, "handle", Protocol.class);
+ Method method = ReflectUtil.getMethod(ProtocolHandle.class , "handle" , Protocol.class);
/**
* 协议的业务处理
@@ -30,38 +24,38 @@ public interface ProtocolHandle {
*/
Object handle(T protocol);
- default Class protocolClass() {
- ParameterizedType parameterizedType = toParameterizedType(getClass(), ProtocolHandle.class);
- if(parameterizedType != null) {
- return (Class) parameterizedType.getActualTypeArguments()[0];
+ default Class < T > protocolClass() {
+ ParameterizedType parameterizedType = toParameterizedType(getClass() , ProtocolHandle.class);
+ if (parameterizedType != null) {
+ return (Class < T >) parameterizedType.getActualTypeArguments()[0];
}
- return (Class) ClassUtil.getTypeArgument(getClass());
+ return (Class < T >) ClassUtil.getTypeArgument(getClass());
}
- static ParameterizedType toParameterizedType(Type type, Class cla) {
+ static ParameterizedType toParameterizedType(Type type , Class cla) {
ParameterizedType result = null;
if (type instanceof ParameterizedType) {
result = (ParameterizedType) type;
} else if (type instanceof Class) {
- final Class> clazz = (Class>) type;
+ final Class < ? > clazz = (Class < ? >) type;
Type genericSuper = clazz.getGenericSuperclass();
if (null == genericSuper || Object.class.equals(genericSuper)) {
// 如果类没有父类,而是实现一些定义好的泛型接口,则取接口的Type
final Type[] genericInterfaces = clazz.getGenericInterfaces();
if (ArrayUtil.isNotEmpty(genericInterfaces)) {
- for (int i = 0; i < genericInterfaces.length; i++) {
+ for (int i = 0 ; i < genericInterfaces.length ; i++) {
genericSuper = genericInterfaces[i];
- if(genericSuper instanceof ParameterizedTypeImpl) {
- Class> rawType = ((ParameterizedTypeImpl) genericSuper).getRawType();
- if(cla.isAssignableFrom(rawType)) {
+ if (genericSuper instanceof ParameterizedType) {
+ Type rawType = ((ParameterizedType) genericSuper).getRawType();
+ if (rawType instanceof Class && cla.isAssignableFrom((Class < ? >) rawType)) {
break;
}
}
}
}
}
- result = toParameterizedType(genericSuper, cla);
+ result = toParameterizedType(genericSuper , cla);
}
return result;
}
diff --git a/iot-mqtt/src/main/java/com/iteaj/iot/client/mqtt/MqttClient.java b/iot-mqtt/src/main/java/com/iteaj/iot/client/mqtt/MqttClient.java
index 32154078309b549764b21b11b76194e613dcf104..72a06180798616f30249aaa562069e3cd2337cfe 100644
--- a/iot-mqtt/src/main/java/com/iteaj/iot/client/mqtt/MqttClient.java
+++ b/iot-mqtt/src/main/java/com/iteaj/iot/client/mqtt/MqttClient.java
@@ -25,13 +25,13 @@ import static com.iteaj.iot.CoreConst.CLIENT_DECODER_HANDLER;
/**
* 基于mqtt协议的客户端
+ *
* @see MqttConnectProperties#getAllIdleTime() 作为keepAlive字段 单位毫秒
* 详情资料{@code https://blog.csdn.net/weixin_40973138/article/details/90036953}
* @see MqttEncoder
* @see MqttDecoder
*/
-public class MqttClient extends TcpSocketClient implements
- ChannelInboundHandler, ChannelOutboundHandler, MultiStageConnect {
+public class MqttClient extends TcpSocketClient implements ChannelInboundHandler, ChannelOutboundHandler, MultiStageConnect {
/**
* 用来标识mqtt客户端是否已经得到服务端的connAck
@@ -66,6 +66,7 @@ public class MqttClient extends TcpSocketClient implements
/**
* 新增MqttMessage到MqttClientMessage的处理器 包括心跳处理器和解码处理器
+ *
* @param channel
*/
@Override
@@ -76,6 +77,7 @@ public class MqttClient extends TcpSocketClient implements
/**
* 连接成功后发起 connect 请求
+ *
* @param future
*/
@Override
@@ -103,8 +105,7 @@ public class MqttClient extends TcpSocketClient implements
, getName(), MqttMessageType.CONNECT, remoteKey(), connectMessage);
}
} else {
- logger.warn("mqtt({}) {} - 远程主机:{} - 状态:失败 - 报文:{}"
- , getName(), MqttMessageType.CONNECT, remoteKey(), connectMessage, future1.cause());
+ logger.warn("mqtt({}) {} - 远程主机:{} - 状态:失败 - 报文:{}", getName(), MqttMessageType.CONNECT, remoteKey(), connectMessage, future1.cause());
}
});
}
@@ -242,34 +243,53 @@ public class MqttClient extends TcpSocketClient implements
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- if(msg instanceof MqttMessage) {
+ if (msg instanceof MqttMessage) {
MqttMessage message = (MqttMessage) msg;
- MqttMessageType type = message.fixedHeader().messageType();
-
+ MqttFixedHeader mqttFixedHeader = message.fixedHeader();
+ logger.info("Mqtt Client接收到服务端消息:msg=[{}],mqttfixedHeader=[{}]",message,mqttFixedHeader);
+ MqttMessageType type = mqttFixedHeader.messageType();
switch (type) {
// 连接确认
- case CONNACK: connAck(message, ctx); break;
+ case CONNACK:
+ connAck(message, ctx);
+ break;
// 服务端 发布
- case PUBLISH: publish(message, ctx); break;
+ case PUBLISH:
+ publish(message, ctx);
+ break;
// 客户端发布确认(QoS1)
- case PUBACK: pubAck(message, ctx); break;
+ case PUBACK:
+ pubAck(message, ctx);
+ break;
// 客户端发布 第一次确认(QoS2)
- case PUBREC: pubRec(message, ctx); break;
+ case PUBREC:
+ pubRec(message, ctx);
+ break;
// 服务端发布 发布第二次确认(QoS2)
- case PUBREL: pubRel(message, ctx); break;
+ case PUBREL:
+ pubRel(message, ctx);
+ break;
// 客户端发布 发布第三次确认(QoS2)
- case PUBCOMP: pubComp(message, ctx); break;
+ case PUBCOMP:
+ pubComp(message, ctx);
+ break;
// 订阅确认
- case SUBACK: subAck(message, ctx); break;
+ case SUBACK:
+ subAck(message, ctx);
+ break;
// 取消订阅确认
- case UNSUBACK: unSubAck((MqttUnsubAckMessage) msg, ctx); break;
+ case UNSUBACK:
+ unSubAck((MqttUnsubAckMessage) msg, ctx);
+ break;
// ping 响应
- case PINGRESP: pingResp(message, ctx); break;
+ case PINGRESP:
+ pingResp(message, ctx);
+ break;
}
} else {
ctx.fireChannelRead(msg);
@@ -327,13 +347,12 @@ public class MqttClient extends TcpSocketClient implements
return this.stageConnect(getConfig().getConnectTimeout());
}
- protected void decoderResultCall(MqttMessage message
- , Consumer success, Consumer fail) {
+ protected void decoderResultCall(MqttMessage message, Consumer success, Consumer fail) {
DecoderResult result = message.decoderResult();
- if(result.isSuccess()) {
+ if (result.isSuccess()) {
success.accept(result);
} else {
- if(fail != null) {
+ if (fail != null) {
fail.accept(result);
}
}
@@ -341,6 +360,7 @@ public class MqttClient extends TcpSocketClient implements
/**
* 服务端的连接确认报文 确认之后我们将发起向服务端订阅的报文
+ *
* @param message
* @param ctx
*/
@@ -349,11 +369,11 @@ public class MqttClient extends TcpSocketClient implements
MqttConnAckMessage msg = (MqttConnAckMessage) message;
MqttConnectReturnCode mqttConnectReturnCode = msg.variableHeader().connectReturnCode();
// 连接成功, 发送订阅报文
- if(mqttConnectReturnCode == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
+ if (mqttConnectReturnCode == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
this.setSuccess(); // 标记已经成功
MqttConnectProperties client = (MqttConnectProperties) ctx.channel().attr(CoreConst.CLIENT_KEY).get();
List list = this.getClientComponent().doSubscribe(client);// 交由实现者新增订阅
- if(!CollectionUtil.isEmpty(list)) {
+ if (!CollectionUtil.isEmpty(list)) {
int messageId = messageIdManager.nextId();
MqttMessageBuilders.SubscribeBuilder subscribe = MqttMessageBuilders.subscribe();
subscribe.messageId(messageId);
@@ -362,9 +382,8 @@ public class MqttClient extends TcpSocketClient implements
this.doWriteAndFlush(MqttMessageType.SUBSCRIBE, ctx.channel(), subscribe.build(), messageId);
}
- if(logger.isTraceEnabled()) {
- logger.warn("mqtt({}) {} - 远程主机:{} - 状态:成功 - 报文:{}"
- , getName(), MqttMessageType.CONNACK, remoteKey(), message);
+ if (logger.isTraceEnabled()) {
+ logger.warn("mqtt({}) {} - 远程主机:{} - 状态:成功 - 报文:{}", getName(), MqttMessageType.CONNACK, remoteKey(), message);
}
// 发布客户端上线事件
FrameworkManager.publishEvent(new StatusEvent(this, ClientStatus.online, getClientComponent()));
@@ -377,6 +396,7 @@ public class MqttClient extends TcpSocketClient implements
/**
* 服务端发布报文, 收到服务端发布的报文后, 解析出报文 payload 并组装成MqttClientMessage
+ *
* @param mqttMessage
* @param ctx
*/
@@ -388,9 +408,8 @@ public class MqttClient extends TcpSocketClient implements
final int packetId = msg.variableHeader().packetId();
ConnectProperties properties = getConfig();
- if(logger.isTraceEnabled()) {
- logger.trace("mqtt({}) {}(SERVER {}) - PacketId:{} - 主机:{}"
- , getName(), MqttMessageType.PUBLISH, fixedHeader.qosLevel(), packetId, properties);
+ if (logger.isTraceEnabled()) {
+ logger.trace("mqtt({}) {}(SERVER {}) - PacketId:{} - 主机:{}", getName(), MqttMessageType.PUBLISH, fixedHeader.qosLevel(), packetId, properties);
}
try {
/**
@@ -398,14 +417,12 @@ public class MqttClient extends TcpSocketClient implements
* 1. 如果packetId的报文存在说明客户端已经收到了一次, 不能重复接受
* 2. 直接返回确认报文
*/
- if(fixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE) {
+ if (fixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE) {
final MessageMapper server = getMessageIdManager().getServer(packetId);
- if(server != null) {
+ if (server != null) {
final MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(packetId);
- MqttMessageType type = fixedHeader.qosLevel() == MqttQoS.AT_LEAST_ONCE
- ? MqttMessageType.PUBACK : MqttMessageType.PUBREC;
- final MqttFixedHeader ackFixedHeader = new MqttFixedHeader(type
- , false, MqttQoS.AT_MOST_ONCE, false, 0);
+ MqttMessageType type = fixedHeader.qosLevel() == MqttQoS.AT_LEAST_ONCE ? MqttMessageType.PUBACK : MqttMessageType.PUBREC;
+ final MqttFixedHeader ackFixedHeader = new MqttFixedHeader(type, false, MqttQoS.AT_MOST_ONCE, false, 0);
getChannel().writeAndFlush(new MqttMessage(ackFixedHeader, variableHeader));
return;
@@ -430,31 +447,27 @@ public class MqttClient extends TcpSocketClient implements
getMessageIdManager().addServer(packetId, mapper);
// 服务端发布的Qos = 1 客户端返回ACK确认
- if(fixedHeader.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
- final MqttFixedHeader ackFixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK
- , false, MqttQoS.AT_MOST_ONCE, false, 0);
+ if (fixedHeader.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
+ final MqttFixedHeader ackFixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
final MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(packetId);
ctx.channel().writeAndFlush(new MqttMessage(ackFixedHeader, variableHeader)).addListener(future -> {
// 如果ACK确认报文发送成功则移除对应的报文
- if(future.isSuccess()) {
+ if (future.isSuccess()) {
getMessageIdManager().removeServer(packetId);
- if(logger.isTraceEnabled()) {
- logger.trace("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}"
- , getName(), MqttMessageType.PUBACK, packetId, remoteKey());
+ if (logger.isTraceEnabled()) {
+ logger.trace("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBACK, packetId, remoteKey());
}
} else {
- logger.warn("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}"
- , getName(), MqttMessageType.PUBACK, packetId, remoteKey(), future.cause());
+ logger.warn("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBACK, packetId, remoteKey(), future.cause());
}
});
ctx.fireChannelRead(message);
// 服务端发布的Qos = 2 客户端返回Rec确认
- } else if(fixedHeader.qosLevel() == MqttQoS.EXACTLY_ONCE) {
- final MqttFixedHeader recFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC
- , false, MqttQoS.AT_MOST_ONCE, false, 0);
+ } else if (fixedHeader.qosLevel() == MqttQoS.EXACTLY_ONCE) {
+ final MqttFixedHeader recFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
final MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(packetId);
/**
@@ -462,19 +475,17 @@ public class MqttClient extends TcpSocketClient implements
* @see #pubRel(MqttMessage, ChannelHandlerContext)
*/
ctx.channel().writeAndFlush(new MqttMessage(recFixedHeader, variableHeader)).addListener(future -> {
- if(future.isSuccess()) {
- if(logger.isTraceEnabled()) {
- logger.trace("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}"
- , getName(), MqttMessageType.PUBREC, packetId, remoteKey());
+ if (future.isSuccess()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBREC, packetId, remoteKey());
}
} else {
- logger.warn("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}"
- , getName(), MqttMessageType.PUBREC, packetId, remoteKey(), future.cause());
+ logger.warn("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBREC, packetId, remoteKey(), future.cause());
}
});
}
} catch (Exception e) {
- throw new MqttClientException("没有匹配的构造函数["+getClientComponent().getMessageClass().getSimpleName()+"(MqttMessage)]");
+ throw new MqttClientException("没有匹配的构造函数[" + getClientComponent().getMessageClass().getSimpleName() + "(MqttMessage)]");
}
}, null);
}
@@ -489,31 +500,32 @@ public class MqttClient extends TcpSocketClient implements
*/
protected MqttClientMessage buildPublishMessage(ChannelHandlerContext ctx, MqttPublishMessage message) throws Exception {
SocketMessage proxy = getClientComponent().proxy(ctx, message.content());
- if(proxy instanceof MqttClientMessage) {
+ if (proxy instanceof MqttClientMessage) {
return ((MqttClientMessage) proxy).setMqttMessage(message).readBuild();
} else {
- throw new MqttClientException("mqtt报文类型必须是["+MqttClientMessage.class.getSimpleName()+"]");
+ throw new MqttClientException("mqtt报文类型必须是[" + MqttClientMessage.class.getSimpleName() + "]");
}
}
/**
* 服务端给客户端发送的PUBLISH确认报文(QoS1)
- * @see io.netty.handler.codec.mqtt.MqttQoS#AT_LEAST_ONCE
+ *
* @param message
* @param ctx
+ * @see io.netty.handler.codec.mqtt.MqttQoS#AT_LEAST_ONCE
*/
protected void pubAck(MqttMessage message, ChannelHandlerContext ctx) {
this.decoderResultCall(message, result -> {
MqttPubAckMessage msg = (MqttPubAckMessage) message;
final int messageId = msg.variableHeader().messageId();
final MessageMapper remove = getMessageIdManager().remove(messageId);
- if(remove != null) {
+ if (remove != null) {
getClientComponent().getPublishListener().success(this, remove);
}
}, result -> {
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
final MessageMapper remove = getMessageIdManager().remove(variableHeader.messageId());
- if(remove != null) {
+ if (remove != null) {
getClientComponent().getPublishListener().remove(this, remove);
}
});
@@ -522,67 +534,62 @@ public class MqttClient extends TcpSocketClient implements
/**
* 服务端向客户端第一次确认(QoS2)
- * @see io.netty.handler.codec.mqtt.MqttQoS#EXACTLY_ONCE
+ *
* @param msg
* @param ctx
+ * @see io.netty.handler.codec.mqtt.MqttQoS#EXACTLY_ONCE
*/
protected void pubRec(MqttMessage msg, ChannelHandlerContext ctx) {
final MqttMessageIdVariableHeader idVariableHeader = (MqttMessageIdVariableHeader) msg.variableHeader();
final int packetId = idVariableHeader.messageId();
- if(logger.isTraceEnabled()) {
- logger.trace("mqtt({}) {}(SERVER) - PacketId:{} - 远程主机:{}"
- , getName(), MqttMessageType.PUBREC, packetId, remoteKey());
+ if (logger.isTraceEnabled()) {
+ logger.trace("mqtt({}) {}(SERVER) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBREC, packetId, remoteKey());
}
// 向服务端发送Rel报文, 告诉服务端可以删除对应的MsgId报文
- final MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL
- , false, MqttQoS.AT_MOST_ONCE, false, 0);
+ final MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_MOST_ONCE, false, 0);
ctx.channel().writeAndFlush(new MqttMessage(fixedHeader, msg.variableHeader())).addListener(future -> {
- if(future.isSuccess()) {
- if(logger.isTraceEnabled()) {
- logger.trace("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}"
- , getName(), MqttMessageType.PUBREL, packetId, remoteKey());
+ if (future.isSuccess()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBREL, packetId, remoteKey());
}
} else {
- logger.warn("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}"
- , getName(), MqttMessageType.PUBREL, packetId, remoteKey(), future.cause());
+ logger.warn("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBREL, packetId, remoteKey(), future.cause());
}
});
}
/**
* 服务端向客户端第二次确认(QoS2) 让客户端删除PacketId对应的Msg
- * @see io.netty.handler.codec.mqtt.MqttQoS#EXACTLY_ONCE
+ *
* @param msg
* @param ctx
+ * @see io.netty.handler.codec.mqtt.MqttQoS#EXACTLY_ONCE
*/
protected void pubRel(MqttMessage msg, ChannelHandlerContext ctx) {
final MqttMessageIdVariableHeader idVariableHeader = (MqttMessageIdVariableHeader) msg.variableHeader();
final int packetId = idVariableHeader.messageId();
final MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
- if(logger.isTraceEnabled()) {
- logger.trace("mqtt({}) {}(SERVER) - PacketId:{} - 远程主机:{}"
- , getName(), MqttMessageType.PUBREL, packetId, remoteKey());
+ if (logger.isTraceEnabled()) {
+ logger.trace("mqtt({}) {}(SERVER) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBREL, packetId, remoteKey());
}
getChannel().writeAndFlush(new MqttMessage(header, idVariableHeader)).addListener(future -> {
// 发送PUBCOMP成功, 删除PacketId对应的包
- if(future.isSuccess()) {
+ if (future.isSuccess()) {
final MessageMapper mapper = getMessageIdManager().removeServer(packetId);
- if(logger.isTraceEnabled()) {
- logger.trace("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}"
- , getName(), MqttMessageType.PUBCOMP, packetId, remoteKey());
+ if (logger.isTraceEnabled()) {
+ logger.trace("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBCOMP, packetId, remoteKey());
}
// 将报文上报给业务层处理器处理
- if(mapper != null) {
+ if (mapper != null) {
ctx.fireChannelRead(mapper.getMessage());
}
} else {
- logger.warn("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}"
- , getName(), MqttMessageType.PUBCOMP, packetId, remoteKey(), future.cause());
+ logger.warn("mqtt({}) {}(CLIENT) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBCOMP, packetId, remoteKey(), future.cause());
}
});
}
@@ -590,21 +597,21 @@ public class MqttClient extends TcpSocketClient implements
/**
* 服务端向客户端发布第二次确认(QoS2)
* 服务端通知客户端可以删除PacketId对应的报文了
- * @see io.netty.handler.codec.mqtt.MqttQoS#EXACTLY_ONCE
+ *
* @param msg
* @param ctx
+ * @see io.netty.handler.codec.mqtt.MqttQoS#EXACTLY_ONCE
*/
protected void pubComp(MqttMessage msg, ChannelHandlerContext ctx) {
- final MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader)msg.variableHeader();
+ final MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) msg.variableHeader();
final int messageId = variableHeader.messageId();
- if(logger.isTraceEnabled()) {
- logger.trace("mqtt({}) {}(SERVER) - PacketId:{} - 远程主机:{}"
- , getName(), MqttMessageType.PUBCOMP, messageId, remoteKey());
+ if (logger.isTraceEnabled()) {
+ logger.trace("mqtt({}) {}(SERVER) - PacketId:{} - 远程主机:{}", getName(), MqttMessageType.PUBCOMP, messageId, remoteKey());
}
// 服务端通知客户端可以删除MsgId对应的报文了
final MessageMapper remove = getMessageIdManager().remove(messageId);
- if(remove != null) {
+ if (remove != null) {
getClientComponent().getPublishListener().success(this, remove);
}
}
@@ -617,12 +624,13 @@ public class MqttClient extends TcpSocketClient implements
* SUBSCRIBE报文拥有固定报头、可变报头、有效载荷。
* 当服务器收到客户端发送的一个SUBSCRIBE报文时,必须向客户端发送一个SUBACK报文响应,同时SUBACK报文必须和等待确认的SUBSCRIBE报文有相同的报文标识符。
* 如果服务器收到一个SUBSCRIBE报文,报文的主题过滤器与一个现存订阅的主题过滤器相同,那么必须使用新的订阅彻底替换现存的订阅。新订阅的主题过滤器和之前订阅的相同,但是它的最大QoS值可以不同。与这个主题过滤器匹配的任何现存的保留消息必须被重发,但是发布流程不能中断。
+ *
* @param message
* @param ctx
*/
protected void subAck(MqttMessage message, ChannelHandlerContext ctx) {
this.decoderResultCall(message, result -> {
- if(logger.isTraceEnabled()) {
+ if (logger.isTraceEnabled()) {
MqttSubAckMessage msg = (MqttSubAckMessage) message;
int i = msg.variableHeader().messageId();
@@ -630,13 +638,13 @@ public class MqttClient extends TcpSocketClient implements
}
}, result -> {
final MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
- logger.warn("mqtt({}) {}(未完成) - PacketId: {} - 远程主机:{}"
- , getName(), MqttMessageType.SUBACK, variableHeader.messageId(), getConfig(), result.cause());
+ logger.warn("mqtt({}) {}(未完成) - PacketId: {} - 远程主机:{}", getName(), MqttMessageType.SUBACK, variableHeader.messageId(), getConfig(), result.cause());
});
}
/**
- * 取消订阅确认报文
+ * 取消订阅确认报文
+ *
* @param message
* @param ctx
*/
@@ -645,50 +653,46 @@ public class MqttClient extends TcpSocketClient implements
MqttUnsubAckMessage msg = (MqttUnsubAckMessage) message;
final int i = msg.variableHeader().messageId();
- if(logger.isTraceEnabled()) {
- logger.trace("mqtt({}) {}(成功) - PacketId: {} - 远程主机:{} - msg:{}"
- , getName(), MqttMessageType.UNSUBACK, i, getConfig(), message);
+ if (logger.isTraceEnabled()) {
+ logger.trace("mqtt({}) {}(成功) - PacketId: {} - 远程主机:{} - msg:{}", getName(), MqttMessageType.UNSUBACK, i, getConfig(), message);
}
}, result -> {
final MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
int messageId = variableHeader.messageId();
- logger.error("mqtt({}) {}(未完成) - PacketId: {} - 远程主机:{} - msg:{}"
- , getName(), MqttMessageType.UNSUBACK, messageId, remoteKey(), message, result.cause());
+ logger.error("mqtt({}) {}(未完成) - PacketId: {} - 远程主机:{} - msg:{}", getName(), MqttMessageType.UNSUBACK, messageId, remoteKey(), message, result.cause());
});
}
/**
* 客户端ping响应报文
+ *
* @param msg
* @param ctx
*/
protected void pingResp(MqttMessage msg, ChannelHandlerContext ctx) {
- if(logger.isTraceEnabled()) {
+ if (logger.isTraceEnabled()) {
logger.trace("mqtt({}) {} - 远程主机: {}", getName(), MqttMessageType.PINGRESP, remoteKey());
}
}
protected void doWriteAndFlush(MqttMessageType messageType, Channel channel, MqttMessage message, Integer messageId) {
channel.writeAndFlush(message).addListener(future -> {
- if(future.isSuccess()) {
+ if (future.isSuccess()) {
- if(messageId != null) {
- if(logger.isTraceEnabled()) {
- logger.trace("mqtt({}) {} - PacketId: {} - 远程主机:{}"
- , getName(), messageType, messageId, remoteKey());
+ if (messageId != null) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("mqtt({}) {} - PacketId: {} - 远程主机:{}", getName(), messageType, messageId, remoteKey());
}
} else {
- if(logger.isTraceEnabled()) {
- logger.trace("mqtt({}) {} - 远程主机:{}"
- , getName(), messageType, remoteKey());
+ if (logger.isTraceEnabled()) {
+ logger.trace("mqtt({}) {} - 远程主机:{}", getName(), messageType, remoteKey());
}
}
- } else if(future.cause() != null) {
- logger.error("mqtt({}) {} - 远程主机:{} - 报文:{}"
- , getName(), messageType, remoteKey(), message, future.cause());
+ } else if (future.cause() != null) {
+ logger.error("mqtt({}) {} - 远程主机:{} - 报文:{}", getName(), messageType, remoteKey(), message, future.cause());
}
});
}
@@ -725,18 +729,17 @@ public class MqttClient extends TcpSocketClient implements
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- if(msg instanceof ClientSocketProtocol) {
+ if (msg instanceof ClientSocketProtocol) {
ClientSocketProtocol clientProtocol = (ClientSocketProtocol) msg;
ClientMessage clientMessage = clientProtocol.requestMessage();
- if(msg instanceof ProtocolPreservable) {
- if(((ProtocolPreservable) msg).isRelation()) {
- getClientComponent().protocolFactory().add((String) ((ProtocolPreservable) msg)
- .relationKey(), (Protocol) msg, ((ProtocolPreservable) msg).getTimeout());
+ if (msg instanceof ProtocolPreservable) {
+ if (((ProtocolPreservable) msg).isRelation()) {
+ getClientComponent().protocolFactory().add((String) ((ProtocolPreservable) msg).relationKey(), (Protocol) msg, ((ProtocolPreservable) msg).getTimeout());
}
}
// 如果是服务端主动的请求, 使用响应报文
- if(msg instanceof ServerInitiativeProtocol) {
+ if (msg instanceof ServerInitiativeProtocol) {
clientMessage = clientProtocol.responseMessage();
}
@@ -745,14 +748,12 @@ public class MqttClient extends TcpSocketClient implements
* 只适用于发布类型
* @see MqttMessageType#PUBLISH
*/
- if(clientMessage instanceof MqttClientMessage) {
+ if (clientMessage instanceof MqttClientMessage) {
final MqttClientMessage mqttClientMessage = (MqttClientMessage) clientMessage;
MqttPublishMessage mqttMessage = buildPublishMqttMessage(mqttClientMessage);
- if(logger.isTraceEnabled()) {
+ if (logger.isTraceEnabled()) {
final int packetId = mqttMessage.variableHeader().packetId();
- logger.trace("mqtt({}) PUBLISH(CLIENT {}) - PacketId:{} - 报文:{}"
- , getName(), mqttClientMessage.getQos(), packetId
- , debugMessage(clientMessage, mqttMessage));
+ logger.trace("mqtt({}) PUBLISH(CLIENT {}) - PacketId:{} - 报文:{}", getName(), mqttClientMessage.getQos(), packetId, debugMessage(clientMessage, mqttMessage));
}
ctx.write(mqttMessage, promise).addListener(future -> {
@@ -768,7 +769,7 @@ public class MqttClient extends TcpSocketClient implements
* @see MqttClientComponent#pubRel(MqttMessage, ChannelHandlerContext, List)
* @see MqttClientComponent#pubComp(MqttMessage, ChannelHandlerContext, List)
*/
- if(header.qosLevel() != MqttQoS.AT_MOST_ONCE) {
+ if (header.qosLevel() != MqttQoS.AT_MOST_ONCE) {
final int packetId = mqttMessage.variableHeader().packetId();
final MessageMapper mapper = new MessageMapper(getConfig(), mqttClientMessage, packetId);
getMessageIdManager().add(packetId, mapper);
diff --git a/iot-test/pom.xml b/iot-test/pom.xml
index e5071342efd8815d6858b72a24e03a0f00a5c067..8fe846e36de511e6dcfbfb29adbcb71d3758dfe6 100644
--- a/iot-test/pom.xml
+++ b/iot-test/pom.xml
@@ -3,89 +3,104 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- 2.7.2
org.springframework.boot
spring-boot-starter-parent
+ 3.3.3
+
4.0.0
- 3.1.1
com.iteaj
iot-test
+ 3.1.1
iot框架并发测试模块
true
+ 3.1.1
+ 17
+ 17
+ 17
+ UTF-8
+ UTF-8
com.iteaj
iot-core
- ${project.version}
+ ${iot.version}
com.iteaj
iot-client
- ${project.version}
+ ${iot.version}
com.iteaj
iot-server
- ${project.version}
+ ${iot.version}
com.iteaj
iot-redis
- ${project.version}
+ ${iot.version}
com.iteaj
iot-mqtt
- ${project.version}
+ ${iot.version}
com.iteaj
iot-plc
- ${project.version}
+ ${iot.version}
com.iteaj
iot-modbus
- ${project.version}
+ ${iot.version}
com.iteaj
iot-serial
- ${project.version}
+ ${iot.version}
com.iteaj
iot-simulator
- ${project.version}
+ ${iot.version}
com.iteaj
- ${project.version}
+ ${iot.version}
iot-boot-starter
+
+
+
+
+
+
+
- 5.7.18
- cn.hutool
- hutool-json
+ com.google.code.gson
+ gson
+ 2.11.0
+
org.springframework.boot
spring-boot-starter
@@ -100,6 +115,7 @@
mysql
mysql-connector-java
+ 8.0.33
diff --git a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttClientTestHandle.java b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttClientTestHandle.java
index 3593af517dd71b9bb3cc2fd4042f5a0a254b253d..e6b4e774aff9570b7e8ac19a70ba9b4caec4799e 100644
--- a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttClientTestHandle.java
+++ b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttClientTestHandle.java
@@ -1,17 +1,16 @@
package com.iteaj.iot.test.mqtt;
-import cn.hutool.json.JSONObject;
-import cn.hutool.json.JSONUtil;
import com.iteaj.iot.IotThreadManager;
import com.iteaj.iot.client.ClientProtocolHandle;
import com.iteaj.iot.client.mqtt.MqttClient;
import com.iteaj.iot.client.mqtt.MqttDecoderInterceptor;
-import com.iteaj.iot.client.mqtt.impl.*;
-import com.iteaj.iot.codec.filter.DecoderInterceptor;
+import com.iteaj.iot.client.mqtt.impl.DefaultMqttComponent;
+import com.iteaj.iot.client.mqtt.impl.DefaultMqttConnectProperties;
+import com.iteaj.iot.client.mqtt.impl.DefaultMqttPublishProtocol;
import com.iteaj.iot.test.IotTestHandle;
import com.iteaj.iot.test.IotTestProperties;
import com.iteaj.iot.test.TestConst;
-import io.netty.handler.codec.mqtt.MqttMessage;
+import com.iteaj.iot.test.util.GsonUtil;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.timeout.IdleState;
@@ -22,6 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -144,7 +144,7 @@ public class MqttClientTestHandle implements ClientProtocolHandle {
byte[] message = protocol.requestMessage().getMessage();
- JSONObject jsonObject = JSONUtil.parseObj(new String(message));
+ Map jsonObject = GsonUtil.toMap(new String(message));
if(jsonObject.containsKey("retain")) {
logger.info(TestConst.LOGGER_MQTT_PROTOCOL_DESC, defaultMqttComponent.getName()
, "WillTopic", protocol.getTopic(), protocol.getEquipCode(), "-", "通过" );
diff --git a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttClientTestMessage.java b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttClientTestMessage.java
index 29925294411ec086acc89bdf7ba53c7e29605a22..92bd3defbdbc4b98ebede8b29b915809e7f748c0 100644
--- a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttClientTestMessage.java
+++ b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttClientTestMessage.java
@@ -1,57 +1,58 @@
package com.iteaj.iot.test.mqtt;
-import cn.hutool.json.JSONObject;
-import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson.JSONObject;
import com.iteaj.iot.client.mqtt.message.MqttClientMessage;
import com.iteaj.iot.client.mqtt.message.MqttMessageHead;
import com.iteaj.iot.test.TestProtocolType;
import com.iteaj.iot.test.TestStatus;
import com.iteaj.iot.test.TestStatusHeader;
+import com.iteaj.iot.test.util.GsonUtil;
import com.iteaj.iot.utils.ByteUtil;
-import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
+import java.util.Map;
+
public class MqttClientTestMessage extends MqttClientMessage {
public MqttClientTestMessage(byte[] message) {
super(message);
}
- public MqttClientTestMessage(MqttMessageHead head, String topic) {
- super(head, topic);
+ public MqttClientTestMessage(MqttMessageHead head , String topic) {
+ super(head , topic);
}
- public MqttClientTestMessage(MqttMessageHead head, MqttQoS qos, String topic) {
- super(head, qos, topic);
+ public MqttClientTestMessage(MqttMessageHead head , MqttQoS qos , String topic) {
+ super(head , qos , topic);
}
- public MqttClientTestMessage(MqttMessageHead head, MessageBody body, String topic) {
- super(head, body, topic);
+ public MqttClientTestMessage(MqttMessageHead head , MessageBody body , String topic) {
+ super(head , body , topic);
}
@Override
protected MqttMessageHead doBuild(byte[] payload) {
- if(this.getTopic().contains("willTopic")) {
+ if (this.getTopic().contains("willTopic")) {
String clientId = this.getTopic().split("/")[1];
- return new MqttMessageHead(clientId, clientId, TestProtocolType.WillTop);
- } else if(this.getTopic().endsWith("Once")) {
- JSONObject jsonObject = JSONUtil.parseObj(ByteUtil.bytesToString(payload));
- return new MqttMessageHead(jsonObject.getStr("equipCode")
- , jsonObject.getStr("messageId"), TestProtocolType.PIReq);
- } else if(this.getTopic().equals(MqttClientTestHandle.TOPIC_RESPONSE)) {
- JSONObject jsonObject = JSONUtil.parseObj(ByteUtil.bytesToString(payload));
- return new MqttMessageHead(jsonObject.getStr("equipCode")
- , jsonObject.getStr("messageId"), TestProtocolType.CIReq);
+ return new MqttMessageHead(clientId , clientId , TestProtocolType.WillTop);
+ } else if (this.getTopic().endsWith("Once")) {
+ Map jsonObject = GsonUtil.toMap(ByteUtil.bytesToString(payload));
+ return new MqttMessageHead(jsonObject.get("equipCode") , jsonObject.get("messageId") , TestProtocolType.PIReq);
+ } else if (this.getTopic().equals(MqttClientTestHandle.TOPIC_RESPONSE)) {
+ Map jsonObject = GsonUtil.toMap(ByteUtil.bytesToString(payload));
+ return new MqttMessageHead(jsonObject.get("equipCode") , jsonObject.get("messageId") , TestProtocolType.CIReq);
} else {
- JSONObject jsonObject = JSONUtil.parseObj(ByteUtil.bytesToString(payload));
- TestProtocolType type = jsonObject.get("type", TestProtocolType.class);
- if(type == TestProtocolType.PIReq) {
- return new TestStatusHeader(jsonObject.getStr("equipCode"),
- jsonObject.getStr("messageId"), type, jsonObject.get("status", TestStatus.class));
- } else {
- return new MqttMessageHead(jsonObject.getStr("equipCode"), jsonObject.getStr("messageId"), type);
+ Map jsonObject = GsonUtil.toMap(ByteUtil.bytesToString(payload));
+
+ //TODO 需要各自实现
+// TestProtocolType type = jsonObject.get("type" , TestProtocolType.class);
+// if (type == TestProtocolType.PIReq) {
+// return new TestStatusHeader(jsonObject.get("equipCode") , jsonObject.get("messageId") , type , jsonObject.get("status" , TestStatus.class));
+// } else {
+// return new MqttMessageHead(jsonObject.get("equipCode") , jsonObject.get("messageId") , typetype);
+ return new MqttMessageHead(jsonObject.get("equipCode") , jsonObject.get("messageId") , null);
}
+
}
- }
-}
+ }
diff --git a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttPublishTestProtocol.java b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttPublishTestProtocol.java
index 3faf10e9cfc5e8e7e04ad045dad0848af6a1456c..119bf58a2a2587b02097505431378e046deb1ea4 100644
--- a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttPublishTestProtocol.java
+++ b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttPublishTestProtocol.java
@@ -1,11 +1,10 @@
package com.iteaj.iot.test.mqtt;
-import cn.hutool.json.JSONUtil;
import com.iteaj.iot.client.mqtt.message.MqttMessageHead;
import com.iteaj.iot.client.protocol.ClientInitiativeProtocol;
import com.iteaj.iot.consts.ExecStatus;
-import com.iteaj.iot.message.DefaultMessageHead;
import com.iteaj.iot.test.*;
+import com.iteaj.iot.test.util.GsonUtil;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.nio.charset.StandardCharsets;
@@ -16,17 +15,17 @@ import java.nio.charset.StandardCharsets;
* @author iteaj
* @since 1.0
*/
-public class MqttPublishTestProtocol extends ClientInitiativeProtocol {
+public class MqttPublishTestProtocol extends ClientInitiativeProtocol < MqttClientTestMessage > {
private MqttQoS qoS;
private String topic;
private String deviceSn;
public MqttPublishTestProtocol(String topic) {
- this(MqttQoS.AT_MOST_ONCE, topic, "MqttTestSn");
+ this(MqttQoS.AT_MOST_ONCE , topic , "MqttTestSn");
}
- public MqttPublishTestProtocol(MqttQoS qoS, String topic, String deviceSn) {
+ public MqttPublishTestProtocol(MqttQoS qoS , String topic , String deviceSn) {
this.qoS = qoS;
this.topic = topic;
this.deviceSn = deviceSn;
@@ -34,21 +33,19 @@ public class MqttPublishTestProtocol extends ClientInitiativeProtocol jsonObject = GsonUtil.toMap(new String(mqttMessage.getMessage()));
if(jsonObject.containsKey("equipCode")) {
logger.info(TestConst.LOGGER_MQTT_PROTOCOL_DESC, component.getName()
, "subscribe", protocol.getTopic(), protocol.getEquipCode(), "-", "通过" );
- String equipCode = jsonObject.getStr("equipCode");
- String messageId = jsonObject.getStr("messageId");
+ String equipCode = jsonObject.get("equipCode");
+ String messageId = jsonObject.get("messageId");
MqttMessageHead messageHead = new MqttMessageHead(equipCode, messageId, TestProtocolType.CIReq);
- new DefaultMqttPublishProtocol(JSONUtil.toJsonStr(messageHead).getBytes(StandardCharsets.UTF_8)
+ new DefaultMqttPublishProtocol(GsonUtil.toJsonStr(messageHead).getBytes(StandardCharsets.UTF_8)
, MqttClientTestHandle.TOPIC_RESPONSE+"/"+equipCode).request();
}
}
diff --git a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttSubscribeTestProtocol.java b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttSubscribeTestProtocol.java
index 381719daf8f8b551d114d53edb2c6ec4ffe40187..3222a7815ff53ca6d639518645e2de0541db10ac 100644
--- a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttSubscribeTestProtocol.java
+++ b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttSubscribeTestProtocol.java
@@ -1,6 +1,5 @@
package com.iteaj.iot.test.mqtt;
-import cn.hutool.json.JSONUtil;
import com.iteaj.iot.Message;
import com.iteaj.iot.ProtocolType;
import com.iteaj.iot.client.mqtt.MqttClientException;
diff --git a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttWillTopicTestListener.java b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttWillTopicTestListener.java
index df38f704142b8e59d1d3419e1fed65e51895a608..d98f3f1f5fb5327c097189dbacee6bbabae21fb2 100644
--- a/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttWillTopicTestListener.java
+++ b/iot-test/src/main/java/com/iteaj/iot/test/mqtt/MqttWillTopicTestListener.java
@@ -1,7 +1,5 @@
package com.iteaj.iot.test.mqtt;
-import cn.hutool.json.JSONObject;
-import cn.hutool.json.JSONUtil;
import com.iteaj.iot.FrameworkManager;
import com.iteaj.iot.client.ClientComponent;
import com.iteaj.iot.client.mqtt.impl.DefaultMqttConnectProperties;
@@ -9,6 +7,7 @@ import com.iteaj.iot.client.mqtt.impl.DefaultMqttMessage;
import com.iteaj.iot.client.mqtt.impl.DefaultMqttSubscribeProtocol;
import com.iteaj.iot.client.mqtt.impl.MqttSubscribeListener;
import com.iteaj.iot.test.TestConst;
+import com.iteaj.iot.test.util.GsonUtil;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.slf4j.Logger;
@@ -16,6 +15,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
+import java.util.Map;
+
@Component
@ConditionalOnExpression("${iot.test.client:false} and ${iot.test.mqtt.start:false}")
public class MqttWillTopicTestListener implements MqttSubscribeListener {
@@ -31,7 +32,7 @@ public class MqttWillTopicTestListener implements MqttSubscribeListener {
public void onSubscribe(DefaultMqttSubscribeProtocol protocol) {
byte[] message = protocol.requestMessage().getMessage();
ClientComponent component = FrameworkManager.getClientComponent(DefaultMqttMessage.class);
- JSONObject jsonObject = JSONUtil.parseObj(new String(message));
+ Map jsonObject = GsonUtil.toMap(new String(message));
if(jsonObject.containsKey("retain")) {
logger.info(TestConst.LOGGER_MQTT_PROTOCOL_DESC, component.getName()
, "WillTopic", protocol.getTopic(), protocol.getEquipCode(), "-", "通过" );
diff --git a/iot-test/src/main/java/com/iteaj/iot/test/util/GsonUtil.java b/iot-test/src/main/java/com/iteaj/iot/test/util/GsonUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..780bb8145af079095caf3924b6acd7a0b6a79f1a
--- /dev/null
+++ b/iot-test/src/main/java/com/iteaj/iot/test/util/GsonUtil.java
@@ -0,0 +1,36 @@
+package com.iteaj.iot.test.util;
+
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+
+/**
+ * package-name: com.iteaj.iot.test.util
+ * project-name: iot
+ * function : gson封装
+ * @author daihui
+ * @since 2024/11/8 星期五 22:41
+ */
+public class GsonUtil {
+
+ private GsonUtil() {
+ }
+
+ private static final Gson gson = new Gson();
+
+ public static < T > T fromJSON(String jsonString , Class < T > clazz) {
+ return gson.fromJson(jsonString , clazz);
+ }
+
+ public static String toJsonStr(Object object) {
+ return gson.toJson(object);
+ }
+
+ public static Map < String, String > toMap(String jsonString) {
+ Type mapType = new TypeToken < Map < String, String > >() {}.getType();
+ return gson.fromJson(jsonString , mapType);
+ }
+
+}
diff --git a/iot-test/src/main/resources/META-INF/spring.factories b/iot-test/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
similarity index 38%
rename from iot-test/src/main/resources/META-INF/spring.factories
rename to iot-test/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
index de1c7258f7222811683fe56dba14c1f19743ab83..71dd8c6f6c73e2c6fd5f3053535afe7160fa4746 100644
--- a/iot-test/src/main/resources/META-INF/spring.factories
+++ b/iot-test/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -1,6 +1,2 @@
-org.springframework.context.ApplicationListener=\
com.iteaj.iot.test.IotTestStartListener
-
-# Auto Configure
-org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.iteaj.iot.test.IotTestAutoConfiguration
diff --git a/pom.xml b/pom.xml
index 8373d64300eb811c92ffa01e704f80b3cf991f57..e753e038fd525a09e24f1435ef51fdd3a4a1854a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
- 2.7.2
+ 3.3.3
org.springframework.boot
spring-boot-starter-parent
@@ -54,14 +54,14 @@
3.1.1
- 1.8
3.0.3
- 5.7.18
+ 5.8.33
1.2.78
- 2.7.2
+ 3.3.3
2.9.2
- 1.8
- 1.8
+ 17
+ 17
+ 17
UTF-8
UTF-8