# task-pool **Repository Path**: andnnl/task-pool ## Basic Information - **Project Name**: task-pool - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2021-06-23 - **Last Updated**: 2025-08-05 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # task-pool 消息线程池 #### 项目开发背景 * 每个用户的消息都要同时去处理,但用户数比较多,因服务器线程有限,不足以每个用户都能分配到一个线程 * 用户消息有优先级,后台进来的消息,优先级高的,先处理 * 消息处理失败时,需要重试 * 批量处理消息 #### 软件架构 * 架构图 ![avatar](docs/架构图.png) * 流程图 ![avatar](docs/流程图.png) #### 设计思路 * 建用户对象,里面设定一个消息队列,同一个用户的消息添加到队列后面 * 建用户队列,轮询取用户的一批消息,放到线程池去执行,如果该用户还有消息,再添加到用户队列的后面,一直循环读取 * 当线程池满消息任务时,暂停读取(用队列的take堵塞方法) * 支持动态调整线程池大小和任务队列大小 #### 使用说明 * 2级优先队列测试 ```java private static void testTwo() throws InterruptedException { //消息处理逻辑 IMsgHandle handle = (taskManage, msgList) -> { for (Msg msg : msgList) { int i = count.incrementAndGet(); // System.out.println(i + "-->" + msg.getPriority()+" "+ msg.getUser().getName() + " " + msg.getData()); } }; TaskManage tm = TaskManage.build().setHandle(handle) // .setMsgTypeEnum(MsgTypeEnum.OneQueue)//顺序处理消息 .setMsgTypeEnum(MsgTypeEnum.TwoQueue)//二级优先级用户消息,使用2个队列,用在只有2个级别的消息,消息的优先级只有0和1,1优先 // .setMsgTypeEnum(MsgTypeEnum.PriorityQueue)//优先级用户消息,大量优先级的情况下用 .setThreadSize(10)//线程池大小 .setQueueCapacity(200)//消息队列大小 .setBatchSize(1)//一次处理多少条消息 .start(); int userCount = 10; long t1 = System.currentTimeMillis(); int total = 1000000; for (int i = 0; i < total; i++) { String key = "name" + (i % userCount);//用户 Msg msg = new Msg("ctx " + i); msg.setPriority(random.nextInt(2));//随机添加0、1优先级 tm.addMsg(key, msg); } while (true) { if (count.get() == total) { long t2 = System.currentTimeMillis(); System.out.println("times:" + (t2 - t1)); break; } Thread.sleep(10); } } ``` * 重试次数 ```java public static void main(String[] args) throws IOException, InterruptedException { IMsgHandle handle = (taskManage, msgList) -> { for (Msg msg : msgList) { if (msg.getRetryTimes() >= MAX_TIMES) { int i = count.incrementAndGet(); // SimpleDateFormat sdf=new SimpleDateFormat("HH:mm:ss"); // System.out.println(sdf.format(new Date())+" "+i + "-->" + msg.getPriority() + " " + msg.getUser().getName() + " " + msg.getData()); } taskManage.addRetryMsg(msg); } }; TaskManage tm = TaskManage.build().setHandle(handle) .setRetryMsgFilter(RetryFactory.build(1000,2000,3000))//3次重试 .setMsgTypeEnum(MsgTypeEnum.TwoQueue) // .setMsgTypeEnum(MsgTypeEnum.PriorityQueue) .setThreadSize(10) .setQueueCapacity(20) .setBatchSize(1) .start(); int userCount = 100; long t1 = System.currentTimeMillis(); //随机添加 int total = 100000; for (int i = 0; i < total; i++) { String key = "name" + (i % userCount); Msg msg = new Msg("ctx " + i); msg.setPriority(random.nextInt(2)); tm.addMsg(key, msg); } while (true) { if (count.get() == total) { long t2 = System.currentTimeMillis(); System.out.println("times:" + (t2 - t1)); break; } Thread.sleep(10); } // System.in.read(); } ``` * 绑定并且运行时改变线程池大小和队列大小 ```java private static void testChange() { IMsgHandle handle= (taskManage, list) -> { }; BiConsumer onChangeThreadSize = null; TaskManage tm = TaskManage.build().setHandle(handle) .setMsgTypeEnum(MsgTypeEnum.OneQueue)//顺序处理消息 // .setMsgTypeEnum(MsgTypeEnum.TwoQueue)//二级优先级用户消息,使用2个队列,用在只有2个级别的消息,消息的优先级只有0和1,1优先 // .setMsgTypeEnum(MsgTypeEnum.PriorityQueue)//优先级用户消息,大量优先级的情况下用 // .setThreadSize(10)//线程池大小 .bindThreadSize(()->c.getInt("threadSize",2)) .onChangeThreadSize((newValue,oldValue)-> System.out.println("线程数由"+oldValue+"改为"+newValue)) // .setQueueCapacity(200)//消息队列大小 .bindQueueCapacity(()->c.getInt("queueCapacity",2)) .onChangeQueueCapacity((newValue,oldValue)-> System.out.println("队列数由"+oldValue+"改为"+newValue)) .setBatchSize(5)//一次处理多少条消息 .setTest(true)//测试模式,有些计数 .start(); } ``` #### 参与贡献 1. Fork 本仓库 2. 新建 Feat_xxx 分支 3. 提交代码 4. 新建 Pull Request #### 特技 1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md 2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com) 3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目 4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目 5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help) 6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)