# activeMQ **Repository Path**: java_wangyin/active-mq ## Basic Information - **Project Name**: activeMQ - **Description**: No description available - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2021-03-31 - **Last Updated**: 2021-05-06 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # ActiveMQ ## 安装 + Kafka RabbitMQ RocketMQ ActiveMQ ![MQ](./images/MQ.png) + 官网 : https://activemq.apache.org/ + MQ作用 + 解耦 + 削峰 + 异步 + 语言:Java + 下载 + https://activemq.apache.org/components/classic/download/ + 解压 ~~~shell tar -zxvf apache-activemq-5.16.1-bin.tar.gz ~~~ + 启动 ~~~shell [root@localhost bin]# ./activemq start INFO: Loading '/opt/apache-activemq-5.16.1//bin/env' INFO: Using java '/usr/local/tools/jdk/jdk1.8.0_171/bin/java' INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details INFO: pidfile created : '/opt/apache-activemq-5.16.1//data/activemq.pid' (pid '30139') [root@localhost bin]# jps -l 19957 org.apache.zookeeper.server.quorum.QuorumPeerMain 18215 /usr/local/tools/nacos/target/nacos-server.jar 30457 sun.tools.jps.Jps 30412 /opt/apache-activemq-5.16.1//bin/activemq.jar ~~~ 带日志输出: ~~~shell [root@localhost apache-activemq-5.16.1]# ./bin/activemq start > ./mqrun.log ~~~ + 关闭 ~~~shell [root@localhost bin]# ./activemq stop INFO: Loading '/opt/apache-activemq-5.16.1//bin/env' INFO: Using java '/usr/local/tools/jdk/jdk1.8.0_171/bin/java' INFO: Waiting at least 30 seconds for regular process termination of pid '30412' : Java Runtime: Oracle Corporation 1.8.0_171 /usr/local/tools/jdk/jdk1.8.0_171/jre Heap sizes: current=62976k free=61992k max=932352k JVM args: -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/opt/apache-activemq-5.16.1//conf/login.config -Dactivemq.classpath=/opt/apache-activemq-5.16.1//conf:/opt/apache-activemq-5.16.1//../lib/: -Dactivemq.home=/opt/apache-activemq-5.16.1/ -Dactivemq.base=/opt/apache-activemq-5.16.1/ -Dactivemq.conf=/opt/apache-activemq-5.16.1//conf -Dactivemq.data=/opt/apache-activemq-5.16.1//data Extensions classpath: [/opt/apache-activemq-5.16.1/lib,/opt/apache-activemq-5.16.1/lib/camel,/opt/apache-activemq-5.16.1/lib/optional,/opt/apache-activemq-5.16.1/lib/web,/opt/apache-activemq-5.16.1/lib/extra] ACTIVEMQ_HOME: /opt/apache-activemq-5.16.1 ACTIVEMQ_BASE: /opt/apache-activemq-5.16.1 ACTIVEMQ_CONF: /opt/apache-activemq-5.16.1/conf ACTIVEMQ_DATA: /opt/apache-activemq-5.16.1/data Connecting to pid: 30412 .Stopping broker: localhost . TERMINATED ~~~ + 访问 + /opt/apache-activemq-5.16.1/conf/jetty.xml ~~~properties ~~~ + http://10.10.10.201:8161/admin + admin/admin ## 编码 ### JMS架构 ![JMS架构图](./images/JMS架构图.png) ### Queue Topic ![queue_topic_model](./images/queue_topic_model.png) ### JMS 组成 + JMS provider + 实现JMS接口和规范的消息中间件,也就是MQ服务器 + JMS producer + 消息生产者,创建消息客户端 + JMS consumer + 消息消费者,接受处理消息的客户端 + JMS message + 消息头 + JMSDestination + 目的地:Queue 或 Topic ~~~java textMessage.setJMSDestination(queue); ~~~ + JMSDeliveryMode + 是否持久化(默认持久化) ~~~java //非持久 textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); //持久化 textMessage.setJMSDeliveryMode(DeliveryMode.PERSISTENT); ~~~ + JMSExpiration + 消息过期时间(默认永久不过期)只要没有被消费就不会过期! ~~~java textMessage.setJMSExpiration(5000l); ~~~ + JMSPriority + 默认4级 + 优先级:0-9个优先级,0-4是普通消息,5-9是加急消息! + MQ必须保证加急消息要先于普通消息送达! ~~~java textMessage.setJMSPriority(5); ~~~ + JMSMessageID + 消息ID:消息的唯一标识<消息调用幂等性> ~~~java textMessage.setJMSMessageID("message_id_191"); ~~~ + 消息体:发送和接受的消息类型必须一致 + TextMessage + 普通字符串消息,包含一个String ~~~java session.createTextMessage(); ~~~ + MapMessage + 一个Map类型的消息,key为String,value是Java基本类型 ~~~java session.createMapMessage(); ~~~ + BytesMessage + 二进制数组消息,包含一个byte[] ~~~java session.createBytesMessage(); ~~~ + StreamMessage + Java数据流消息,用标准流操作来顺序的填充和读取 ~~~java session.createStreamMessage(); ~~~ + ObjectMessage + 对象消息,包含一个可序列化的Java对象 ~~~java session.createObjectMessage(); ~~~ + 消息属性 + 识别/去重/重点标注等操作使用 ~~~java textMessage.setBooleanProperty(); ~~~ ### 可靠性 #### 持久性 + 持久化:当服务器宕机,消息依然存在 ~~~java //持久化 message.setJMSDeliveryMode(DeliveryMode.PERSISTENT); ~~~ + 非持久化:当服务器宕机,消息不存在 ~~~java //非持久 message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); ~~~ + Topic 的持久性 + 消费者:cn.mrwangyin.activemq.jms.topic.JmsConsumerListenerrPersist#main + 生产者:cn.mrwangyin.activemq.jms.topic.JmsProducePersist#main + 消费者先订阅Topic,离线的消息,恢复后同样能接受到 ![subscribers](./images/subscribers.png) #### 事务 + cn.mrwangyin.activemq.jms.topic.JmsProduceTX#main + 生产者 ~~~java //开启事务 transacted=true,最后不提交不会发送到MQ Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //开启事务需要手动提交 if (next.contains("commit")) { session.commit(); System.out.println("Topic ==> session.commit...."); System.out.println("Topic ==> 消息发送成功:" + "msg----:" + next); } ~~~ + cn.mrwangyin.activemq.jms.topic.JmsConsumerLinstenerTX#main + 消费者 ~~~java //消费者开启事务,需要提交session.commit() 才能真正消费消息 final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); ~~~ ```java consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message != null && message instanceof TextMessage) { try { TextMessage textMessage = (TextMessage) message; System.out.println("MessageListener 获取到消息:" + textMessage.getText()); //提交消费消息,如果不提交,MQ则认为消息还未被消费,第二次还会被消费 session.commit(); System.out.println("消息消费已提交...."); } catch (JMSException e) { e.printStackTrace(); } } } }); ``` #### 消费者签收 + 如果开启了事务,签不签收都无所谓,以事务提交为准!!! + cn.mrwangyin.activemq.jms.queue.JmsProduceTX#main + cn.mrwangyin.activemq.jms.queue.JmsConsumerLinstenerACK#main + 自动签收 AUTO_ACKNOWLEDGE(常用) ~~~java //第一个参数叫事务,第二个参数叫签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); ~~~ + 手动签收 CLIENT_ACKNOWLEDGE(常用) + 如果不手动签收,就会造成消息重复消费 ~~~java //第一个参数叫事务,第二个参数叫签收 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); TextMessage textMessage = (TextMessage) message; System.out.println("MessageListener TextMessage 获取到消息:" + textMessage.getText()); //手动签收 textMessage.acknowledge() ~~~ + 允许重复消息 DUPS_OK_ACKNOWLEDGE ~~~java //第一个参数叫事务,第二个参数叫签收 Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); ~~~ + 以事务为主 SESSION_TRANSACTED :结合开启事务使用 ~~~java //第一个参数叫事务,第二个参数叫签收 Session session = connection.createSession(true, Session.SESSION_TRANSACTED); ~~~ ### Queue 编码 + cn.mrwangyin.activemq.jms.queue.JmsProduce#main + cn.mrwangyin.activemq.jms.queue.JmsConsumerLinstener#main + cn.mrwangyin.activemq.jms.queue.JmsConsumer#main ![jsm开发步骤](./images/jsm开发步骤.png) #### Topic 编码 + cn.mrwangyin.activemq.jms.topic.JmsProduce#main + cn.mrwangyin.activemq.jms.topic.JmsConsumerLinstener #### Queue Topic 比较 ![queue_topic](./images/queue_topic.png) #### ActiveMQ Broker + 每个broker是一个activeMQ实例,指定配置文件启动实例 ~~~shell ./bin/activemq start xbean:file:./conf/activemq.xml ~~~ + 嵌入式broker + cn.mrwangyin.activemq.jms.broker.EmbedBroker + cn.mrwangyin.activemq.jms.broker.JmsProduceBroker + cn.mrwangyin.activemq.jms.broker.JmsConsumerBroker ~~~java /** * ActiveMQ 支持在VM中通信基于嵌入式的broker * 嵌入式ActiveMQ * * @param args */ public static void main(String[] args) { try { BrokerService brokerService = new BrokerService(); brokerService.setUseJmx(true); brokerService.addConnector("tcp://localhost:61616"); brokerService.start(); } catch (Exception e) { e.printStackTrace(); } } ~~~ #### 整合Spring + 消费者 + cn.mrwangyin.activemq.SpringMQConsumer + 生产者 + cn.mrwangyin.activemq.SpringMQProduce #### 整合SpringBoot + 依赖 ~~~xml org.springframework.boot spring-boot-starter-activemq org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test org.springframework.boot spring-boot-starter-actuator ~~~ 配置: ~~~yaml server: port: 7777 spring: activemq: # ActiveMQ 地址 broker-url: tcp://10.10.10.202:61616 # 账号密码 user: admin password: admin jms: # false 就是 队列,true 就是 topic (默认false 队列) pub-sub-domain: false # 定义自己的队列名称 myqueue: boot-activemq-queue ~~~ 开启JMS: ~~~java @Configuration //开启JMS @EnableJms public class ConfigBean { @Value("${myqueue}") private String myQueue; @Bean public Queue queue() { return new ActiveMQQueue(myQueue); } } ~~~ 发送消息: ~~~java @Component public class Queue_Produce { @Resource private JmsMessagingTemplate messagingTemplate; @Resource private Queue queue; /** * 生产消息 * * @param message */ public void produce(String message) { messagingTemplate.convertAndSend(queue, message); } } ~~~ ### 传输协议 + 默认传输协议:openwire(tcp) + https://activemq.apache.org/configuring-version-5-transports.html + Transmission Control Proticol (TCP) + https://activemq.apache.org/tcp-transport-reference + Broker默认配置,TCP的Client监听61616 + 在网络传输数据前,必须要序列化数据,消息时通过一个叫wire protocol的来序列化成字节流。在默认情况下ActiveMQ将wire protocol叫做OpenWire,它的目的时促使网络上的效率和数据快速交互。 + TCP连接的URL形式如:tcp://hostname:port?k=v + TCP优点 + TCP协议传输可靠性高,稳定性强 + 高效性:字节流方式传递,效率高 + 有效性、可用性 + NEW I/O Protocol (NIO) + NIO协议和TCP协议类似但NIO更侧重于底层的访问操作,它允许开发人员对同一资源可有更多的client调用和服务端有更多的负载。 + 适合使用NIO场景 + 可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker时被操作系统的线程所限制的(Linux 普通用户线程max1000,root用户无限制)。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议 + NIO比TCP能提供更好的性能 + NIO连接URL:nio://hostname:port?k=v ~~~text The NIO Transport Same as the TCP transport, except that the New I/O (NIO) package is used, which may provide better performance. The Java NIO package should not be confused with IBM’s AIO4J package. To switch from TCP to NIO, simply change the scheme portion of the URI. Here’s an example as defined within a broker’s XML configuration file. ... ... Trying to use nio transport url on the client side will instantiate the regular TCP transport. For more information see the NIO Transport Reference ~~~ 修改传输协议: ~~~shell vim apache-activemq-5.16.1/conf/activemq.xml ~~~ ~~~xml ~~~ ~~~xml ~~~ + 重启ActiveMQ ![nio](./images/nio.png) + NIO - Client 测试 + cn.mrwangyin.activemq.jms.queue.JmsProduce#main + cn.mrwangyin.activemq.jms.queue.JmsConsumer ~~~java private static final String ACTIVEMQ_URL = "nio://10.10.10.202:61608"; private static final String QUEUE_NAME = "Transport-NIO+AUTO"; ~~~ + NIO进一步优化 + URI格式以nio开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型。但是这样的设置方式,只能使这个端口支持Openwire协议。 + https://activemq.apache.org/auto ~~~text Enabling AUTO over NIO To configure ActiveMQ auto wire format detection over an NIO TCP connection use the auto+nio transport prefix. For example, add the following transport configuration in your XML file: ~~~ + 优化配置 ~~~xml ~~~ + 连接 + cn/mrwangyin/activemq/jms/queue/JmsConsumer + cn.mrwangyin.activemq.jms.queue.JmsProduce ![auto-nio](./images/auto-nio.png) ### ActiveMQ的消息存储和持久化 + https://activemq.apache.org/persistence #### [Replicated LevelDB Store](https://activemq.apache.org/replicated-leveldb-store) + https://activemq.apache.org/replicated-leveldb-store + 带复制功能的LevelDB #### [LevelDB Store](https://activemq.apache.org/leveldb-store) + https://activemq.apache.org/replicated-leveldb-store + ActiveMQ5.8之后引入,和KahaDB相似,也是基于文件的本地数据库存储形式。它提供比KahaDB更快的持久性。它不是基于B-Tree 索引,而是基于LevelDB的索引。 + conf/activemq.xml ~~~xml ~~~ #### [KahaDB](https://activemq.apache.org/kahadb) + ActiveMQ5.4以后默认 + https://activemq.apache.org/kahadb + 基于日志文件 + conf/activemq.xml ~~~xml ~~~ + 存储位置 ~~~shell [root@localhost kahadb]# pwd /opt/apache-activemq-5.16.1/data/kahadb [root@localhost kahadb]# ll -h 总用量 2.0M -rw-r--r--. 1 root root 32M 4月 1 23:22 db-1.log -rw-r--r--. 1 root root 200K 4月 1 23:42 db.data -rw-r--r--. 1 root root 189K 4月 1 23:42 db.redo -rw-r--r--. 1 root root 8 4月 1 23:05 lock ~~~ + 存储原理 + 事务日志+索引文件 + db-.log : 数据记录文件 + db.data : 该文件包含了持久化的BTree索引,索引指向消息记录文件db-.log中的消息。 + db.free : 当前db.data文件里那些页面是空闲的,文件具体内容是所有空闲页的ID + db.redo : 用来进行消息恢复,如果KahaDB消息存储被强制退出后再启动,用户恢复BTree索引 + lock : 文件锁,表示当前获得kahadb读写权限的broker #### JDBC - MYSQL + 效率相对较低 + 安装mysql驱动包 ~~~shell [root@localhost opt]# cp mysql-connector-java-8.0.16.jar apache-activemq-5.16.1/lib/ ~~~ + conf/activemq.xml : jdbcPersistenceAdapter配置 ~~~xml ~~~ + #mysql-ds 引用 mysql-ds + createTablesOnStartup是否在启动的时候创建数据库表,默认true,这样每次启动都会创建数据库表,**一般第一次启动设置true,之后改为false** + 配置数据源:mysql-ds + ACTIVEMQ持久化配置MYSQL8.0各种问题解决办法: https://www.freesion.com/article/8687906277/ ~~~xml ~~~ + 如果需要配置其他的连接池,那么还需要将对于的连接池jar包cp到lib文件夹下。 + **注意这个配置一定要放对位置**: ![mysql-ds-conf](./images/mysql-ds-conf.png) + 建库建表 + 建库 ~~~sql CREATE DATABASE `activemq` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci'; ~~~ + 重启activemq自动建表 + 数据库表 + ACTIVEMQ_MSGS ~~~sql CREATE TABLE `ACTIVEMQ_MSGS` ( `ID` bigint(20) NOT NULL, `CONTAINER` varchar(250) NOT NULL, `MSGID_PROD` varchar(250) DEFAULT NULL, `MSGID_SEQ` bigint(20) DEFAULT NULL, `EXPIRATION` bigint(20) DEFAULT NULL, `MSG` blob, `PRIORITY` bigint(20) DEFAULT NULL, `XID` varchar(250) DEFAULT NULL, PRIMARY KEY (`ID`), KEY `ACTIVEMQ_MSGS_MIDX` (`MSGID_PROD`,`MSGID_SEQ`), KEY `ACTIVEMQ_MSGS_CIDX` (`CONTAINER`), KEY `ACTIVEMQ_MSGS_EIDX` (`EXPIRATION`), KEY `ACTIVEMQ_MSGS_PIDX` (`PRIORITY`), KEY `ACTIVEMQ_MSGS_XIDX` (`XID`), KEY `ACTIVEMQ_MSGS_IIDX` (`ID`,`XID`,`CONTAINER`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ~~~ + CONTAINER:消息Destination + MSGID_PROD:消息发送者的主键 + MSG_SEQ:发送消息的顺序MSGID_PROD+MSG_SEQ=MessageID + EXPIRATION:消息的过期时间 + MSG:消息本体Java序列化对象的二进制数据 + PRIORITY:优先级(0-9) + ACTIVEMQ_ACKS ~~~sql CREATE TABLE `ACTIVEMQ_ACKS` ( `CONTAINER` varchar(250) NOT NULL, `SUB_DEST` varchar(250) DEFAULT NULL, `CLIENT_ID` varchar(250) NOT NULL, `SUB_NAME` varchar(250) NOT NULL, `SELECTOR` varchar(250) DEFAULT NULL, `LAST_ACKED_ID` bigint(20) DEFAULT NULL, `PRIORITY` bigint(20) NOT NULL DEFAULT '5', `XID` varchar(250) DEFAULT NULL, PRIMARY KEY (`CONTAINER`,`CLIENT_ID`,`SUB_NAME`,`PRIORITY`), KEY `ACTIVEMQ_ACKS_XIDX` (`XID`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ~~~ + 主要用于存储订阅关系 + CONTAINER : 消息Destination + SUB_DEST : 如果是使用Static集群,这个字段会有集群其他系统的信息 + CLIENT_ID : 每个订阅者都必须有一个唯一的客户端ID用以区分 + SUB_NAME:订阅者名称 + SELECTOR :选择器,可以选择只消费满足条件的消息。条件可以使用自定义属性实现,可支持AND/OR操作 + LAST_ACKED_ID : 记录消费过的消息ID + PRIORITY : 优先级 + ACTIVEMQ_LOCK ~~~sql CREATE TABLE `ACTIVEMQ_LOCK` ( `ID` bigint(20) NOT NULL, `TIME` bigint(20) DEFAULT NULL, `BROKER_NAME` varchar(250) DEFAULT NULL, PRIMARY KEY (`ID`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ~~~ + 在集群环境中才有用,只有一个Broler可以获得消息,称为Master Broker,其他的Broker之恶能作为备份,等待Master Broker不可用,才可能成为下一个Broker. + 这个表用于记录那个Broker是Master Broker + 代码测试 + 生产者一定要**开启持久化**-才能存到数据库 ~~~java // Queue 消息生产者 MessageProducer producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.PERSISTENT); //Topic 类型的消息,只有订阅型才能持久化到MYSQL,普通Topic是错过了就不会再接收到 ~~~ + **当消息被消费 ACTIVEMQ_MSGS 表中数据会被清除** ![mysql_activemq_ACTIVEMQ_MSGS](./images/mysql_activemq_ACTIVEMQ_MSGS.png) + 订阅Topic + **消费后消息不会被删除** + cn.mrwangyin.activemq.jms.topic.JmsProducePersist ~~~java // 消息生产者 MessageProducer producer = session.createProducer(topic); //持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); ~~~ ![mysql_activemq_ACTIVEMQ_ACKS](./images/mysql_activemq_ACTIVEMQ_ACKS.png) ![mysql_activemq_ACTIVEMQ_MSGS_SUB_TOPIC](./images/mysql_activemq_ACTIVEMQ_MSGS_SUB_TOPIC.png) + ActiveMQ Journal 高速缓存 + 解决JDBC性能问题 + 比如生产者生产了1000条消息,这1000条消息会先保存到journal文件中,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者就已经消费了90%以上的数据,那么这个时候只需要同步10%的消息到DB,如果消费得很慢,journal文件可以使消息以批量方式写道DB; + 配置方式: ~~~xml ~~~ + 重启ActiveMQ ~~~shell [root@localhost journal]# pwd /opt/apache-activemq-5.16.1/activemq-data/journal [root@localhost journal]# ll -h 总用量 8.0K -rw-r--r--. 1 root root 140 4月 2 13:58 control.dat -rw-r--r--. 1 root root 32K 4月 2 13:58 log-000.dat -rw-r--r--. 1 root root 32K 4月 2 13:57 log-001.dat -rw-r--r--. 1 root root 32K 4月 2 13:57 log-002.dat -rw-r--r--. 1 root root 32K 4月 2 13:57 log-003.dat ~~~ #### 高可用之 LevelDB + Zookeeper 集群(官方弃用) ~~~wiki ActiveMQ V5.9 In ActiveMQ 5.9, the Replicated LevelDB Store is introduced. It handles using Apache ZooKeeper to pick a master from a set of broker nodes configured to replicate single LevelDB Store. Then synchronizes all slave LevelDB Stores with the master keeps them up to date by replicating all updates to the master. This might become the preferred Master Slave configuration going forward. ActiveMQ 5.9版 在activemq5.9中,引入了复制的LevelDB存储。它使用apachezookeeper从一组配置为复制单个LevelDB存储的代理节点中选择主节点。然后将所有从属LevelDB存储与主存储同步,通过将所有更新复制到主存储来保持它们的最新状态。这可能成为今后首选的主从配置。 ~~~ + https://activemq.apache.org/replicated-leveldb-store ![replicated-leveldb-store](./images/replicated-leveldb-store.png) + Zookeeper集群 + 10.0.1.28:2181 + 10.0.1.29:2181 + 10.0.1.30:2181 + ActiveMQ集群 + 10.0.1.28:8161 + 10.0.1.29:8161 + 10.0.1.30:8161 ~~~shell # 将 10.0.1.28 上的ActiveMQ传到其他两台服务器 [root@localhost opt]# pwd /opt [root@localhost opt]# scp -r ./apache-activemq-5.16.1 root@10.0.1.29:/opt [root@localhost opt]# scp -r ./apache-activemq-5.16.1 root@10.0.1.30:/opt ~~~ + 对应修改配置 ~~~shell [root@localhost conf]# vim jetty.xml ~~~ ~~~xml ~~~ + 三台服务的ActiveMQ的brokerName必须一致 + brokerName="Active_MQ_Broker" ~~~shell [root@localhost conf]# vim activemq.xml ~~~ ~~~xml ~~~ + 三台服务器的持久化配置一致 + https://activemq.apache.org/replicated-leveldb-store ~~~wiki Configuration You can configure ActiveMQ to use LevelDB for its persistence adapter - like below : ... ... ~~~ + 10.0.1.29 ~~~xml ~~~ + 10.0.1.30 ~~~xml ~~~ + Zookeeper查看注册信息 ~~~shell [zk: localhost:2181(CONNECTED) 1] ls /activemq [leveldb-stores] [zk: localhost:2181(CONNECTED) 2] ls /activemq/leveldb-stores [00000000000, 00000000001, 00000000002] [zk: localhost:2181(CONNECTED) 9] get /activemq/leveldb-stores/00000000000 {"id":"Active_MQ_Broker","container":null,"address":"tcp://10.0.1.28:43088","position":-1,"weight":1,"elected":"0000000000"} [zk: localhost:2181(CONNECTED) 10] get /activemq/leveldb-stores/00000000001 {"id":"Active_MQ_Broker","container":null,"address":null,"position":-1,"weight":1,"elected":null} [zk: localhost:2181(CONNECTED) 11] get /activemq/leveldb-stores/00000000002 {"id":"Active_MQ_Broker","container":null,"address":null,"position":-1,"weight":1,"elected":null} ~~~ + 如果 elected 有值不是null,那么这台机器就是Master,其余两台是Slave + 10.0.1.28 : Master + 10.0.1.29 : Slave + 10.0.1.30 : Slave + 测试 + 8161端口只有一个节点能够访问,就是Master节点 + http://10.0.1.28:8161/admin/queues.jsp + ActiveMQ的客户端只能访问Master的Broker,其他处理Slave的Broker不能访问,所以客户端连接的Broker应该使用**failover协议**(故障转移) + 当一个ActiveMQ一个节点挂了或者一个Zookeeper节点挂了,ActiveMQ正常运行,但是如果ActiveMQ仅剩一台,那么就不能使用,因为不能选举Master,Zookeeper同理。 + 测试代码 + 生产者:cn.mrwangyin.activemq.jms.cluster.ClusterProduceQueue#main + 消费者:cn.mrwangyin.activemq.jms.cluster.ClusterConsumerQueue#main + 坑? + 解决ActiveMQ集群启动后,master-slave之间无法切换的问题 + https://www.tqwba.com/x_d/jishu/211262.html + 将activemq/data下的数据全部删除即可,是其下面的数据干扰了主从切换。 + ActiveMQ集成springboot项目集群高可用无效,启动失败 + https://blog.csdn.net/huangdi1309/article/details/108226333