# MQ **Repository Path**: GUOSD/mq ## Basic Information - **Project Name**: MQ - **Description**: No description available - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-06-22 - **Last Updated**: 2022-02-15 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # MQ #### ActiveMQ API 发送和接受 MQ 的高可用性 MQ 的集群容错配置 MQ 的持久化 延时发送/定时投递 签收机制 Spring 整合 。。。。。。 // MQ 都需要满足的技术 #### 软件架构 为什么要使用 MQ ? 解决了耦合调用、异步模型、抵御洪峰流量,保护了主业务,消峰。 解耦、削峰、异步 安装mq 直接进入myactivemq 的 文件下的activemq 下的 bin 目录,使用 ./activemq start/stop 命令启动 检查activemq 是否启动的三种方法: 也是三种查看后台进程的方法 ps -ef|grep activemq|grep -v grep // grep -v grep 可以不让显示grep 本来的信息 netstat -anp|grep 61616 // activemq 的默认后台端口是61616 lsof -i:61616 前端查看http://localhost:8161/admin/ 账号/密码:admin/admin #### 安装教程 JMS : Java 消息中间件的服务接口规范,activemq 之上是 mq , 而 mq 之上是JMS 定义的消息规范 。 activemq 是mq 技术的一种理论实现(与之相类似的实现还有 Kafka RabbitMQ RockitMQ ),而 JMS 是更上一级的规范。 ![img.png](img.png) 在点对点的消息传递时,目的地称为 队列 queue 在发布订阅消息传递中,目的地称为 主题 topic JMS 开发基本步骤 1、创建连接工厂 2、通过连接工厂创建jms连接 connection 3、启动 jms连接 connection 4、通过connection创建jms session 5、 创建jms destination 6、创建jms producer或者创建jms message 并设置 destination 7、创建jms consumer或者注册jms message listener 8、发送或者接受jms message 9、关闭jms所有资源 (1)topic介绍 在发布订阅消息传递域中,目的地被称为主题(topic) 发布/订阅消息传递域的特点如下: (1)生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系; (2)生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。 (3)生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。 默认情况下如上所述,但是JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅 什么是Java消息服务? Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等, 用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。 JMS组成: JMS provider JMS producer JMS consumer JMS message: 消息头、消息属性、消息体 JMS的消息头有哪些属性: JMSDestination:消息目的地 JMSDeliveryMode:消息持久化模式 JMSExpiration:消息过期时间 JMSPriority:消息的优先级 JMSMessageID:消息的唯一标识符。后面我们会介绍如何解决幂等性。 说明: 消息的生产者可以set这些属性,消息的消费者可以get这些属性。 这些属性在send方法里面也可以设置。 JMS的消息体有哪些属性: TextMessage:普通字符串,包含一个String MapMessage: 一个map类型消息,key为string,值为Java基本类型 BytesMessage:二进制数组,包含一个byte[] StreamMessage:Java数据流消息,用标准流操作来顺序的填充和读取 ObjectMessage:对象消息,包含一个可序列化的Java对象 5.4消息属性 如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别/去重/重点标注等操作,非常有用的方法。 他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。 5.5消息的持久化 什么是持久化消息? 保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。 我的理解:在消息生产者将消息成功发送给MQ消息中间件之后。无论是出现任何问题,如:MQ服务器宕机、消费者掉线等。都保证(topic要之前注册过,queue不用)消息消费者,能够成功消费消息。如果消息生产者发送消息就失败了,那么消费者也不会消费到该消息。 5.5.1queue消息非持久和持久 queue非持久,当服务器宕机,消息不存在(消息丢失了)。即便是非持久,消费者在不在线的话,消息也不会丢失,等待消费者在线,还是能够收到消息的。 queue持久化,当服务器宕机,消息依然存在。queue消息默认是持久化的。 持久化消息,保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。 可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。 5.5.2topic消息持久化 topic默认就是非持久化的,因为生产者生产消息时,消费者也要在线,这样消费者才能消费到消息。 topic消息持久化,只要消费者向MQ服务器注册过,所有生产者发布成功的消息,该消费者都能收到,不管是MQ服务器宕机还是消费者不在线。 注意: 1.一定要先运行一次消费者,等于向MQ注册,类似我订阅了这个主题。 2.然后再运行生产者发送消息。 3.之后无论消费者是否在线,都会收到消息。如果不在线的话,下次连接的时候,会把没有收过的消息都接收过来。 5.6消息的事务性 (1)生产者开启事务后,执行commit方法,这批消息才真正的被提交。不执行commit方法,这批消息不会提交。执行rollback方法,之前的消息会回滚掉。生产者的事务机制,要高于签收机制,当生产者开启事务,签收机制不再重要。 (2)消费者开启事务后,执行commit方法,这批消息才算真正的被消费。不执行commit方法,这些消息不会标记已消费,下次还会被消费。执行rollback方法,是不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息,下次还会被消费。消费者利用commit和rollback方法,甚至能够违反一个消费者只能消费一次消息的原理。 (3)问:消费者和生产者需要同时操作事务才行吗? 答:消费者和生产者的事务,完全没有关联,各自是各自的事务。 5.7消息的签收机制 一、签收的几种方式 ① 自动签收(Session.AUTO_ACKNOWLEDGE):该方式是默认的。该种方式,无需我们程序做任何操作,框架会帮我们自动签收收到的消息。 ② 手动签收(Session.CLIENT_ACKNOWLEDGE):手动签收。该种方式,需要我们手动调用Message.acknowledge(),来签收消息。如果不签收消息,该消息会被我们反复消费,只到被签收。 ③ 允许重复消息(Session.DUPS_OK_ACKNOWLEDGE):多线程或多个消费者同时消费到一个消息,因为线程不安全,可能会重复消费。该种方式很少使用到。 ④ 事务下的签收(Session.SESSION_TRANSACTED):开始事务的情况下,可以使用该方式。该种方式很少使用到。 二、事务和签收的关系 ① 在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。事务优先于签收,开始事务后,签收机制不再起任何作用。 ② 非事务性会话中,消息何时被确认取决于创建会话时的应答模式。 ③ 生产者事务开启,只有commit后才能将全部消息变为已消费。 ④ 事务偏向生产者,签收偏向消费者。也就是说,生产者使用事务更好点,消费者使用签收机制更好点。 5.8 JMS的点对点总结 点对点模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和我们平时给朋友发送短信类似。 如果在Session关闭时有部分消息己被收到但还没有被签收(acknowledged),那当消费者下次连接到相同的队列时,这些消息还会被再次接收 队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势 5.9 JMS的发布订阅总结 (1)JMS的发布订阅总结 JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic。 主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。 主题使得消息订阅者和消息发布者保持互相独立不需要解除即可保证消息的传送 (2)非持久订阅 非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收发到某个主题的消息。 如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。 一句话:先订阅注册才能接受到发布,只给订阅者发布消息。 (3)持久订阅 客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ的时候,会根据消费者的ID得到所有当自己处于离线时发送到主题的消息 当持久订阅状态下,不能恢复或重新派送一个未签收的消息。 持久订阅才能恢复或重新派送一个未签收的消息。 (4)非持久和持久化订阅如何选择 当所有的消息必须被接收,则用持久化订阅。当消息丢失能够被容忍,则用非持久订阅。 6. ActiveMQ的broker (1) broker是什么 相当于一个ActiveMQ服务器实例。说白了,Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可用性。这种方式,我们实际开发中很少采用,因为他缺少太多了东西,如:日志,数据存储等等。 (2) 启动broker时指定配置文件 启动broker时指定配置文件,可以帮助我们在一台服务器上启动多个broker。实际工作中一般一台服务器只启动一个broker。 ./activemq start xbean:file:/Users/guoshoudu/Documents/app/soft/apache-activemq-5.16.2/conf/activemq02.xml 9. ActiveMQ的传输协议 9.1 简介 ActiveMQ支持的client-broker通讯协议有:TVP、NIO、UDP、SSL、Http(s)、VM。其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的标签之内。 activemq传输协议的官方文档:http://activemq.apache.org/configuring-version-5-transports.html 在上文给出的配置信息中,URI描述信息的头部都是采用协议名称:例如 描述amqp协议的监听端口时,采用的URI描述格式为“amqp://······”; 描述Stomp协议的监听端口时,采用URI描述格式为“stomp://······”; 唯独在进行openwire协议描述时,URI头却采用的“tcp://······”。这是因为ActiveMQ中默认的消息协议就是openwire 9.2 支持的传输协议 个人说明:除了tcp和nio协议,其他的了解就行。各种协议有各自擅长该协议的中间件,工作中一般不会使用activemq去实现这些协议。如: mqtt是物联网专用协议,采用的中间件一般是mosquito。ws是websocket的协议,是和前端对接常用的,一般在java代码中内嵌一个基站(中间件)。stomp好像是邮箱使用的协议的,各大邮箱公司都有基站(中间件)。 注意:协议不同,我们的代码都会不同。 9.2.1 TCP协议 (1)Transmission Control Protocol(TCP)是默认的。TCP的Client监听端口61616 (2)在网络传输数据前,必须要先序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。 (3)TCP连接的URI形式如:tcp://HostName:port?key=value&key=value,后面的参数是可选的。 (4)TCP传输的的优点: TCP协议传输可靠性高,稳定性强 高效率:字节流方式传递,效率很高 有效性、可用性:应用广泛,支持任何平台 (5)关于Transport协议的可选配置参数可以参考官网http://activemq.apache.org/tcp-transport-reference 9.2.2 NIO协议 (1)New I/O API Protocol(NIO) (2)NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。 (3)适合使用NIO协议的场景: 可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。 可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。 (4)NIO连接的URI形式:nio://hostname:port?key=value&key=value (5)关于Transport协议的可选配置参数可以参考官网http://activemq.apache.org/configuring-version-5-transports.html 9.2.3 AMQP协议 9.2.4 STOMP协议 9.2.5 MQTT协议 10.ActiveMQ的消息存储和持久化 10.1 介绍 (1)此处持久化和之前的持久化的区别 MQ高可用:1事务、2可持久、3签收,是属于MQ自身特性,自带的。 这里的 4持久化是外力,是外部插件。之前讲的持久化是MQ的外在表现,现在讲的的持久是是底层实现。 (2)是什么: 官网文档:http://activemq.apache.org/persistence 持久化是什么?一句话就是:ActiveMQ宕机了,消息不会丢失的机制。 说明:为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制。 ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。就是在发送者将消息发送出去后, 消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。 消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。 10.2 有哪些 (1)AMQ Message Store 基于文件的存储机制,是以前的默认机制,现在不再使用。 AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除, 在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本 (2)kahaDB 现在默认的。下面我们再详细介绍。 KahaDB 是一个基于文件的持久性数据库,它位于使用它的消息代理本地。它已针对快速持久性进行了优化。它是自ActiveMQ 5.4以来的默认存储机制。KahaDB 使用更少的文件描述符并提供比其前身AMQ 消息存储更快的恢复。 配置 要将 KahaDB 用作代理的持久适配器,请按如下方式配置 ActiveMQ(示例): 日志文件的存储目录在:%activemq安装目录%/data/kahadb (3)JDBC消息存储 下面我们再详细介绍。 (4)LevelDB消息存储 过于新兴的技术,现在有些不确定。 p52 #### 使用说明 1. xxxx 2. xxxx 3. xxxx #### 参与贡献 1. Fork 本仓库 2. 新建 Feat_xxx 分支 3. 提交代码 4. 新建 Pull Request #### 特技 1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md 2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com) 3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目 4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目 5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help) 6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)