# message-trunk **Repository Path**: km/message-trunk ## Basic Information - **Project Name**: message-trunk - **Description**: message-trunk是以redis为基础搭建的轻量级高性能消息总线(队列),和主流MQ相比使用起来更灵巧简便。 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 102 - **Created**: 2018-02-06 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # redismq 框架开发宗旨:项目内的轻量级消息队列。 框架开发目的:在项目内部,我们常常需要做异步操作,常规的做法是提交给线程池去做,这样会导致一些: * 线程池大小不可控,任务可能因为线程池满了被抛弃。 * 任务执行失败没有重试机制。 * 任务执行失败没有统一的异常处理。 为了解决如上问题,基于redis的队列开发了该消息队列,具有如下特点: * 足够轻量级,队列配置简单,只要使用redis即可,不需要额外部署环境; * 支持分布式,任务提交后由多台机器分布式处理,机器资源分配合理; * 处理效率高,任务交给多线程并发处理; * 处理有重试机制,并且可自定义错误处理。 * 对于小型数据入队列,出队列效率高。 引入依赖 ``` com.ourhours.redismq redismq 0.0.1 ``` ## DEMO ## 使用指南 ### SpringBoot引入 ``` @Configuration public class RedisMQConfig extends CachingConfigurerSupport { @Autowired Environment env; @Bean(name="messageTrunktaskExecutor") public ThreadPoolTaskExecutor getMessageTrunktaskExecutor() { ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor(); //线程池维护线程的最少数量 threadPool.setCorePoolSize(100); //线程池维护线程所允许的空闲时间 threadPool.setKeepAliveSeconds(30000); //线程池维护线程的最大数量 threadPool.setMaxPoolSize(1000); //线程池所使用的缓冲队列 threadPool.setQueueCapacity(1000); return threadPool; } @Bean public RedisUtil redisUtil() { //redis config 配置 JedisPoolConfig config = new JedisPoolConfig(); //控制一个pool最多有多少个状态为idle(空闲的)的JEDIS实例 try { Integer maxIdle = Integer.parseInt(env.getProperty("spring.redis.pool.max-idle")); config.setMaxIdle(maxIdle); } catch (Exception e) {} //控制一个pool最少有多少个状态为idle(空闲的)的JEDIS实例 try { Integer minIdle = Integer.parseInt(env.getProperty("spring.redis.pool.min-idle")); config.setMinIdle(minIdle); } catch (Exception e) {} //最大连接数,如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个JEDIS实例,则此时pool的状态为exhausted(耗尽)。 try { Integer maxTotal = Integer.parseInt(env.getProperty("spring.redis.pool.max-active")); config.setMaxTotal(maxTotal); } catch (Exception e) {} Integer maxWait = -1; try { maxWait = Integer.parseInt(env.getProperty("spring.redis.pool.max-wait")); config.setMaxWaitMillis(maxWait); } catch (Exception e) {} Integer timeout = 0; try { timeout = Integer.parseInt(env.getProperty("spring.redis.timeout")); } catch (Exception e) {} //sentinel节点,如果为空则为单机模式 String sentinelNodes = env.getProperty("spring.redis.sentinel.nodes"); //密码 String password = env.getProperty("spring.redis.password"); //连接池 Pool pool = null; //单机模式 if(StringUtil.isEmpty(sentinelNodes)) { String host = env.getProperty("spring.redis.host"); Integer port = Integer.parseInt(env.getProperty("spring.redis.port")); if(StringUtil.isEmpty(password)) { pool = new JedisPool(config, host, port, timeout); }else { pool = new JedisPool(config,host,port,timeout,password); } }else {//sentinal模式 String masterName = env.getProperty("spring.redis.sentinel.master"); Set sentinels = new HashSet(); sentinels.add(sentinelNodes); if(StringUtil.isEmpty(password)) { pool = new JedisSentinelPool(masterName, sentinels, config, timeout); }else { pool = new JedisSentinelPool(masterName, sentinels, config, timeout, password); } } //构建redis util RedisUtil redisUtil = new RedisUtil(pool); return redisUtil; } } ``` ### 1.消息入队列 获取消息队列全局对象MessageTrunk(可以用spring注入),put入消息即可。 ``` // 获取RedisMQMessageSender实例 @Autowired RedisMQMessageSender messageSender; //发送消息 String msg = "发送的消息"; messageSender.put(new Message("QUEUE_NAME", msg)); ``` ### 2. 处理消息 消息处理器:继承AbstarctMessageHandler抽象类。 ``` @Service public class DemoHandler extends AbstarctMessageHandler { private static Log logger = LogFactory.getLog(DemoHandler.class); public DemoHandler() { // 说明该handler监控的消息类型,第一个参数是队列名称,第二个为重试次数 super("QUEUE_NAME", 10); } /** * 监听到消息后处理方法 */ @Override public void handle(String message) { System.out.println("正在处理消息:" + message); } @Override public void handleFailed(String obj) { StringBuilder sb = new StringBuilder(); sb.append("msg:[").append(obj).append("], 超过失败次数,停止重试。"); logger.warn(sb.toString()); } } ``` ## 实现原理 基本原理是redis的阻塞取命令: Blpop,该命令移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。