# monkey-iot **Repository Path**: F_monkey/monkey-iot ## Basic Information - **Project Name**: monkey-iot - **Description**: 物联网项目 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-04-03 - **Last Updated**: 2026-04-26 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Monkey IoT Platform
**高性能、可扩展的物联网平台** 基于 Spring Boot 4.0 + Netty 4.2 + Reactor 构建的现代化 IoT 平台 [![Java](https://img.shields.io/badge/Java-25-orange.svg)](https://openjdk.org/) [![Spring Boot](https://img.shields.io/badge/Spring%20Boot-4.0.1-brightgreen.svg)](https://spring.io/projects/spring-boot) [![Netty](https://img.shields.io/badge/Netty-4.2.9-blue.svg)](https://netty.io/) [![License](https://img.shields.io/badge/License-SNAPSHOT-yellow.svg)]()
--- ## 📋 目录 - [项目简介](#项目简介) - [核心特性](#核心特性) - [技术栈](#技术栈) - [架构设计](#架构设计) - [模块说明](#模块说明) - [支持的协议](#支持的协议) - [快速开始](#快速开始) - [扩展开发](#扩展开发) - [待完善功能](#待完善功能) - [贡献指南](#贡献指南) --- ## 🎯 项目简介 Monkey IoT 是一个高性能、高可扩展的物联网平台,采用响应式编程模型和模块化设计,支持多种设备协议的接入和管理。平台提供设备连接管理、消息编解码、状态机管理、SDK 动态加载等核心能力,适用于大规模 IoT 设备接入场景。 ### 设计目标 - **高性能**: 基于 Netty 和 Reactor 的异步非阻塞架构 - **高扩展**: 插件化的协议支持和 SDK 动态加载机制 - **隔离性**: ClassLoader 隔离的第三方 SDK 执行环境 - **灵活性**: 可自定义的消息处理链和状态机逻辑 --- ## ✨ 核心特性 ### 1. 多协议设备接入 - ✅ **MQTT 协议**: 完整的 MQTT Connect/Publish/Subscribe 支持 - ✅ **Modbus TCP**: 工业标准 Modbus TCP 协议支持 ### 2. 客户端交互 - ✅ **WebSocket 支持**: 基于 WebSocket 的实时双向通信 - ✅ **用户会话管理**: 基于 Token 的用户认证和会话管理 - ✅ **Keep-Alive**: 持久连接支持,减少连接开销 ### 3. 消息编解码 - ✅ **JSON 编解码**: 标准的 JSON 消息格式 - ✅ **GZIP 压缩**: 消息压缩传输,节省带宽 - ✅ **自定义编解码**: 支持第三方 JAR 包的自定义编解码器 - ✅ **媒体类型链**: 支持编解码器组合(如: gzip → json → sdk jar) ### 4. SDK 动态加载 - ✅ **ClassLoader 隔离**: 每个 SDK 独立的类加载环境 - ✅ **热插拔**: 运行时动态加载/卸载 SDK - ✅ **SPI 机制**: 基于 Java SPI 的服务发现 - ✅ **资源池管理**: SdkClassLoaderPool 高效管理 SDK 实例 ### 5. 状态机管理 - ✅ **事件驱动**: 基于 Disruptor 的高性能事件处理 - ✅ **分组调度**: 按 Group ID 分组的并行事件处理 - ✅ **自定义状态**: 支持自定义状态机和业务逻辑 - ✅ **动态注册**: 运行时动态注册状态机组工厂 ### 6. 设备管理 - ✅ **设备注册**: 支持设备唯一标识和设备信息存储 - ✅ **设备拓扑**: 支持设备父子关系和网关拓扑结构 - ✅ **设备订阅**: 客户端可订阅特定设备的消息 - ✅ **数据持久化**: 设备数据的批量保存和刷新 ### 7. 消息过滤链 - ✅ **责任链模式**: 可插拔的消息过滤器链 - ✅ **设备消息过滤**: 设备上行消息的处理和转换 - ✅ **客户端消息过滤**: 客户端请求的验证和处理 - ✅ **优先级控制**: 基于 @Order 的过滤器执行顺序 --- ## 🛠️ 技术栈 ### 核心框架 | 技术 | 版本 | 用途 | |------|------|------| | Java | 25 | 编程语言 | | Spring Boot | 4.0.1 | 应用框架 | | Spring Cloud | 2023.0.0 | 微服务支持 | | Project Reactor | 3.8.1 | 响应式编程 | ### 网络通信 | 技术 | 版本 | 用途 | |------|------|------| | Netty | 4.2.9.Final | 高性能网络框架 | ### 消息中间件 | 技术 | 版本 | 用途 | |------|------|------------| | RabbitMQ | 5.23.0 | 消息队列 | | Redis | - | 缓存和会话存储,队列 | ### 数据处理 | 技术 | 版本 | 用途 | |------|------|------| | Jackson | 3.0.3 | JSON 序列化 | | Protobuf | 4.33.2 | 高效序列化 | ### 工具库 | 技术 | 版本 | 用途 | |------|------|------| | Lombok | 1.18.42 | 代码简化 | | Guava | 33.5.0 | Google 工具库 | | Disruptor | 4.0.0 | 高性能队列 | | MapStruct | 1.6.3 | 对象映射 | | JWT (jjwt) | 0.13.0 | Token 认证 | ### 自研组件 | 组件 | 版本 | 用途 | |------|------|------| | monkey-transport | 1.0.1-SNAPSHOT | 传输层抽象 | | monkey-state | 1.0.1-SNAPSHOT | 状态机引擎 | --- ## 🏗️ 架构设计 ### 整体架构 ``` ┌─────────────────────────────────────────────────────┐ │ Client Applications │ │ (Web / Mobile / Desktop) │ └──────────────────┬──────────────────────────────────┘ │ WebSocket / HTTP ┌──────────────────▼──────────────────────────────────┐ │ Transport Server Layer │ │ ┌──────────────┐ ┌─────────────────────────┐ │ │ │ Protocol │ │ Client Connection │ │ │ │ Router │ │ Management │ │ │ └──────────────┘ └─────────────────────────┘ │ └──────────────────┬──────────────────────────────────┘ │ ┌──────────┴──────────┐ │ │ ┌───────▼────────┐ ┌───────▼────────┐ │ Device Layer │ │ Client Layer │ │ │ │ │ │ • MQTT │ │ • WebSocket │ │ • Modbus TCP │ │ • HTTP │ │ • Extensible │ │ • Session Mgmt │ └───────┬────────┘ └────────┬───────┘ │ │ │ ┌───────────▼───────────┐ │ │ State Machine │ │ │ Engine │ │ │ (Disruptor) │ │ └───────────┬───────────┘ │ │ └──────────┬──────────┘ │ ┌──────────▼──────────┐ │ Data Layer │ │ │ │ • Device Repository │ │ • User Session │ │ • Device Data │ └─────────────────────┘ ``` ### 核心组件 #### 1. Exchange(交换通道) Exchange 是平台的核心抽象,代表一个双向通信通道: - **DeviceExchange**: 设备与服务器之间的通信通道 - **ClientExchange**: 客户端与服务器之间的通信通道 每个 Exchange 包含: - **Inbound**: 接收消息的入口 - **Outbound**: 发送消息的出口 - **Attributes**: 通道属性存储 #### 2. Filter Chain(过滤器链) 基于责任链模式的消息处理机制: ``` Client Request → Filter1 → Filter2 → ... → FilterN → State Machine Device Message → Filter1 → Filter2 → ... → FilterN → State Machine ``` #### 3. State Machine(状态机) 基于 Group ID 的事件分发和处理: ``` Event → SchedulerManager → StateGroup → State → Business Logic ``` #### 4. SDK ClassLoader 隔离的 SDK 执行环境: ``` SdkClassLoaderPool ├── SdkClassLoader #1 (sdk-a.jar) ├── SdkClassLoader #2 (sdk-b.jar) └── SdkClassLoader #3 (sdk-c.jar) ``` --- ## 📦 模块说明 ### 核心模块 | 模块 | 说明 | |------|------| | **iot-admin-server** | 后台管理服务,提供设备管理、用户管理等 REST API | | **iot-transport-server** | 传输服务器,处理设备和服务器的网络连接 | | **iot-device** | 设备端逻辑,包含协议解析、设备管理、消息编解码 | | **iot-client** | 客户端逻辑,处理客户端连接、会话管理、状态机 | | **iot-data** | 数据模型定义,包含 Device、UserSession、DeviceData 等实体 | ### 接口模块(SDK 扩展) | 模块 | 说明 | |------|------| | **iot-interface-device** | 设备端扩展接口,用于第三方 SDK 开发 | | **iot-interface-client** | 客户端扩展接口,用于自定义交互逻辑 | | **iot-interface-all** | 通用接口模块,包含 ClassLoader 和编解码接口 | | **iot-interface-exchange-codec** | 编解码器接口定义 | ### 模块依赖关系 ``` iot-transport-server ├── iot-device │ ├── iot-interface-device │ ├── iot-interface-all │ └── iot-data ├── iot-client │ ├── iot-interface-client │ ├── iot-interface-all │ └── iot-data └── iot-interface-exchange-codec ``` --- ## 📡 支持的协议 ### 设备端协议 #### 1. MQTT - **端口**: 1883(可配置) - **特性**: - 设备认证(username/password) - Connect/Disconnect 事件监听 - 主题订阅和发布 - QoS 支持 #### 2. Modbus TCP - **端口**: 1502(可配置) - **特性**: - 基于 IP + UnitId 的设备识别 - 支持设备拓扑结构 - 子设备编解码器绑定 ### 客户端协议 #### 1. WebSocket - **路径**: `/ws`(可配置) - **特性**: - 基于 Token 的认证 - 支持协议参数选择 - Keep-Alive 持久连接 - 二进制和文本消息 **连接示例**: ```javascript const ws = new WebSocket('ws://localhost:8080/ws?token=xxx&protocol=json'); ``` #### 2. HTTP - **特性**: - RESTful API - Token 认证 - JSON 请求/响应 --- ## 🚀 快速开始 ### 前置要求 - JDK 25+ - Gradle 9.3.1+ - Maven Local Repository(用于本地依赖) ### 构建项目 ```bash # 克隆项目 git clone cd monkey-iot # 构建所有模块 ./gradlew build # 跳过测试构建 ./gradlew build -x test ``` ### 运行传输服务器 ```bash # 进入传输服务器模块 cd iot-transport-server # 运行应用 ./gradlew bootRun ``` --- ## 🔌 扩展开发 ### 1. 添加新设备协议 #### 步骤 1: 创建协议处理器 ```java public class MyProtocolChannelInboundHandler extends SimpleChannelInboundHandler { private final ReactiveDeviceRepository deviceRepository; @Override protected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) { // 协议解析逻辑 String deviceId = extractDeviceId(msg); deviceRepository.findById(deviceId) .subscribe(device -> { // 处理设备消息 ctx.fireChannelRead(decodedMessage); }); } } ``` #### 步骤 2: 注册协议路由 ```java @Configuration public class MyProtocolConfiguration { @Bean ServerProtocolRouterFunction myProtocolRouter( MyProtocolChannelInboundHandler handler) { return new TcpServerProtocolRouterFunction( port -> new MyProtocolInitializer(handler), 9000 // 协议端口 ); } } ``` ### 2. 自定义消息编解码器 #### 实现编解码器工厂 ```java public class MyDeviceMessageCodecFactory implements DeviceMessageCodecFactory { @Override public ExchangeMessageCodec create(String mediaType) { if ("my-format".equals(mediaType)) { return new MyMessageCodec(); } return DeviceMessageCodecFactory.NOOP; } } ``` #### 注册编解码器 在 `META-INF/services` 下创建文件: ``` cn.monkey.iot.exchange.codec.device.DeviceMessageCodecFactory ``` 内容: ``` com.example.MyDeviceMessageCodecFactory ``` ### 3. 自定义状态机逻辑 #### 创建状态机组工厂 ```java public class CustomStateGroupFactory implements CustomerIotStateGroupFactory { @Override public CustomerIotStateGroup create(Object id, Supplier timer, Function deviceRepo, Object... args) { return new CustomStateGroup(id, timer, deviceRepo); } } ``` #### 创建状态机 ```java public class CustomStateGroup extends CustomerIotStateGroup { public CustomStateGroup(Object id, Supplier timer, Function deviceRepo) { super(id, new CustomerIotStateContext(deviceRepo), timer, new RingBufferQueue<>(1024)); } @Override public void fireEvent(Supplier timer, Object event) { if (event instanceof DeviceMessage message) { // 处理设备消息 processDeviceMessage(message); } } } ``` ### 4. 添加消息过滤器 ```java @Component @Order(100) // 执行顺序 public class LoggingDeviceMessageFilter implements DeviceMessageFilter { private static final Logger log = LoggerFactory.getLogger( LoggingDeviceMessageFilter.class); @Override public void doFilter(DeviceExchange exchange, Object message, FilterChain chain) { log.info("Device {} sent message: {}", exchange.id(), message); chain.doFilter(exchange, message); } } ``` ### 5. 开发第三方 SDK #### 项目结构 ``` my-iot-sdk/ ├── src/main/java/ │ └── com/example/ │ ├── MyStateGroupFactory.java │ └── MyMessageCodec.java └── META-INF/services/ ├── cn.monkey.iot.sdk.client.state.CustomerIotStateGroupFactory └── cn.monkey.iot.exchange.codec.device.DeviceMessageCodecFactory ``` #### 打包和使用 ```bash # 打包 SDK ./gradlew jar # 客户端上传 SDK ws.send(JSON.stringify({ cmdType: "create", payload: "file:/path/to/my-iot-sdk.jar" })); ``` --- ## 📝 待完善功能 基于当前代码分析,以下功能需要补充和完善: ### 🔴 高优先级 #### 1. 设备拓扑结构实现 - **现状**: Modbus 协议中有 TODO 注释提到需要根据 unitId 构建设备拓扑 - **建议**: - 实现网关-子设备的层级关系 - 支持子设备的自动发现和注册 - 为不同子设备动态绑定编解码器 #### 2. 默认状态机组工厂实现 - **现状**: `DefaultIotStateGroupFactory.create()` 方法体为空 - **建议**: - 实现默认的状态机组创建逻辑 - 初始化 DefaultIotState 和状态上下文 - 配置事件队列和定时器 #### 3. 客户端命令完整实现 - **现状**: `DefaultIotState.onClientEvent()` 中 switch-case 不完整 - **建议**: - 实现 `ADD_DEVICE` 命令:客户端订阅设备消息 - 实现其他必要命令(删除订阅、查询状态等) - 添加命令响应机制(ACK/NACK) #### 4. 错误处理和失败响应 - **现状**: 多处有 `// TODO FAIL ACK` 注释 - **建议**: - 定义统一的错误码和错误消息格式 - 实现客户端错误响应机制 - 添加异常处理和日志记录 ### 🟡 中优先级 #### 5. 设备数据持久化 - **现状**: `DeviceDataRepository.save()` 只有接口定义 - **建议**: - 实现基于 Redis/MySQL 的数据存储 - 支持时序数据存储(如 InfluxDB) - 实现数据聚合和统计功能 #### 6. 用户认证和授权 - **现状**: 仅有基本的 Token 验证 - **建议**: - 集成 Spring Security - 实现 RBAC 权限控制 - 支持 OAuth2/JWT Token 刷新 #### 7. 设备影子(Device Shadow) - **建议**: - 实现设备期望状态和实际状态同步 - 支持离线设备的状态缓存 - 提供状态变更通知机制 #### 8. 规则引擎 - **建议**: - 支持基于条件的消息路由 - 实现消息转换和 enrichment - 支持脚本引擎(JavaScript/Groovy) #### 9. 监控和指标 - **建议**: - 集成 Micrometer + Prometheus - 监控设备连接数、消息吞吐量 - 实现健康检查和告警 #### 10. 消息可靠性保证 - **建议**: - 实现消息确认机制(ACK) - 支持消息重试和死信队列 - 实现消息去重 ### 🟢 低优先级 #### 11. 更多协议支持 - CoAP 协议(适合低功耗设备) - HTTP/2 协议 - LwM2M 协议 - OPC UA(工业自动化) #### 12. OTA 升级 - 固件版本管理 - 增量升级支持 - 升级进度跟踪 #### 13. 多租户支持 - 租户隔离 - 资源配额管理 - 独立命名空间 #### 14. 边缘计算 - 边缘节点管理 - 本地规则执行 - 云端协同 #### 15. 可视化Dashboard - 设备实时监控 - 数据可视化图表 - 告警面板 --- ## 🤝 贡献指南 欢迎贡献代码、报告问题或提出建议! ### 开发流程 1. Fork 本仓库 2. 创建特性分支 (`git checkout -b feature/AmazingFeature`) 3. 提交更改 (`git commit -m 'Add some AmazingFeature'`) 4. 推送到分支 (`git push origin feature/AmazingFeature`) 5. 开启 Pull Request ### 代码规范 - 遵循 Java 编码规范 - 使用 Lombok 简化代码 - 添加必要的注释和文档 - 编写单元测试 ### 提交规范 使用语义化提交信息: - `feat`: 新功能 - `fix`: 修复 bug - `docs`: 文档更新 - `style`: 代码格式调整 - `refactor`: 重构 - `test`: 测试相关 - `chore`: 构建/工具链相关 --- ## 📄 License 本项目采用 SNAPSHOT 版本,具体许可证待定。 --- ## 👥 联系方式 - 项目主页: [GitHub Repository](your-repo-url) - 问题反馈: [Issues](your-issues-url) ---
**Made with ❤️ by Monkey IoT Team** ⭐ 如果这个项目对你有帮助,请给我们一个 Star!