# sparta **Repository Path**: open-source-one/sparta ## Basic Information - **Project Name**: sparta - **Description**: sparta 是一个开源的,基于 springboot + redisson + redis 的一个轻量级依赖型的延迟队列,内置有丰富的API,且性能高。零配置,开箱即用,设计符合普通 MQ 队列的使用。 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 26 - **Forks**: 4 - **Created**: 2022-09-06 - **Last Updated**: 2025-09-16 ## Categories & Tags **Categories**: Uncategorized **Tags**: delay-queue ## README [TOC] # Sparta ### 介绍 `Sparta` 是一款开源的轻量级延迟队列,通过 `jar` 的方法依赖进项目内,无需部署单独服务,开箱即用。 `Sparta` 作为延迟队列,支持业务服务分布式部署方式,也支持单服务的部署。`Sparta` 主要依赖于 `spring boot` 框架,根据业务可选择中间件依赖 `redis` 。 `Sparta` 能保证消息延迟在 `10ms` 以内,高性能、高可靠。 ### 主要功能特性 - **消息模型:** - 单消息延迟单消费 - 单消息延迟多消费 - 单消息单服务广播 - **低延迟:** 毫秒级延迟,性能瓶颈在对应依赖的中间件和网络 - **消息`ack` 特性 :** `自动 ack` 与 `手动 ack` - **失败重试特性:** 消息失败后支持重试 - **消息续签特性:** 消息过期后,通过 `api` 让消息重新等待 - **最终通知:** 消息消费 N(默认20次) 次后,会触发兜底方案的运行 - **轻量级:** 只需引入依赖, 一个注解 `@EnableSparta` 即可开启 `Sparta` , 无需单独部署中间件服务 - **使用简单:**`API` 贴合当下主流 `MQ` 的使用方法 (`kafka`, `RabbitMQ` 等), 且支持 `spring boot starter` - **支持业务服务集群部署:** 例如订单服务(依赖了 `sparta`) 集群部署 - **业务集群消息负载均衡:** 内部提供 `链表轮询` 、`随机` 两种负载均衡算法,可自行实现 ### 软件架构 ### 外部依赖 - `jdk8 +` - `Maven 3.5 +` - 可选项:`Redis 3.x +` ### 下载安装教程 - 下载 `Sparta` 源码 ```shell git clone https://gitee.com/open-source-one/sparta.git ``` - 安装到本地 ```shell mvn install ``` - 导入依赖 ```xml com.sparta sparta-spring-boot-starter 1.0.0 ``` ### 快速开始 - 配置启动类,加上启动注解 `@EnableSparta` ```java @EnableSparta @SpringBootApplication public class SpartaExampleApplication { public static void main(String[] args) { SpringApplication.run(SpartaExampleApplication.class, args); } } ``` - 完整配置 `application.yml` ```yml server: port: 9605 spring: sparta: system-name: sparta-example type: redis threads: # 核心线程数 core-size: 8 redisson: # redis 单机服务配置 singleServerConfig: # redis 链接地址 address: redis://localhost:6379 # redis 数据库 database: 6 # 等待业务执行超时时间, 秒 consume-wait-time: 0S # 错误重试次数, 业务错误重试 0-20之间,默认3 retry: 3 # 续期最大次数 0-20之间,默认3 renew: 3 # 是否启用关机数据回滚的逻辑, 默认启用 true enable-close-rollback: true # 是否启用广播, 默认不启用 false enable-broadcast: true # 是否需要开启未 ack 的持久化, 默认 false enable-deny-persistence: true # 是否关闭自动 ack,默认开启,false 关闭 enable-auto-ack: true # 是否开启元数据压缩, 默认开启, false 关闭 # 启用元数据压缩,会给开发和测试阶段带来数据不直观的感受,建议在线上才开启元数据压缩 enable-metadata-compress: true # 是否启用单机模式:false 集群模式,true 单机模式 standalone: false ## 集群配置 cluster: # follower 向leader 心跳间隔时间 cluster-heartbeat-freq: 15S # 集群模式下采用的网络框架,默认 netty,可选:nio/netty cluster-transport-type: netty ``` - 基础配置 `application.yml` ```yaml spring: sparta: system-name: sparta-example redisson: singleServerConfig: address: redis://localhost:6379 ``` #### 1. 单消息延迟单消费模型 - 定义消息体 ```java @Setter @Getter public class SingleEntity { // 用户 ID private String userId; // 用户名 private String username; // 密码 private String password; // 年龄 private Integer age; // 生日 private LocalDateTime birthday; } ``` - 生产者 ```java @RestController @RequestMapping("/single") public class SingleExampleController { @Autowired private SpartaTemplate spartaTemplate; @PostMapping("/put") public void put (@RequestBody SingleEntity single) { // 向 NORMAL_TOPIC 主题中添加一个消息,在 5000ms 后消息过期 this.spartaTemplate.write("NORMAL_TOPIC", single, 5000L); } } ``` - 消费者 ```java @Component public class SingleExampleListener { // 手动 ack @SpartaListener(topics = {"NORMAL_TOPIC"}) public void handler (SingleEntity signle, SpartaChannel channel) { // TODO do nothing ...... channel.ack(); } // 最终通知 @FinalRetry(topics = {TopicConstant.NORMAL_TOPIC}) public void finalRetryHandler (String topic, NormalEntity entity) { System.out.println("兜底方案被调用 ........"); // TODO do nothing ....... } } ``` #### 2. 单消息延迟多消费 - 消息体 ```java @Setter @Getter public class SegmentEntity { private String userId; private String username; private String password; } ``` - 生产者 ```java @RestController @RequestMapping("/segment") public class SegmentExampleController { @Autowired private SpartaTemplate spartaTemplate; // http://localhost:9605/segment/put?segment=PT1S,PT5S,PT10S,PT20S // body : {"userId":1, "username":"zyred", "password":"123123"} @PostMapping("/put") public void put (@RequstBody SegmentEntity entity, @RequestParam("segment") String segment) { Set hash = new HashSet<>(Arrays.asList(segment.split(","))); this.spartaTemplate.write("SEGMENT_TOPIC", segment, hash); } } ``` - 消费者 ```java @Component public class SegmentExampleListener { @SpartaListener(topics = {"SEGMENT_TOPIC"}) public void segmentTopicListener (SegmentEntity segmentEntity, SpartaChannel channel) { // TODO do nothing ....... channel.ack(); } } ``` #### 3. 单消息单服务广播 - 打开广播功能 (默认关闭) ```yaml spring: sparta: # 打开广播功能,默认 false(关闭) enable-broadcast: true ``` - 定义广播消息体 ```java @Setter @Getter public class BroadcastEntity { private String username; } ``` - 生产者 ```java @RestController @RequestMapping("/broadcast") public class VersionListener { @Autowired private SpartaTemplate spartaTemplate; // http://localhost:9605/broadcast/put // body: {"username":"zyred"} @PostMapping("/put") public void put (@RequstBody BroadcastEntity entity) { this.spartaTemplate.writeBroadcast("BROADCAST-TOPIC", entity, 3000L); } } ``` - 消费者一 ```java @Component public class BroadcastListener1 { private static final MY_NAME = "消费者1"; @BroadcastListener(channels = {"BROADCAST-TOPIC", "B_BROADCAST"}) public void broadcastListener (BroadcastEntity entity) { log.info("{} 消费了消息: {}", MY_NAME, entity.toString()); } } ``` - 消费者二 ```java @Component public class BroadcastListener2 { private static final MY_NAME = "消费者2"; @BroadcastListener(channels = {"BROADCAST-TOPIC", "C_BROADCAST"}) public void broadcastListener (BroadcastEntity entity) { log.info("{} 消费了消息: {}", MY_NAME, entity.toString()); } } ``` ### 消息模型 - 单消息延迟单消费模型 - 定义:一条延迟消息只会被消费一次 - 不区分集群部署还是单机部署 - 单消息延迟多消费 - 定义:一条消息设置多个延迟时间,会在相应的时间触发消息消费 - 单机部署:本条消息都将会在同一台机器上消费 - 集群部署:通过负载均衡算法,会将消息分配到不同的机器执行 - 单消息单服务广播 - 定义:一条消息会同时被两个以上消费者消费 - 集群部署:集群情况下,广播的消息只会在单个服务内被广播,不会对全局进行广播 ### 注意事项 1. **消息模型与 `ack`** - 单消息延迟单消费模型:支持 `ack`, `ack` 后消息将会被删除 - 单消息延迟多消费:支持 `ack` , 最后一段消费完毕后消息将会被删除 - 单消息单服务广播:不支持 `ack` 2. **消息模型与重试** - 单消息延迟单消费模型:支持重试,调用业务失败后才会重试,重试最大次数 `20` ,默认 `3` 次 - 单消息延迟多消费:不支持重试 - 单消息单服务广播:不支持重试 3. **消息模型与续期** - 单消息延迟单消费模型:支持续期 - 单消息延迟多消费:不支持续期 - 单消息单服务广播:不支持续期 4. **`ack` 模型** - 手动 `ack` : - 关闭自动 `ack` ```yaml spring: sparta: enable-auto-ack: false ``` - 监听器的回调方法内需要加上 `SpartaChannel` 对象 ```java @SpartaListener(topics = {"NORMAL_TOPIC"}) public void handler (SingleEntity signle, SpartaChannel channel) { // TODO do nothing ...... channel.ack(); } ``` - 自动 `ack` : - 打开自动 `ack` ```yaml spring: sparta: enable-auto-ack: true ``` - 监听器的回调方法内无需 `SpartaChannel` ```java // 自动 ack @SpartaListener(topics = {"NORMAL_TOPIC"}) public void handler (SingleEntity signle) { // TODO do nothing ...... } ``` - 未 `ack` 消息的存储 - 定义:关闭了自动 `ack` 后未 `ack` 的消息将会被存储 - 适用消息模型:**仅**单消息延迟单消费模型 - 开启未 `ack` 的消息存储 ```yaml spring: sparta: # 默认 false enable-deny-persistence: true ``` - `Sparta ` 提供未 `ack` 被存储的消息处理`API` ```java public interface SpartaTemplate { /** * 一次调用,将会返回全量的没有 ack 的数据,并且只要调用该 api * metadata_tab 与 deny_tab 中的数据将会全部被删除,生产环境慎用 * * @return metadata */ List processDenys (); // 其他 api 略 } ``` 5. **消息续期与消息重试** - **消息重试** :当`Sparta` 调用 `@SpartaListener` 标注的代码在调用过程中出现了异常情况,会触发重试 - 设置重试最大次数(全局) ```yaml spring: sparta: # 重试最大次数设置,最大只能设置 20 retry: 3 ``` - 设置下次重试时间 - **注意注意:此处不能吃掉异常,必须往外抛,跟spring事务一样,否则重试失败!** - **注意注意:以下例子中如果发生了异常,`SingleEntity` 中被你设置任何值不会生效 !** ```java @SpartaListener(topics = {"NORMAL_TOPIC"}) public void handler (SingleEntity signle, SpartaChannel channel) { try{ // TODO do nothing ...... } catch(Exception ex) { // 对 signle 实例设置任何数据都不会生效 signle.setUsername("李四"); signle.setPassword("password"); // 默认 3000ms 此处设置为 5000ms channel.setRetryIntervalTime(5_000); // 重中之重,不能吃掉异常,必须往外抛,跟spring 事务一样 throw ex; } } ``` - 集群部署情况下,为了避免单机错误的情况,采用了下次消息执行权负载到其他机器上,可能给使用 `Sparta` 的人带来的问题是不太好排查问题 - **消息续期:** 消息续期指的是在某条消息设置的过期时间太短,需要处理的业务需要一段时间。举个例子,某条数据的状态是 `0`,但是你在 `sparta` 中要处理的状态是 `1`,你的消息过期一次后,发现这个状态依然是 `0` ,那么你需要等待 从 `0` 变成 `1`,但是放入`sparta` 的消息又不能丢弃,所以需要给消息续期,你也可以理解成 `redssion` 中的看门狗机制 - 设置续期最大次数 ```yaml spring: sparta: # 续期最大次数 0-30 之间 默认 3 次 renew: 3 ``` - 设置续期时间 (下次什么时候执行回调) ```java @SpartaListener(topics = {"NORMAL_TOPIC"}) public void handler (SingleEntity signle, SpartaChannel channel) { // 默认 30s channel.setRenewIntervalTime(50_000); } ``` - 集群部署情况下,消息执行权可能会被负载到其他机器 - 案例伪代码: ```java // 可用的状态 private static int availableStatus = 1; @SpartaListener(topics = {"ORDER_TOPIC"}) public void handler (Order order, SpartaChannel channel) { if (order.getCode() == availableStatus) { // TODO 执行成功的逻辑 channel.ack(); } else { // TODO 订单状态不可用,需要续期等待 channel.setRenewIntervalTime(10_000); } } ``` ### FAQ 1. 业务服务集群部署,各个服务之间需要畅通的网络吗 ? 答:需要,因为 `Sparta` 需要进行 `leader` 选举,和数据交互 2. `Redis database` 可以与业务库使用同一个吗? 答:不推荐使用同一个库,如果选择同一个库 `${spring.sparta.system-name}` 不能重复