# lightningmq **Repository Path**: hongtao945/lightningmq ## Basic Information - **Project Name**: lightningmq - **Description**: 一个用Java实现的消息队列。 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-01-23 - **Last Updated**: 2025-02-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: Java, Netty ## README # 项目简介 - **技术栈:**`JDK8 | Netty` - **项目描述:**LightningMQ 是一款分布式消息中间件,架构设计方面参考了 RocketMQ 的 Nameserver + Broker 架构,同时还封装了一套 SDK 供生产者端和消费者端使用。支持普通消息、延时消息和事务消息三种消息。 - **实现功能:** 1. 在 Nameserver 中,使用 Netty 搭建**注册中心**,供 Broker、消费者和生产者等角色接入。使用**发布-订阅模型**构建消息处理系统,基于 **SPI 机制**加载注册不同的事件监听器到**事件总线**中,后续事件总线接受到不同的消息时会将其派发到相关的事件监听器中进行处理。 2. 在 Broker 中,基于 **MMap 零拷贝技术**实现了消息的**顺序写入**并**持久化存储**,且使用 **ReentrantLock** 保证多线程写入消息时正确性。消息存储后会投递到 ConsumeQueue 中供后续接入的消费者消费, Broker 中的重平衡策略会将 ConsumeQueue 分配给消费者。 3. 在 Broker 中,收到**延迟消息**或**重试消息**时,会将其暂存到**时间轮**中。会有单独的线程每隔 1s 定时扫描时间轮,将其中到达设定延时时间的消息进行发布投递。而对于重试次数超过阈值的消息会将其投递到**死信队列**中。 4. 使用**对等架构**搭建高可用 **Nameserver 集群,**集群中各个节点间相互独立,互不联通。 Broker 需要注册到所有 Nameserver ,保证集群中某个 Nameserver 节点宕机也不会对服务造成影响。当宕机的节点重新启动后,Broker、消费者和生产者也会自动进行重新连接。 5. 使用**主从架构**搭建高可用 **Broker 集群**,Master 节点负责处理**读写操作**,Slave 节点负责**读操作**和对 Master 节点的数据进行**冗余备份**,数据同步支持**同步复制**、**半同步复制**和**异步复制**三种模式。当 Broker 的主节点宕机时,会被 Nameserver 感知并选举新的主节点。 6. 在客户端 SDK 中,会自动连接 Nameserver 并定时向其拉取当前可用的 Broker 地址,随后与 Broker 建立连接并动态创建 topic 。同时还通过自定义 **Future 接口**实现异步链路转同步的功能,并基于此封装后续的一系列读写API。 # 项目目录结构 1. `broker`和`broker-cluster`目录主要是存放配置文件、消息持久化文件、不同消费者组的消费进度等内容。 2. `LightingMQ-broker`、`LightingMQ-nameserver`就是Broker和Nameserver 3. `LightingMQ-common`是一些通用的代码 4. `LightingMQ-clinet`是客户端SDK 整个项目代码行数8000行出头: ```bash PS D:\JAVA\Java_project\LightningMQ> E:\baidu\cloc-2.02.exe . 239 text files. 225 unique files. 244 files ignored. github.com/AlDanial/cloc v 2.02 T=1.40 s (160.6 files/s, 9052.3 lines/s) ------------------------------------------------------------------------------- Language files blank comment code ------------------------------------------------------------------------------- Java 195 1648 1466 8153 XML 10 1 0 775 JSON 7 4 0 209 Maven 5 20 0 162 Markdown 2 53 0 91 Properties 6 17 25 57 ------------------------------------------------------------------------------- SUM: 225 1743 1491 9447 ------------------------------------------------------------------------------- ``` # Quick Start ## 1. 启动Nameserver ### 指定配置文件地址 该项目的配置文件放置在`broker`或`broker-cluster`文件夹下的`config`目录中,`nameserver.properties`是Nameserver的配置文件。 这里我们选用`broker-cluster`下的配置文件。 首先找到启动类`NameServerStartUp`,随后配置启动参数: ![配置启动参数](https://blog-pic-1306978076.cos.ap-guangzhou.myqcloud.com/image-20250219144400570.png?imageSlim) ![设置配置文件地址](https://blog-pic-1306978076.cos.ap-guangzhou.myqcloud.com/image-20250219142430677.png?imageSlim) 打开项目启动配置,在Environment variables填入配置文件所在目录,比如说我填入的是: > LIGHTNING_MQ_HOME=D:\JAVA\Java_project\LightningMQ\broker-cluster\master 这样的话Nameserver启动时就会自动检索`D:\JAVA\Java_project\LightningMQ\broker-cluster\master`下的`config\nameserver.properties`作为配置文件了。这里各位选择复制填入就行了: ![路径选择](https://blog-pic-1306978076.cos.ap-guangzhou.myqcloud.com/image-20250219143231179.png?imageSlim) 另外我还勾选了Allow multiple instances,这允许我们运行多个实例,从而实现集群的效果。 如果要使用集群的话,可以设置另一个实例的配置为: > LIGHTNING_MQ_HOME=D:\JAVA\Java_project\LightningMQ\broker-cluster\slave ### 编辑配置文件 下面来看看配置文件的内容 ```properties # broker、producer、consumer接入到nameserver时鉴权使用的账号密码 nameserver.user=root nameserver.password=123456 # netty服务的端口 nameserver.port=28080 ``` 仓库中的配置文件内的其它内容是多余的,因为之前搭建的是主从集群,后来觉得不适合便不再使用了。 如果要运行**多个实例**,记得`nameserver.port`**不要冲突了** ### 运行项目 如果运行成功,最后应该会打印: ```bash ... [main] INFO org.tao.lightningmq.nameserver.handler.TcpNettyServerHandler - TcpNettyServerHandler初始化 [main] INFO org.tao.lightningmq.common.event.EventBus - start init event bus [main] INFO org.tao.lightningmq.nameserver.core.NameServerNettyStarter - nameserver服务启动成功,端口号:28081 ``` 当然你也可以运行多个实例,这些实例之间是互不干扰的。 ## 2. 启动Broker ### 指定配置文件地址 Broker启动类地址为:`src/main/java/org/tao/lightningmq/broker/BrokerStartUp.java` 这一步和Nameserver差不多,就是填写Environment variables,主节点选`broker-cluster\master`,从节点1选择`broker-cluster\slave`,从节点2选择`broker-cluster\slave1`即可。 ### 编辑配置文件 ```properties # 配置nameserver集群的ip地址,用;分隔 nameserver.ip=127.0.0.1;127.0.0.1 # 配置nameserver集群的端口,用;分隔 nameserver.port=28080;28081 # 登录到nameserver所用的账号密码 nameserver.user=root nameserver.password=123456 # broker启动 broker.port=28991 broker.ip=127.0.0.1 rebalance.strategy=random # broker集群配置 broker.cluster.mode=master-slave # 当前节点是master节点,如果是slave的话就填slave broker.cluster.role=master # broker组,用于给nameserver鉴别用的 broker.cluster.group=broker_test_group ``` **只需要注意`slave`节点用`broker.cluster.role=slave`即可,如果不想使用集群就只启动Master就行了。** ### 运行项目 如果运行成功,最后应该会打印: ```bash [main] INFO org.tao.lightningmq.common.remote.NameServerNettyRemoteClient - 开始和ns建立连接 [main] INFO org.tao.lightningmq.common.remote.NameServerNettyRemoteClient - 开始和ns建立连接 [main] INFO org.tao.lightningmq.broker.netty.nameserver.NameServerClient - broker registry success [main] INFO org.tao.lightningmq.broker.netty.nameserver.NameServerClient - broker registry success [main] INFO org.tao.lightningmq.common.event.EventBus - start init event bus [main] INFO org.tao.lightningmq.broker.netty.broker.BrokerServer - start broker application on port:28991 ``` ## 3. 运行Producer和Consumer ### 运行Producer 找到`LightningMQ-client\src\test\java\org\tao\TestProducerSuite.java`,里面编写了producer的发送代码 ```java public class TestProducerSuite { private DefaultProducer producer; private static final Logger LOGGER = LoggerFactory.getLogger(TestProducerSuite.class); @Before public void setUp() { producer = new DefaultProducer(); // ns地址,有多个ns地址可以一起写入,以;分隔 producer.setNameserverAddrStr("127.0.0.1:28080;127.0.0.1:28081"); // 账号密码以及关注的broker组 producer.setNameServerUser("root"); producer.setNameServerPassword("123456"); producer.setBrokerClusterGroup("broker_test_group"); // 启动! producer.start(); } @Test public void sendUserEnterMsg() { System.out.println("sendUserEnterMsg"); for (int i = 0; i < 100; i++) { try { MessageDTO messageDTO = new MessageDTO(); // 设置topic messageDTO.setTopic("test_create_topic6"); JSONObject jsonObject = new JSONObject(); jsonObject.put("userId",i); jsonObject.put("level",1); jsonObject.put("enterTime",System.currentTimeMillis()); messageDTO.setBody(jsonObject.toJSONString().getBytes()); SendResult sendResult = producer.send(messageDTO); System.out.println("发送数据:" + jsonObject.toJSONString() + "结果:" + (sendResult.getSendStatus() == SendResult.SendStatus.SUCCESS)); TimeUnit.SECONDS.sleep(2); }catch (Exception e) { e.printStackTrace(); } } } } ``` 找到`LightningMQ-client\src\test\java\org\tao\TestConsumerSuite.java`,里面编写了Consumer的拉取代码 ```java public void testConsumeUserEnterTopicMsg() throws InterruptedException { consumer = new DefaultMqConsumer(); consumer.setNameserverAddrStr("127.0.0.1:28080;127.0.0.1:28081"); consumer.setNameServerUser("root"); consumer.setNameServerPassword("123456"); // 关注的topic consumer.setTopic("test_create_topic6"); // 设置好该消费者对应的消费者组 consumer.setConsumeGroup("test_group"); consumer.setBrokerClusterGroup("broker_test_group"); // 一次拉取的消息数 consumer.setBatchSize(4); System.out.println("start consumer"); // 消息消费的逻辑 consumer.setMessageConsumeListener(new MessageConsumeListener() { @Override public ConsumeResult consume(List consumeMessages) { for (ConsumeMessage consumeMessage : consumeMessages) { System.out.println("获取数据: " + new String(consumeMessage.getConsumeMsgCommitLogDTO().getBody())); } return ConsumeResult.success(); } }); // 启动! consumer.start(); } ```