# rabbitmq-on-kuber **Repository Path**: xwma/rabbitmq-on-kubernetes ## Basic Information - **Project Name**: rabbitmq-on-kuber - **Description**: 在k8s上部署可持久化的rabbitmq集群 - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 12 - **Created**: 2020-11-10 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README #rabbitmq在kubernetes中持久化集群部署 # **背景** 某电商系统的消息总线使用的事rabbitmq,在订单创建、静态页生成、索引生成等等业务中大量采用异步消息系统,这个对于mq高可用的要求有两个重要的考量: 1、集群化 2、可扩容 3、冗灾 冗灾就要实现rabbitmq的持久化,要考虑到rabbitmq宕机的情况,当rabbitmq因不可抗因素挂掉了,这时有一些消息还没来得及被消费,当我们再恢复了rabbitmq的运行后,这些消息应该同时被恢复,可以再次被消费。 ## **部署过程** ### **一、pv(持久卷的建立)** 先要建立nfs服务器 对于持久卷的结构规划如下: ``` /nfs/data/mqdata ``` 根据如上规划建立nfs服务: ``` #master节点安装nfs yum -y install nfs-utils #创建nfs目录 mkdir -p /nfs/data/{mqdata,esmaster,esdata} #修改权限 chmod -R 777 /nfs/data/ #编辑export文件 vim /etc/exports 粘贴如下内容: /nfs/data/mqdata *(rw,no_root_squash,sync) #配置生效 exportfs -r #查看生效 exportfs #启动rpcbind、nfs服务 systemctl restart rpcbind && systemctl enable rpcbind systemctl restart nfs && systemctl enable nfs #查看 RPC 服务的注册状况 rpcinfo -p localhost #showmount测试,这里的ip输入master节点的局域网ip showmount -e ``` 如果成功可以看到可被挂载的目录: ``` # showmount -e 172.17.14.73 Export list for 172.17.14.73: /nfs/data/mqdata * ``` 接下来,要在每一个节点上安装nfs服务以便使k8s可以挂载nfs目录 ``` #所有node节点安装客户端 yum -y install nfs-utils systemctl start nfs && systemctl enable nfs ``` 这样就为k8s的持久卷做好了准备。 ## **建立持久卷** 有了nfs的准备,我就可以建立持久卷了: 在您的k8s maseter节点服务器上 clone我们准备好的yaml文件 ``` https://gitee.com/enation/rabbitmq-on-kubernetes.git ``` 修改根目录中的pv.yaml 修改其中的server配置为nfs服务器的IP: ``` nfs: server: 192.168.1.100 #这里请写nfs服务器的ip ``` 通过下面的命令建立持久卷: ``` kubectl create -f pv.yaml ``` 通过以下命令查看持久卷是否建立成功: ``` kubectl get pv ``` ## **部署rabbitmq** 在k8s master节点上执行下面的命令创建namespace: ``` kubectl create namespace ns-rabbitmq ``` 执行下面的命令创建rabbitmq集群(执行整个目录的所有配置文件) ``` kubectl create -f rabbitmq/ ``` 通过以上部署我们建立了一个ns-rabbitmq的namespace,并在其中创建了相应的pvc、角色账号,有状态副本集以及服务。 ### **镜像** 使用的是javashop自己基于rabbitmq:3.8做的,加入了延迟消息插件,其他没有变化。 ### **服务** 我们默认开启了对外nodeport端口,对应关系: 31672->15672 30672->5672 k8s内部可以通过下面的服务名称访问: rabbitmq.ns-rabbitmq:15672 rabbitmq.ns-rabbitmq:5672 等待容器都启动成功后验证。 ## **验证** 使用附带程序校验 1. 发送消息(注释掉接收消息) 2. 观察mq的队列中有消息堆积 3. 删除mq的副本集 4. 恢复mq副本集 5. 接收消息 ## **关键技术点** 1、集群发现: 使用rabbitmq提供的k8s对等发现插件:rabbitmq_peer_discovery_k8s 2、映射持久卷 映射到:/var/lib/rabbitmq/mnesia 3、自定义数据目 ``` - name: RABBITMQ_MNESIA_BASE value: /var/lib/rabbitmq/mnesia/$(MY_POD_NAME) ``` 其中MY_POD_NAME是读取的容器名称,通过有状态副本集保证唯一性的绑定: ``` - name: MY_POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace ``` ## **附带验证程序** ``` private static CachingConnectionFactory connectionFactory; private static void initConnectionFactory() { connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); } public static void main(String[] args) { initConnectionFactory(); //发送消息 send(); //接收消息 receive(); } private static void receive() { AmqpTemplate template = new RabbitTemplate(connectionFactory); String foo = (String) template.receiveAndConvert("myqueue"); System.out.println("get message : "+ foo); } private static void send() { AmqpAdmin admin = new RabbitAdmin(connectionFactory); admin.declareQueue(new Queue("myqueue",true)); AmqpTemplate template = new RabbitTemplate(connectionFactory); template.convertAndSend("myqueue", "foo"); } ```