# monkey-iot
**Repository Path**: F_monkey/monkey-iot
## Basic Information
- **Project Name**: monkey-iot
- **Description**: 物联网项目
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2026-04-03
- **Last Updated**: 2026-04-26
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# Monkey IoT Platform
**高性能、可扩展的物联网平台**
基于 Spring Boot 4.0 + Netty 4.2 + Reactor 构建的现代化 IoT 平台
[](https://openjdk.org/)
[](https://spring.io/projects/spring-boot)
[](https://netty.io/)
[]()
---
## 📋 目录
- [项目简介](#项目简介)
- [核心特性](#核心特性)
- [技术栈](#技术栈)
- [架构设计](#架构设计)
- [模块说明](#模块说明)
- [支持的协议](#支持的协议)
- [快速开始](#快速开始)
- [扩展开发](#扩展开发)
- [待完善功能](#待完善功能)
- [贡献指南](#贡献指南)
---
## 🎯 项目简介
Monkey IoT 是一个高性能、高可扩展的物联网平台,采用响应式编程模型和模块化设计,支持多种设备协议的接入和管理。平台提供设备连接管理、消息编解码、状态机管理、SDK 动态加载等核心能力,适用于大规模 IoT 设备接入场景。
### 设计目标
- **高性能**: 基于 Netty 和 Reactor 的异步非阻塞架构
- **高扩展**: 插件化的协议支持和 SDK 动态加载机制
- **隔离性**: ClassLoader 隔离的第三方 SDK 执行环境
- **灵活性**: 可自定义的消息处理链和状态机逻辑
---
## ✨ 核心特性
### 1. 多协议设备接入
- ✅ **MQTT 协议**: 完整的 MQTT Connect/Publish/Subscribe 支持
- ✅ **Modbus TCP**: 工业标准 Modbus TCP 协议支持
### 2. 客户端交互
- ✅ **WebSocket 支持**: 基于 WebSocket 的实时双向通信
- ✅ **用户会话管理**: 基于 Token 的用户认证和会话管理
- ✅ **Keep-Alive**: 持久连接支持,减少连接开销
### 3. 消息编解码
- ✅ **JSON 编解码**: 标准的 JSON 消息格式
- ✅ **GZIP 压缩**: 消息压缩传输,节省带宽
- ✅ **自定义编解码**: 支持第三方 JAR 包的自定义编解码器
- ✅ **媒体类型链**: 支持编解码器组合(如: gzip → json → sdk jar)
### 4. SDK 动态加载
- ✅ **ClassLoader 隔离**: 每个 SDK 独立的类加载环境
- ✅ **热插拔**: 运行时动态加载/卸载 SDK
- ✅ **SPI 机制**: 基于 Java SPI 的服务发现
- ✅ **资源池管理**: SdkClassLoaderPool 高效管理 SDK 实例
### 5. 状态机管理
- ✅ **事件驱动**: 基于 Disruptor 的高性能事件处理
- ✅ **分组调度**: 按 Group ID 分组的并行事件处理
- ✅ **自定义状态**: 支持自定义状态机和业务逻辑
- ✅ **动态注册**: 运行时动态注册状态机组工厂
### 6. 设备管理
- ✅ **设备注册**: 支持设备唯一标识和设备信息存储
- ✅ **设备拓扑**: 支持设备父子关系和网关拓扑结构
- ✅ **设备订阅**: 客户端可订阅特定设备的消息
- ✅ **数据持久化**: 设备数据的批量保存和刷新
### 7. 消息过滤链
- ✅ **责任链模式**: 可插拔的消息过滤器链
- ✅ **设备消息过滤**: 设备上行消息的处理和转换
- ✅ **客户端消息过滤**: 客户端请求的验证和处理
- ✅ **优先级控制**: 基于 @Order 的过滤器执行顺序
---
## 🛠️ 技术栈
### 核心框架
| 技术 | 版本 | 用途 |
|------|------|------|
| Java | 25 | 编程语言 |
| Spring Boot | 4.0.1 | 应用框架 |
| Spring Cloud | 2023.0.0 | 微服务支持 |
| Project Reactor | 3.8.1 | 响应式编程 |
### 网络通信
| 技术 | 版本 | 用途 |
|------|------|------|
| Netty | 4.2.9.Final | 高性能网络框架 |
### 消息中间件
| 技术 | 版本 | 用途 |
|------|------|------------|
| RabbitMQ | 5.23.0 | 消息队列 |
| Redis | - | 缓存和会话存储,队列 |
### 数据处理
| 技术 | 版本 | 用途 |
|------|------|------|
| Jackson | 3.0.3 | JSON 序列化 |
| Protobuf | 4.33.2 | 高效序列化 |
### 工具库
| 技术 | 版本 | 用途 |
|------|------|------|
| Lombok | 1.18.42 | 代码简化 |
| Guava | 33.5.0 | Google 工具库 |
| Disruptor | 4.0.0 | 高性能队列 |
| MapStruct | 1.6.3 | 对象映射 |
| JWT (jjwt) | 0.13.0 | Token 认证 |
### 自研组件
| 组件 | 版本 | 用途 |
|------|------|------|
| monkey-transport | 1.0.1-SNAPSHOT | 传输层抽象 |
| monkey-state | 1.0.1-SNAPSHOT | 状态机引擎 |
---
## 🏗️ 架构设计
### 整体架构
```
┌─────────────────────────────────────────────────────┐
│ Client Applications │
│ (Web / Mobile / Desktop) │
└──────────────────┬──────────────────────────────────┘
│ WebSocket / HTTP
┌──────────────────▼──────────────────────────────────┐
│ Transport Server Layer │
│ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Protocol │ │ Client Connection │ │
│ │ Router │ │ Management │ │
│ └──────────────┘ └─────────────────────────┘ │
└──────────────────┬──────────────────────────────────┘
│
┌──────────┴──────────┐
│ │
┌───────▼────────┐ ┌───────▼────────┐
│ Device Layer │ │ Client Layer │
│ │ │ │
│ • MQTT │ │ • WebSocket │
│ • Modbus TCP │ │ • HTTP │
│ • Extensible │ │ • Session Mgmt │
└───────┬────────┘ └────────┬───────┘
│ │
│ ┌───────────▼───────────┐
│ │ State Machine │
│ │ Engine │
│ │ (Disruptor) │
│ └───────────┬───────────┘
│ │
└──────────┬──────────┘
│
┌──────────▼──────────┐
│ Data Layer │
│ │
│ • Device Repository │
│ • User Session │
│ • Device Data │
└─────────────────────┘
```
### 核心组件
#### 1. Exchange(交换通道)
Exchange 是平台的核心抽象,代表一个双向通信通道:
- **DeviceExchange**: 设备与服务器之间的通信通道
- **ClientExchange**: 客户端与服务器之间的通信通道
每个 Exchange 包含:
- **Inbound**: 接收消息的入口
- **Outbound**: 发送消息的出口
- **Attributes**: 通道属性存储
#### 2. Filter Chain(过滤器链)
基于责任链模式的消息处理机制:
```
Client Request → Filter1 → Filter2 → ... → FilterN → State Machine
Device Message → Filter1 → Filter2 → ... → FilterN → State Machine
```
#### 3. State Machine(状态机)
基于 Group ID 的事件分发和处理:
```
Event → SchedulerManager → StateGroup → State → Business Logic
```
#### 4. SDK ClassLoader
隔离的 SDK 执行环境:
```
SdkClassLoaderPool
├── SdkClassLoader #1 (sdk-a.jar)
├── SdkClassLoader #2 (sdk-b.jar)
└── SdkClassLoader #3 (sdk-c.jar)
```
---
## 📦 模块说明
### 核心模块
| 模块 | 说明 |
|------|------|
| **iot-admin-server** | 后台管理服务,提供设备管理、用户管理等 REST API |
| **iot-transport-server** | 传输服务器,处理设备和服务器的网络连接 |
| **iot-device** | 设备端逻辑,包含协议解析、设备管理、消息编解码 |
| **iot-client** | 客户端逻辑,处理客户端连接、会话管理、状态机 |
| **iot-data** | 数据模型定义,包含 Device、UserSession、DeviceData 等实体 |
### 接口模块(SDK 扩展)
| 模块 | 说明 |
|------|------|
| **iot-interface-device** | 设备端扩展接口,用于第三方 SDK 开发 |
| **iot-interface-client** | 客户端扩展接口,用于自定义交互逻辑 |
| **iot-interface-all** | 通用接口模块,包含 ClassLoader 和编解码接口 |
| **iot-interface-exchange-codec** | 编解码器接口定义 |
### 模块依赖关系
```
iot-transport-server
├── iot-device
│ ├── iot-interface-device
│ ├── iot-interface-all
│ └── iot-data
├── iot-client
│ ├── iot-interface-client
│ ├── iot-interface-all
│ └── iot-data
└── iot-interface-exchange-codec
```
---
## 📡 支持的协议
### 设备端协议
#### 1. MQTT
- **端口**: 1883(可配置)
- **特性**:
- 设备认证(username/password)
- Connect/Disconnect 事件监听
- 主题订阅和发布
- QoS 支持
#### 2. Modbus TCP
- **端口**: 1502(可配置)
- **特性**:
- 基于 IP + UnitId 的设备识别
- 支持设备拓扑结构
- 子设备编解码器绑定
### 客户端协议
#### 1. WebSocket
- **路径**: `/ws`(可配置)
- **特性**:
- 基于 Token 的认证
- 支持协议参数选择
- Keep-Alive 持久连接
- 二进制和文本消息
**连接示例**:
```javascript
const ws = new WebSocket('ws://localhost:8080/ws?token=xxx&protocol=json');
```
#### 2. HTTP
- **特性**:
- RESTful API
- Token 认证
- JSON 请求/响应
---
## 🚀 快速开始
### 前置要求
- JDK 25+
- Gradle 9.3.1+
- Maven Local Repository(用于本地依赖)
### 构建项目
```bash
# 克隆项目
git clone
cd monkey-iot
# 构建所有模块
./gradlew build
# 跳过测试构建
./gradlew build -x test
```
### 运行传输服务器
```bash
# 进入传输服务器模块
cd iot-transport-server
# 运行应用
./gradlew bootRun
```
---
## 🔌 扩展开发
### 1. 添加新设备协议
#### 步骤 1: 创建协议处理器
```java
public class MyProtocolChannelInboundHandler
extends SimpleChannelInboundHandler {
private final ReactiveDeviceRepository deviceRepository;
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) {
// 协议解析逻辑
String deviceId = extractDeviceId(msg);
deviceRepository.findById(deviceId)
.subscribe(device -> {
// 处理设备消息
ctx.fireChannelRead(decodedMessage);
});
}
}
```
#### 步骤 2: 注册协议路由
```java
@Configuration
public class MyProtocolConfiguration {
@Bean
ServerProtocolRouterFunction myProtocolRouter(
MyProtocolChannelInboundHandler handler) {
return new TcpServerProtocolRouterFunction(
port -> new MyProtocolInitializer(handler),
9000 // 协议端口
);
}
}
```
### 2. 自定义消息编解码器
#### 实现编解码器工厂
```java
public class MyDeviceMessageCodecFactory
implements DeviceMessageCodecFactory {
@Override
public ExchangeMessageCodec create(String mediaType) {
if ("my-format".equals(mediaType)) {
return new MyMessageCodec();
}
return DeviceMessageCodecFactory.NOOP;
}
}
```
#### 注册编解码器
在 `META-INF/services` 下创建文件:
```
cn.monkey.iot.exchange.codec.device.DeviceMessageCodecFactory
```
内容:
```
com.example.MyDeviceMessageCodecFactory
```
### 3. 自定义状态机逻辑
#### 创建状态机组工厂
```java
public class CustomStateGroupFactory
implements CustomerIotStateGroupFactory {
@Override
public CustomerIotStateGroup create(Object id,
Supplier timer,
Function deviceRepo,
Object... args) {
return new CustomStateGroup(id, timer, deviceRepo);
}
}
```
#### 创建状态机
```java
public class CustomStateGroup extends CustomerIotStateGroup {
public CustomStateGroup(Object id,
Supplier timer,
Function deviceRepo) {
super(id, new CustomerIotStateContext(deviceRepo), timer,
new RingBufferQueue<>(1024));
}
@Override
public void fireEvent(Supplier timer, Object event) {
if (event instanceof DeviceMessage message) {
// 处理设备消息
processDeviceMessage(message);
}
}
}
```
### 4. 添加消息过滤器
```java
@Component
@Order(100) // 执行顺序
public class LoggingDeviceMessageFilter implements DeviceMessageFilter {
private static final Logger log = LoggerFactory.getLogger(
LoggingDeviceMessageFilter.class);
@Override
public void doFilter(DeviceExchange exchange,
Object message,
FilterChain chain) {
log.info("Device {} sent message: {}", exchange.id(), message);
chain.doFilter(exchange, message);
}
}
```
### 5. 开发第三方 SDK
#### 项目结构
```
my-iot-sdk/
├── src/main/java/
│ └── com/example/
│ ├── MyStateGroupFactory.java
│ └── MyMessageCodec.java
└── META-INF/services/
├── cn.monkey.iot.sdk.client.state.CustomerIotStateGroupFactory
└── cn.monkey.iot.exchange.codec.device.DeviceMessageCodecFactory
```
#### 打包和使用
```bash
# 打包 SDK
./gradlew jar
# 客户端上传 SDK
ws.send(JSON.stringify({
cmdType: "create",
payload: "file:/path/to/my-iot-sdk.jar"
}));
```
---
## 📝 待完善功能
基于当前代码分析,以下功能需要补充和完善:
### 🔴 高优先级
#### 1. 设备拓扑结构实现
- **现状**: Modbus 协议中有 TODO 注释提到需要根据 unitId 构建设备拓扑
- **建议**:
- 实现网关-子设备的层级关系
- 支持子设备的自动发现和注册
- 为不同子设备动态绑定编解码器
#### 2. 默认状态机组工厂实现
- **现状**: `DefaultIotStateGroupFactory.create()` 方法体为空
- **建议**:
- 实现默认的状态机组创建逻辑
- 初始化 DefaultIotState 和状态上下文
- 配置事件队列和定时器
#### 3. 客户端命令完整实现
- **现状**: `DefaultIotState.onClientEvent()` 中 switch-case 不完整
- **建议**:
- 实现 `ADD_DEVICE` 命令:客户端订阅设备消息
- 实现其他必要命令(删除订阅、查询状态等)
- 添加命令响应机制(ACK/NACK)
#### 4. 错误处理和失败响应
- **现状**: 多处有 `// TODO FAIL ACK` 注释
- **建议**:
- 定义统一的错误码和错误消息格式
- 实现客户端错误响应机制
- 添加异常处理和日志记录
### 🟡 中优先级
#### 5. 设备数据持久化
- **现状**: `DeviceDataRepository.save()` 只有接口定义
- **建议**:
- 实现基于 Redis/MySQL 的数据存储
- 支持时序数据存储(如 InfluxDB)
- 实现数据聚合和统计功能
#### 6. 用户认证和授权
- **现状**: 仅有基本的 Token 验证
- **建议**:
- 集成 Spring Security
- 实现 RBAC 权限控制
- 支持 OAuth2/JWT Token 刷新
#### 7. 设备影子(Device Shadow)
- **建议**:
- 实现设备期望状态和实际状态同步
- 支持离线设备的状态缓存
- 提供状态变更通知机制
#### 8. 规则引擎
- **建议**:
- 支持基于条件的消息路由
- 实现消息转换和 enrichment
- 支持脚本引擎(JavaScript/Groovy)
#### 9. 监控和指标
- **建议**:
- 集成 Micrometer + Prometheus
- 监控设备连接数、消息吞吐量
- 实现健康检查和告警
#### 10. 消息可靠性保证
- **建议**:
- 实现消息确认机制(ACK)
- 支持消息重试和死信队列
- 实现消息去重
### 🟢 低优先级
#### 11. 更多协议支持
- CoAP 协议(适合低功耗设备)
- HTTP/2 协议
- LwM2M 协议
- OPC UA(工业自动化)
#### 12. OTA 升级
- 固件版本管理
- 增量升级支持
- 升级进度跟踪
#### 13. 多租户支持
- 租户隔离
- 资源配额管理
- 独立命名空间
#### 14. 边缘计算
- 边缘节点管理
- 本地规则执行
- 云端协同
#### 15. 可视化Dashboard
- 设备实时监控
- 数据可视化图表
- 告警面板
---
## 🤝 贡献指南
欢迎贡献代码、报告问题或提出建议!
### 开发流程
1. Fork 本仓库
2. 创建特性分支 (`git checkout -b feature/AmazingFeature`)
3. 提交更改 (`git commit -m 'Add some AmazingFeature'`)
4. 推送到分支 (`git push origin feature/AmazingFeature`)
5. 开启 Pull Request
### 代码规范
- 遵循 Java 编码规范
- 使用 Lombok 简化代码
- 添加必要的注释和文档
- 编写单元测试
### 提交规范
使用语义化提交信息:
- `feat`: 新功能
- `fix`: 修复 bug
- `docs`: 文档更新
- `style`: 代码格式调整
- `refactor`: 重构
- `test`: 测试相关
- `chore`: 构建/工具链相关
---
## 📄 License
本项目采用 SNAPSHOT 版本,具体许可证待定。
---
## 👥 联系方式
- 项目主页: [GitHub Repository](your-repo-url)
- 问题反馈: [Issues](your-issues-url)
---
**Made with ❤️ by Monkey IoT Team**
⭐ 如果这个项目对你有帮助,请给我们一个 Star!