# socket-mqtt
**Repository Path**: comcn_admin/socket-mqtt
## Basic Information
- **Project Name**: socket-mqtt
- **Description**: 基于Netty+MQTT的高性能推送服务框架。支持普通Socket、MQTT、MQTT web socket协议。非常方便接入上层业务实现推送业务。
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 16
- **Created**: 2023-12-15
- **Last Updated**: 2023-12-15
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# socket-mqtt: Netty4.x + MQTT
这是一个基于[Netty4.x](https://netty.io/) + [MQTT](http://mqtt.org/)实现的Push推送基础框架。相比于原生Netty,
socket-mqtt:
* 为C/S模式开发封装简单统一的编程模式
* 简单高性能的代码
* 统一的连接管理方案
* 统一的线程管理方案
* 网络基础问题的解决与支持:如心跳保持、压缩解压缩、编码与解码、加密与解密等
* 各种网络参数、连接池实现、监听器实现等可配置可替换
* 可实现对等集群
* 提供数据统计/监控组件
* 支持普通socket、MQTT、MQTT web socket协议
# 项目结构
* codec: 封装编码与解码
* compression: 封装压缩与解压缩
* count: 封装统计信息
* database: 基于hsql的内存数据库
* encrypt: 封装加密与解密
* future: 封装同步和异步调用
* listener: 封装事件监听,包括消息、通道、异常三类事件监听器
* service: 封装C/S模型、通道、心跳管理、消息分发等核心模块
# 压测报告
单Broker8核16G,支持44万连接;1万客户端 单消息1024B 下行tps: 16万+;
4000客户端 Publish 单消息1024B 上行tps: 17万+,千兆网卡流量基本打满。
备注:Mqtt Server启动内存只分配了5G,如果分配到10G,理论上可以支持百万连接。还有,测试开启了心跳上报。
#### 消息下行能力
1万Clients订阅的消息下行能力 |
对应下行负载情况 |
|
|
#### 消息上行能力
4000Clients订阅消息上行能力 |
对应上行负载情况 |
|
|
#### 查看连接数情况
查看连接数(telnet 10.43.204.61 8001; get status) |
查看连接数(ss -l) |
|
|
# 使用说明
各种测试类的源码在src/test/java/com/yb/socket包路径下:
包括:
* 普通socket Server/Client
* MQTT socket Server/Client
* 带注册中心的普通socket/MQTT socket
* 基于内存数据库的模拟订阅推送
## 服务启动配置选项
```java
Server server = new Server();
// 设置Broker端口
server.setPort(8000);
// 设置启动信息统计。默认true
server.setOpenCount(true);
// 设置启用心跳功能。默认true
server.setCheckHeartbeat(true);
// 设置启动服务状态,默认端口8001。通过telnet server_ip 8001; get status查看服务信息
server.setOpenStatus(true);
// 服务状态端口。默认8001
server.setStatusPort(8001);
// 设置服务名称
server.setServiceName("Demo");
// 设置工作线程数量。默认CPU个数+1
server.setWorkerCount(64);
// 是否开户业务处理线程池。默认false
server.setOpenExecutor(true);
// 设置tcp no delay。默认true
server.setTcpNoDelay(true);
// 是否启用keepAlive。默认true
server.setKeepAlive(true);
// 自定义监听器,可处理相关事件
server.addEventListener(new EchoMessageEventListener());
// 设置Broker启动协议。SocketType.MQTT - MQTT协议; SocketType.NORMAL - 普通Socket协议;SocketType.MQTT_WS - MQTT web socket协议;
server.setSocketType(SocketType.MQTT);
// 绑定端口启动服务
server.bind();
```
## MQTT web socket server DEMO
```java
Server server = new Server();
server.setPort(8000);
server.addEventListener(new EchoMessageEventListener());
server.setSocketType(SocketType.MQTT_WS);
server.bind();
//模拟推送
String message = "this is a web socket message!";
MqttRequest mqttRequest = new MqttRequest((message.getBytes()));
while (true) {
if (server.getChannels().size() > 0) {
logger.info("模拟推送消息");
for (WrappedChannel channel : server.getChannels().values()) {
server.send(channel, "yb/notice/", mqttRequest);
}
}
Thread.sleep(1000L);
}
```
## MQTT web socket client(浏览器)
```
可用在线mqtt测试:http://www.tongxinmao.com/txm/webmqtt.php
Topic Payload Time QoS
yb/notice/ this is a web socket message! 2019-2-27 16:54:54 0
```
## Normal socket server DEMO
```java
Server server = new Server();
server.setPort(8000);
server.addEventListener(new JsonEchoMessageEventListener());
server.addChannelHandler("decoder", new JsonDecoder());
server.addChannelHandler("encoder", new JsonEncoder());
server.bind();
//模拟推送
JSONObject message = new JSONObject();
message.put("action", "echo");
message.put("message", "this is a normal socket message!");
Request request = new Request();
request.setSequence(0);
request.setMessage(message);
while (true) {
if (server.getChannels().size() > 0) {
logger.info("模拟推送消息");
for (WrappedChannel channel : server.getChannels().values()) {
channel.send(request);
Thread.sleep(5000L);
}
}
}
```
## Normal socket client DEMO
```java
Client client = new Client();
client.setIp("127.0.0.1");
client.setPort(8000);
client.setConnectTimeout(10000);
client.addChannelHandler("decoder", new JsonDecoder());
client.addChannelHandler("encoder", new JsonEncoder());
client.connect();
for (int i = 0; i < 2; i++) {
JSONObject message = new JSONObject();
message.put("action", "echo");
message.put("message", "hello world!");
Request request = new Request();
request.setSequence(i);
request.setMessage(message);
Response response = (Response) client.sendWithSync(request, 3000);
logger.info("成功接收到同步的返回: '{}'.", response);
}
client.shutdown();
```
## 带注册中心 center DEMO
```java
Server server = new Server();
server.setPort(9000);
server.setCheckHeartbeat(false);
server.addChannelHandler("decoder", new JsonDecoder());
server.addChannelHandler("encoder", new JsonEncoder());
server.addEventListener(new com.yb.socket.center.CenterMockMessageEventListener());
server.bind();
```
## 带注册中心 server DEMO
```java
Server server = new Server();
server.setPort(8000);
server.setCheckHeartbeat(false);
server.setCenterAddr("127.0.0.1:9000,127.0.0.1:9010");
server.addEventListener(new JsonEchoMessageEventListener());
server.bind();
```
## 带注册中心 client DEMO
```java
Client client = new Client();
client.setCheckHeartbeat(false);
client.setCenterAddr("127.0.0.1:9000,127.0.0.1:9010");
client.addChannelHandler("decoder", new JsonDecoder());
client.addChannelHandler("encoder", new JsonEncoder());
client.connect();
JSONObject message = new JSONObject();
message.put("action", "echo");
message.put("message", "hello");
for (int i = 0; i < 5; i++) {
Request request = new Request();
request.setSequence(i);
request.setMessage(message);
client.send(request);
Thread.sleep(5000L);
}
```
# 后续规划
* 支持MQTT主题过滤机制
* 支持SSL连接方式
* 完整的QoS服务质量等级实现DEMO
* 遗嘱消息, 保留消息及消息分发重试
# 压测工具
* https://github.com/daoshenzzg/mqtt-mock
# 参考项目
* https://github.com/netty/netty
* https://github.com/1ssqq1lxr/iot_push
* https://github.com/Wizzercn/MqttWk