# sparta
**Repository Path**: open-source-one/sparta
## Basic Information
- **Project Name**: sparta
- **Description**: sparta 是一个开源的,基于 springboot + redisson + redis 的一个轻量级依赖型的延迟队列,内置有丰富的API,且性能高。零配置,开箱即用,设计符合普通 MQ 队列的使用。
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 26
- **Forks**: 4
- **Created**: 2022-09-06
- **Last Updated**: 2025-09-16
## Categories & Tags
**Categories**: Uncategorized
**Tags**: delay-queue
## README
[TOC]
# Sparta
### 介绍
`Sparta` 是一款开源的轻量级延迟队列,通过 `jar` 的方法依赖进项目内,无需部署单独服务,开箱即用。
`Sparta` 作为延迟队列,支持业务服务分布式部署方式,也支持单服务的部署。`Sparta` 主要依赖于 `spring boot` 框架,根据业务可选择中间件依赖 `redis` 。
`Sparta` 能保证消息延迟在 `10ms` 以内,高性能、高可靠。
### 主要功能特性
- **消息模型:**
- 单消息延迟单消费
- 单消息延迟多消费
- 单消息单服务广播
- **低延迟:** 毫秒级延迟,性能瓶颈在对应依赖的中间件和网络
- **消息`ack` 特性 :** `自动 ack` 与 `手动 ack`
- **失败重试特性:** 消息失败后支持重试
- **消息续签特性:** 消息过期后,通过 `api` 让消息重新等待
- **最终通知:** 消息消费 N(默认20次) 次后,会触发兜底方案的运行
- **轻量级:** 只需引入依赖, 一个注解 `@EnableSparta` 即可开启 `Sparta` , 无需单独部署中间件服务
- **使用简单:**`API` 贴合当下主流 `MQ` 的使用方法 (`kafka`, `RabbitMQ` 等), 且支持 `spring boot starter`
- **支持业务服务集群部署:** 例如订单服务(依赖了 `sparta`) 集群部署
- **业务集群消息负载均衡:** 内部提供 `链表轮询` 、`随机` 两种负载均衡算法,可自行实现
### 软件架构
### 外部依赖
- `jdk8 +`
- `Maven 3.5 +`
- 可选项:`Redis 3.x +`
### 下载安装教程
- 下载 `Sparta` 源码
```shell
git clone https://gitee.com/open-source-one/sparta.git
```
- 安装到本地
```shell
mvn install
```
- 导入依赖
```xml
com.sparta
sparta-spring-boot-starter
1.0.0
```
### 快速开始
- 配置启动类,加上启动注解 `@EnableSparta`
```java
@EnableSparta
@SpringBootApplication
public class SpartaExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SpartaExampleApplication.class, args);
}
}
```
- 完整配置 `application.yml`
```yml
server:
port: 9605
spring:
sparta:
system-name: sparta-example
type: redis
threads:
# 核心线程数
core-size: 8
redisson:
# redis 单机服务配置
singleServerConfig:
# redis 链接地址
address: redis://localhost:6379
# redis 数据库
database: 6
# 等待业务执行超时时间, 秒
consume-wait-time: 0S
# 错误重试次数, 业务错误重试 0-20之间,默认3
retry: 3
# 续期最大次数 0-20之间,默认3
renew: 3
# 是否启用关机数据回滚的逻辑, 默认启用 true
enable-close-rollback: true
# 是否启用广播, 默认不启用 false
enable-broadcast: true
# 是否需要开启未 ack 的持久化, 默认 false
enable-deny-persistence: true
# 是否关闭自动 ack,默认开启,false 关闭
enable-auto-ack: true
# 是否开启元数据压缩, 默认开启, false 关闭
# 启用元数据压缩,会给开发和测试阶段带来数据不直观的感受,建议在线上才开启元数据压缩
enable-metadata-compress: true
# 是否启用单机模式:false 集群模式,true 单机模式
standalone: false
## 集群配置
cluster:
# follower 向leader 心跳间隔时间
cluster-heartbeat-freq: 15S
# 集群模式下采用的网络框架,默认 netty,可选:nio/netty
cluster-transport-type: netty
```
- 基础配置 `application.yml`
```yaml
spring:
sparta:
system-name: sparta-example
redisson:
singleServerConfig:
address: redis://localhost:6379
```
#### 1. 单消息延迟单消费模型
- 定义消息体
```java
@Setter
@Getter
public class SingleEntity {
// 用户 ID
private String userId;
// 用户名
private String username;
// 密码
private String password;
// 年龄
private Integer age;
// 生日
private LocalDateTime birthday;
}
```
- 生产者
```java
@RestController
@RequestMapping("/single")
public class SingleExampleController {
@Autowired private SpartaTemplate spartaTemplate;
@PostMapping("/put")
public void put (@RequestBody SingleEntity single) {
// 向 NORMAL_TOPIC 主题中添加一个消息,在 5000ms 后消息过期
this.spartaTemplate.write("NORMAL_TOPIC", single, 5000L);
}
}
```
- 消费者
```java
@Component
public class SingleExampleListener {
// 手动 ack
@SpartaListener(topics = {"NORMAL_TOPIC"})
public void handler (SingleEntity signle, SpartaChannel channel) {
// TODO do nothing ......
channel.ack();
}
// 最终通知
@FinalRetry(topics = {TopicConstant.NORMAL_TOPIC})
public void finalRetryHandler (String topic, NormalEntity entity) {
System.out.println("兜底方案被调用 ........");
// TODO do nothing .......
}
}
```
#### 2. 单消息延迟多消费
- 消息体
```java
@Setter
@Getter
public class SegmentEntity {
private String userId;
private String username;
private String password;
}
```
- 生产者
```java
@RestController
@RequestMapping("/segment")
public class SegmentExampleController {
@Autowired private SpartaTemplate spartaTemplate;
// http://localhost:9605/segment/put?segment=PT1S,PT5S,PT10S,PT20S
// body : {"userId":1, "username":"zyred", "password":"123123"}
@PostMapping("/put")
public void put (@RequstBody SegmentEntity entity,
@RequestParam("segment") String segment) {
Set hash = new HashSet<>(Arrays.asList(segment.split(",")));
this.spartaTemplate.write("SEGMENT_TOPIC", segment, hash);
}
}
```
- 消费者
```java
@Component
public class SegmentExampleListener {
@SpartaListener(topics = {"SEGMENT_TOPIC"})
public void segmentTopicListener (SegmentEntity segmentEntity,
SpartaChannel channel) {
// TODO do nothing .......
channel.ack();
}
}
```
#### 3. 单消息单服务广播
- 打开广播功能 (默认关闭)
```yaml
spring:
sparta:
# 打开广播功能,默认 false(关闭)
enable-broadcast: true
```
- 定义广播消息体
```java
@Setter
@Getter
public class BroadcastEntity {
private String username;
}
```
- 生产者
```java
@RestController
@RequestMapping("/broadcast")
public class VersionListener {
@Autowired private SpartaTemplate spartaTemplate;
// http://localhost:9605/broadcast/put
// body: {"username":"zyred"}
@PostMapping("/put")
public void put (@RequstBody BroadcastEntity entity) {
this.spartaTemplate.writeBroadcast("BROADCAST-TOPIC", entity, 3000L);
}
}
```
- 消费者一
```java
@Component
public class BroadcastListener1 {
private static final MY_NAME = "消费者1";
@BroadcastListener(channels = {"BROADCAST-TOPIC", "B_BROADCAST"})
public void broadcastListener (BroadcastEntity entity) {
log.info("{} 消费了消息: {}", MY_NAME, entity.toString());
}
}
```
- 消费者二
```java
@Component
public class BroadcastListener2 {
private static final MY_NAME = "消费者2";
@BroadcastListener(channels = {"BROADCAST-TOPIC", "C_BROADCAST"})
public void broadcastListener (BroadcastEntity entity) {
log.info("{} 消费了消息: {}", MY_NAME, entity.toString());
}
}
```
### 消息模型
- 单消息延迟单消费模型
- 定义:一条延迟消息只会被消费一次
- 不区分集群部署还是单机部署
- 单消息延迟多消费
- 定义:一条消息设置多个延迟时间,会在相应的时间触发消息消费
- 单机部署:本条消息都将会在同一台机器上消费
- 集群部署:通过负载均衡算法,会将消息分配到不同的机器执行
- 单消息单服务广播
- 定义:一条消息会同时被两个以上消费者消费
- 集群部署:集群情况下,广播的消息只会在单个服务内被广播,不会对全局进行广播
### 注意事项
1. **消息模型与 `ack`**
- 单消息延迟单消费模型:支持 `ack`, `ack` 后消息将会被删除
- 单消息延迟多消费:支持 `ack` , 最后一段消费完毕后消息将会被删除
- 单消息单服务广播:不支持 `ack`
2. **消息模型与重试**
- 单消息延迟单消费模型:支持重试,调用业务失败后才会重试,重试最大次数 `20` ,默认 `3` 次
- 单消息延迟多消费:不支持重试
- 单消息单服务广播:不支持重试
3. **消息模型与续期**
- 单消息延迟单消费模型:支持续期
- 单消息延迟多消费:不支持续期
- 单消息单服务广播:不支持续期
4. **`ack` 模型**
- 手动 `ack` :
- 关闭自动 `ack`
```yaml
spring:
sparta:
enable-auto-ack: false
```
- 监听器的回调方法内需要加上 `SpartaChannel` 对象
```java
@SpartaListener(topics = {"NORMAL_TOPIC"})
public void handler (SingleEntity signle, SpartaChannel channel) {
// TODO do nothing ......
channel.ack();
}
```
- 自动 `ack` :
- 打开自动 `ack`
```yaml
spring:
sparta:
enable-auto-ack: true
```
- 监听器的回调方法内无需 `SpartaChannel`
```java
// 自动 ack
@SpartaListener(topics = {"NORMAL_TOPIC"})
public void handler (SingleEntity signle) {
// TODO do nothing ......
}
```
- 未 `ack` 消息的存储
- 定义:关闭了自动 `ack` 后未 `ack` 的消息将会被存储
- 适用消息模型:**仅**单消息延迟单消费模型
- 开启未 `ack` 的消息存储
```yaml
spring:
sparta:
# 默认 false
enable-deny-persistence: true
```
- `Sparta ` 提供未 `ack` 被存储的消息处理`API`
```java
public interface SpartaTemplate {
/**
* 一次调用,将会返回全量的没有 ack 的数据,并且只要调用该 api
* metadata_tab 与 deny_tab 中的数据将会全部被删除,生产环境慎用
*
* @return metadata
*/
List processDenys ();
// 其他 api 略
}
```
5. **消息续期与消息重试**
- **消息重试** :当`Sparta` 调用 `@SpartaListener` 标注的代码在调用过程中出现了异常情况,会触发重试
- 设置重试最大次数(全局)
```yaml
spring:
sparta:
# 重试最大次数设置,最大只能设置 20
retry: 3
```
- 设置下次重试时间
- **注意注意:此处不能吃掉异常,必须往外抛,跟spring事务一样,否则重试失败!**
- **注意注意:以下例子中如果发生了异常,`SingleEntity` 中被你设置任何值不会生效 !**
```java
@SpartaListener(topics = {"NORMAL_TOPIC"})
public void handler (SingleEntity signle, SpartaChannel channel) {
try{
// TODO do nothing ......
} catch(Exception ex) {
// 对 signle 实例设置任何数据都不会生效
signle.setUsername("李四");
signle.setPassword("password");
// 默认 3000ms 此处设置为 5000ms
channel.setRetryIntervalTime(5_000);
// 重中之重,不能吃掉异常,必须往外抛,跟spring 事务一样
throw ex;
}
}
```
- 集群部署情况下,为了避免单机错误的情况,采用了下次消息执行权负载到其他机器上,可能给使用 `Sparta` 的人带来的问题是不太好排查问题
- **消息续期:** 消息续期指的是在某条消息设置的过期时间太短,需要处理的业务需要一段时间。举个例子,某条数据的状态是 `0`,但是你在 `sparta` 中要处理的状态是 `1`,你的消息过期一次后,发现这个状态依然是 `0` ,那么你需要等待 从 `0` 变成 `1`,但是放入`sparta` 的消息又不能丢弃,所以需要给消息续期,你也可以理解成 `redssion` 中的看门狗机制
- 设置续期最大次数
```yaml
spring:
sparta:
# 续期最大次数 0-30 之间 默认 3 次
renew: 3
```
- 设置续期时间 (下次什么时候执行回调)
```java
@SpartaListener(topics = {"NORMAL_TOPIC"})
public void handler (SingleEntity signle, SpartaChannel channel) {
// 默认 30s
channel.setRenewIntervalTime(50_000);
}
```
- 集群部署情况下,消息执行权可能会被负载到其他机器
- 案例伪代码:
```java
// 可用的状态
private static int availableStatus = 1;
@SpartaListener(topics = {"ORDER_TOPIC"})
public void handler (Order order, SpartaChannel channel) {
if (order.getCode() == availableStatus) {
// TODO 执行成功的逻辑
channel.ack();
} else {
// TODO 订单状态不可用,需要续期等待
channel.setRenewIntervalTime(10_000);
}
}
```
### FAQ
1. 业务服务集群部署,各个服务之间需要畅通的网络吗 ?
答:需要,因为 `Sparta` 需要进行 `leader` 选举,和数据交互
2. `Redis database` 可以与业务库使用同一个吗?
答:不推荐使用同一个库,如果选择同一个库 `${spring.sparta.system-name}` 不能重复