# iot **Repository Path**: dingrui11/iot ## Basic Information - **Project Name**: iot - **Description**: iot是基于netty, spring boot, redis等项目实现的物联网中间件, 已支持tcp、udp、mqtt、modbus等常用物联网协议,并且支持快速接入redis、kafka等消息队列[群:552167793] - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: main - **Homepage**: http://www.iteaj.com - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 794 - **Created**: 2021-12-10 - **Last Updated**: 2021-12-10 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ### iot网络中间件(2.0) 使用java语言且基于netty, spring boot, redis等开源项目开发来的物联网网络中间件, 支持udp, tcp底层协议和http, mqtt, modbus等上层协议. 主打工业物联网底层网络交互、设备管理、数据存储、大数据处理 #### 主要特性 - 支持服务端启动监听多个端口, 统一所有协议可使用的api接口 - 包含一套代理客户端通信协议,支持调用:客户端 -> 服务端 -> 设备 -> 服务端 -> 客户端 - 支持设备协议对象和其业务对象进行分离(支持默认业务处理器【spring单例注入】和自定义业务处理器) - 支持redis, 可以方便的将设备上报的数据进行缓存到redis和从redis获取数据 - 支持对数据的生产采集和数据的消费进行分离 - 支持同步和异步调用设备, 支持应用程序代理客户端和设备服务端和设备三端之间的同步和异步调用 - 服务端支持设备上线/下线/异常的事件通知, 支持自定义心跳事件, 客户端支持断线重连 - 丰富的日志打印功能,包括设备上线,下线提示, 一个协议的生命周期(请求或者请求+响应)等 - 支持本地和服务端的并发测试 #### 支持的协议 1. tcp(固定长度解码, 长度字段解码, 换行符解码,自定义分隔符解码,自定义字节到报文解码)[已完成/v2.0.0] 2. mqtt协议客户端,支持连接标准mqtt broker服务器[已完成/v2.1.0] 3. 提供modbus支持[已完成/v2.2.0] 4. 新增plc支持(待开发) #### 更新日志 1. 完成和测试通过modbus tcp客户端协议实现(2021/9/11) 2. 完成和测试通过mqtt协议的客户端实现(2021/9/7) #### 模拟工具 1. [QtSwissArmyKnife](https://gitee.com/qsaker/QtSwissArmyKnife) 支持udp、tcp、modbus、websocket、串口等调试 #### 合作方式(qq: 97235681) 1. 设备对接(不包括业务, 只负责设备协议部分,按设备协议数量收费)[500+] 2. 物联网系统开发(包括设备和业务完整系统)[价格详谈] 3. 为厂家提供专门的设备交互sdk(java)[价格详谈] ### 使用教程 首先创建一个springboot应用 #### 服务端教程 ##### 引入服务端jar包 ``` //在maven文件里面引入服务端jar包 version com.iteaj iot-server ``` ##### 创建报文解码组件 1. 创建固定长度解码器组件(以固定长度解码器为例) ``` package com.iteaj.iot.test.server.fixed; import com.iteaj.iot.AbstractProtocol; import com.iteaj.iot.config.ConnectProperties; import com.iteaj.iot.server.component.FixedLengthFrameDecoderComponent; import com.iteaj.iot.server.protocol.HeartProtocol; import com.iteaj.iot.test.TestProtocolType; /** * 固定长度解码器组件测试 * _注意:此组件必须交由spring容器管理_ * _注意:此处必须指定此组件使用的报文类型_ */ @Component public class TestFixedLengthDecoderComponent extends FixedLengthFrameDecoderComponent { // 构造函数指定要监听的端口 8080 // 指定固定长度28 public TestFixedLengthDecoderComponent() { super(new ConnectProperties(8080), 28); } @Override public String getDesc() { return "用于测试服务端固定长度字段解码器[FixedLengthFrameDecoder]"; } /** * 通过报文头获取对应的协议 */ @Override public AbstractProtocol getProtocol(FixedLengthServerMessage message) { TestProtocolType protocolType = message.getHead().getType(); // 心跳类型返回心跳协议 if(protocolType == TestProtocolType.Heart) { return HeartProtocol.getInstance(message); // 对于客户端主动请求服务器的协议类型 - new一个协议对象 } else if(protocolType == TestProtocolType.CIReq) { return new FixedLengthClientRequestProtocol(message); } else { // 对于客户端响应服务器的协议类型 - 必须移除保存在服务端的协议 return remove(message.getHead().getMessageId()); } } @Override public String getName() { return "固定长度字段解码组件"; } } ``` 2. 创建固定长度解码报文类(一个组件对应一个报文) ``` package com.iteaj.iot.test.server.fixed; import com.iteaj.iot.server.ServerMessage; import com.iteaj.iot.test.MessageCreator; /** * 固定长度报文 28 * 8 8 4 8 * 设备编号 + messageId + type + 递增值 * _注:服务端报文必须继承父类:ServerMessage _ */ public class FixedLengthServerMessage extends ServerMessage { // 注意:此构造函数一定要存在 public FixedLengthServerMessage(byte[] message) { super(message); } public FixedLengthServerMessage(MessageHead head) { super(head); } public FixedLengthServerMessage(MessageHead head, MessageBody body) { super(head, body); } // 解析出客户端请求报文的报文头 @Override protected MessageHead doBuild(byte[] message) { return MessageCreator.buildFixedMessageHead(message); } } ``` ##### 创建和设备交互的协议 ``` // 交互协议包含两种 // 1. 客户端主动请求服务端(继承 ClientInitiativeProtocol) 比如:[设备获取服务器的当前时间(请求) -> 服务器响应当前时间给客户端(响应)] package com.iteaj.iot.test.server.fixed; import com.iteaj.iot.Message; import com.iteaj.iot.ProtocolType; import com.iteaj.iot.server.manager.DevicePipelineManager; import com.iteaj.iot.server.protocol.ClientInitiativeProtocol; import com.iteaj.iot.test.MessageCreator; import com.iteaj.iot.test.TestProtocolType; public class FixedLengthClientRequestProtocol extends ClientInitiativeProtocol { public FixedLengthClientRequestProtocol(FixedLengthServerMessage requestMessage) { super(requestMessage); } // 构建要响应给客户端的报文 @Override protected FixedLengthServerMessage doBuildResponseMessage() { Message.MessageHead head = requestMessage().getHead(); String equipCode = head.getEquipCode(); String messageId = head.getMessageId(); DevicePipelineManager instance = DevicePipelineManager.getInstance(); return MessageCreator.buildFixedLengthServerMessage(equipCode, messageId, instance.size(), head.getType()); } // 解析出客户端请求的报文内容 @Override protected void doBuildRequestMessage(FixedLengthServerMessage requestMessage) { } @Override public ProtocolType protocolType() { return TestProtocolType.CIReq; } } // 2. 服务端主动请求客户端(继承 ServerInitiativeProtocol)比如:[服务端下发开指令到客户端(请求) -> 客户端响应服务端操作是否成功的状态(响应)] ``` ##### 常用的解码组件 说明:以下是已经适配好的常用的解码器组件 1. DelimiterBasedFrameDecoderComponent 自定义分隔符解码器组件 2. FixedLengthFrameDecoderComponent 固定长度解码器组件 3. LengthFieldBasedFrameDecoderComponent 长度字段解码器组件 4. LineBasedFrameDecoderComponent 换行符解码器组件 5. ByteToMessageDecoderComponent 自定义字节到报文解码器组件 #### 客户端教程 ##### 引入客户端jar包 ``` //在maven文件里面引入iot-client jar包 version com.iteaj iot-client ``` ##### 使用mqtt客户端组件 ``` //在maven文件里面引入iot-mqtt jar包 version com.iteaj iot-mqtt ``` ##### 使用modbus客户端组件 ### 2. 架构图 ![iot架构图](https://images.gitee.com/uploads/images/2021/0303/210010_da7cfaa4_1230742.png "iot.png") 1. 设备端和设备管理端同步和异步如线3、4所示 * 平台主动向设备发起请求4->3:如果是同步调用则线程将等待设备返回在往下执行业务, 如果设备由于某些原因没有返回则将在指定的超时时间内解锁线程继续向下执行业务。如果是异步调用则在设备响应后会执行业务,并不会锁住当前调用的线程 * 设备主动向平台发起请求3->4:如线3设备主动发起请求给平台, 平台将先解析设备请求的报文然后调用执行业务,执行完业务之后如线4平台将生成需要响应给设备的报文并推送给设备 2. 应用管理端和设备管理端同步和异步(1、2所示)和1相同的流程 3. 应用管理端和设备进行同步和异步调用如:1、2、3、4 * 如果是同步:由应用客户端发起调用如线1所示,发起调用的线程将阻塞, 然后如线4所示设备管理端向设备发起异步调用,接着等待设备响应如线3,再然后设备管理端响应应用客户端如线2,最后将释放应用客户端的线程锁继续向下执行 * 如果是异步:和同步的区别是应用客户端不加锁