# rocketMq-demo **Repository Path**: iispeily/rocketMq-demo ## Basic Information - **Project Name**: rocketMq-demo - **Description**: SpringMVC + RocketMq 演示,事物最终一致demo - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2019-07-09 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## SpringMVC RocketMq ``` SpringMVC RocketMq 注释拉满 boot版就不用写了,一个道理 补充下逻辑: A端消费,B端扣费 A端消费时,即进行数据库操作A_H,产生本地事物a, 在a事物内调用rocketmq的事物消息sendMessageInTransaction, sendMessageInTransaction回调TransactionListener事务监听器的executeLocalTransaction方法,这样 rocketmq就成功的加入到事物a中去,executeLocalTransaction执行数据库io操作(R_H)是在事物a中进行的,R_H和A_H荣辱与共, 再通过checkLocalTransaction方法检查a事物,这样就控制了消息应不应该被B接收执行。 ``` [![Fork me on Gitee](https://gitee.com/iispeily/rocketMq-demo/widgets/widget_3.svg?color=00054a)](https://gitee.com/iispeily/rocketMq-demo) ## 以下是官网描述 #### 事物消息 >什么是事物消息? 可以将其视为两阶段提交消息实现,以确保分布式系统中的最终一致性。事务性消息确保可以原子方式执行本地事务的执行和消息的发送。 >使用限制 * (1)事物消息没有时间表和批量支持。 * (2)为了避免多次检查单个消息并导致半队列消息累积,我们默认将单个消息的检查次数限制为15次,但用户可以通过更改“transactionCheckMax”来更改此限制“代理配置中的参数,如果已经通过”transactionCheckMax“检查了一条消息,则代理将默认丢弃此消息并同时打印错误日志。用户可以通过覆盖“AbstractTransactionCheckListener”类来更改此行为。 * (3)在经纪人的配置中由参数“transactionTimeout”确定的一段时间之后将检查事物消息。用户也可以通过在发送事务消息时设置用户属性“CHECK_IMMUNITY_TIME_IN_SECONDS”来更改此限制,此参数优先于“transactionMsgTimeout”参数。 * (4)可以多次检查或消费事物消息。 * (5)对用户的目标主题的已提交消息可能会失败。目前,它取决于日志记录。RocketMQ本身的高可用性机制确保了高可用性。如果要确保事务性消息不会丢失且事务完整性得到保证,建议使用同步双写。机制。 * (6)事务消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务性消息允许后向查询。MQ Server按其生产者ID查询客户端。 > 应用 1,事物状态 事务性消息有三种状态: * (1)TransactionStatus.CommitTransaction:提交事务,这意味着允许消费者使用此消息。 * (2)TransactionStatus.RollbackTransaction:回滚事务,表示该消息将被删除而不允许使用。 * (3)TransactionStatus.Unknown:中间状态,表示需要MQ检查以确定状态。 2,发送事物消息 (1)创建事务生成器 使用TransactionMQProducer类创建生成器客户端,并指定唯一的producerGroup,并且可以设置自定义线程池来处理检查请求。执行本地事务后,需要根据执行结果回复MQ,并在上一节中描述回复状态。 ```$xslt import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } } ``` (2)实现TransactionListener接口 “executeLocalTransaction”方法用于在发送半消息成功时执行本地事务。它返回上一节中提到的三种事务状态之一。 “checkLocalTransaction”方法用于检查本地事务状态并响应MQ检查请求。它还返回上一节中提到的三种事务状态之一。 ```$xslt import ... public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } } ```