# zbus **Repository Path**: rockjava/zbus ## Basic Information - **Project Name**: zbus - **Description**: 轻量级服务总线/消息队列,1)多种消息模式--支持生产者/消费者,发布订阅,RPC。2)丰富的API--C/C++/C#/JAVA/Python/Node.JS跨平台、多语言支持; 3)开放协议标准--原生兼容HTTP协议(长连接),头部动态扩展;4)支持TrackServer与ZbusServer高可用横向动态扩容机制。 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: http://zbus.org - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 914 - **Created**: 2015-04-20 - **Last Updated**: 2020-12-17 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # ZBUS--轻量级消息队列、服务总线 ##**ZBUS** 特性 * **消息队列 -- 生产者消费者模式、发布订阅** * **服务总线 -- 适配改造已有业务系统,使之具备跨平台与语言, RPC** * **RPC -- 分布式远程方法调用,Java方法透明代理** * **跨平台、多语言** * **高可用、高并发** ##**ZBUS** SDK * [Java SDK](http://git.oschina.net/rushmore/zbus "zbus-java") * [C/C++ SDK](http://git.oschina.net/rushmore/zbus-c "zbus-c") * [Python SDK](http://git.oschina.net/rushmore/zbus-python "zbus-python") * [C# SDK](http://git.oschina.net/rushmore/zbus-csharp "zbus-csharp") * [Node.JS SDK](http://git.oschina.net/rushmore/zbus-nodejs "zbus-nodejs") ##**ZBUS** 桥接 * [微软MSMQ](http://git.oschina.net/rushmore/zbus-msmq "zbus-msmq") * [金证KCXP](http://git.oschina.net/rushmore/zbus-kcxp "zbus-kcxp") ## ZBUS 启动与监控 1. zbus-dist选择zbus.sh或者zbus.bat直接执行 2. 通过源码ZbusServer.java个性化控制启动 ![简单监控](http://git.oschina.net/uploads/images/2015/0212/103207_b5d2e1d3_7458.png) 总线默认占用**15555**端口, [http://localhost:15555](http://localhost:15555 "默认监控地址") 可以直接进入监控,注意zbus因为原生兼容HTTP协议所以监控与消息队列使用同一个端口 **高可用模式启动总线** 分别启动ZbusServer与TrackServer,无顺序之分,默认ZbusServer占用15555端口,TrackServer占用16666端口。 ## ZBUS 设计概要图 ![zbus-arch](http://git.oschina.net/uploads/images/2015/0419/134134_62a4e21c_7458.png) ## ZBUS 示例 ### Java Maven 依赖 org.zbus zbus 5.2.0 ### 生产者 //1)创建Broker代理【重量级对象,需要释放】 SingleBrokerConfig config = new SingleBrokerConfig(); config.setBrokerAddress("127.0.0.1:15555"); final Broker broker = new SingleBroker(config); //2) 创建生产者 【轻量级对象,不需要释放,随便使用】 Producer producer = new Producer(broker, "MyMQ"); producer.createMQ(); //如果已经确定存在,不需要创建 Message msg = new Message(); msg.setBody("hello world"); Message res = producer.sendSync(msg, 1000); System.out.println(res); //3)销毁Broker broker.close(); ### 消费者 //1)创建Broker代表 SingleBrokerConfig brokerConfig = new SingleBrokerConfig(); brokerConfig.setBrokerAddress("127.0.0.1:15555"); Broker broker = new SingleBroker(brokerConfig); MqConfig config = new MqConfig(); config.setBroker(broker); config.setMq("MyMQ"); //2) 创建消费者 @SuppressWarnings("resource") Consumer c = new Consumer(config); c.onMessage(new MessageCallback() { public void onMessage(Message msg, Session sess) throws IOException { System.out.println(msg); } }); ### RPC动态代理【各类复杂类型】 参考源码test目下的rpc部分 SingleBrokerConfig config = new SingleBrokerConfig(); config.setBrokerAddress("127.0.0.1:15555"); Broker broker = new SingleBroker(config); RpcConfig rpcConfig = new RpcConfig(); rpcConfig.setBroker(broker); rpcConfig.setMq("MyRpc"); //动态代理处Interface通过zbus调用的动态实现类 Interface hello = RpcProxy.getService (Interface.class, rpcConfig); Object[] res = hello.objectArray(); for (Object obj : res) { System.out.println(obj); } Object[] array = new Object[] { getUser("rushmore"), "hong", true, 1, String.class }; int saved = hello.saveObjectArray(array); System.out.println(saved); Class ret = hello.classTest(String.class); System.out.println(ret); ### Spring集成--服务端(RPC示例) **无任何代码侵入使得你已有的业务接口接入到zbus,获得跨平台和多语言支持** ### Spring集成--客户端 **Spring完成zbus代理透明化,zbus设施从你的应用逻辑中彻底消失** public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("ZbusSpringClient.xml"); Interface intf = (Interface) context.getBean("interface"); System.out.println(intf.listMap()); } # ZBUS消息协议 ## 协议概览 ZBUS协议继承于HTTP协议格式,主体采用HTTP头部协议扩展完成,HTTP协议由HTTP头部和HTTP包体组成, ZBUS协议在HTTP头部KV值做了扩展,支持浏览器方式直接访问,但是ZBUS链接机制采用保持长连接方式。 原则上, 编写客户端SDK只需要遵循下面ZBUS协议扩展即可。 ZBUS扩展头部主要是完成 * 消息命令 * ZBUS消息队列寻址 * 异步消息匹配 * 安全控制 扩展头部Key-Value解释 ###1. 消息命令 命令标识,决定Broker(ZbusServer|TrackServer)的处理 cmd: produce | consume | request | heartbeat | admin(默认值) ###2. 消息队列寻址 mq: 消息目标队列 mq-reply: 消息回复队列 ###3. 异步消息匹配 msgid: 消息唯一UUID msgid-raw: 原始消息唯一UUID, 消息消费路由后ID发生变化,该字段保留最原始的消息ID ###4. 安全控制 token: 访问控制码,不填默认空 ###5. 其他可扩展 broker: 消息经过Broker的地址 topic: 消息主题,发布订阅时使用 ack: 是否需要对当前消息ACK,不填默认true encoding: 消息体的编码格式 sub_cmd: 管理命令的二级命令 ###6 HTTP头部第一行,ZBUS协议保持一致理解 请求:GET|POST URI 应答:200 OK URI做扩展Key-Value的字符串理解 ## 协议细节 ### 生产者Produce 请求格式 * [必填]cmd: produce * [必填]mq: 目标队列 * [可选*]msgid: 消息UUID, 需要ACK时[必填] * [可选]mq-reply: 回复队列,默认为请求UUID。 需要应答的时候由mq_reply + msgid路由返回 * [可选]topic: 发布订阅时发布消息的主题 * [可选]token: 访问控制码 * [可选]HTTP消息体 承载业务数据 应答格式(在启用ack的时候才有应答) * [可选]msgid: 消息UUID=请求消息UUID,客户端匹配使用 ### 消费者Consume 请求格式 * [必填]cmd: consume * [必填]mq: 目标队列 * [必填]msgid: 消息UUID * [可选]topic: 发布订阅时订阅感兴趣消息的主题 * [可选]token: 访问控制码 应答格式 * [必填]msgid: 消息UUID,为了匹配消费请求 * [必填]broker: 消息路由经历的Broker地址 * [可选]mq-reply: 回复队列, 需要反馈结果的Consumer利用mq-reply指定目标消息队列 * [可选]msgid-raw: 原始消息UUID,需要反馈结果的Consumer利用msgid-raw指定回复消息ID * [可选]HTTP消息体 承载业务数据 ### 服务请求Request 请求格式 * [必填]cmd: request * [必填]mq: 目标队列 * [必填]msgid: 消息UUID * [必填]mq-reply: 回复队列,不制定的情况下默认为当前发送者的UUID * [可选]token: 访问控制码 * [可选]HTTP消息体 承载业务数据 应答格式(在启用ack的时候才有应答) * [必填]msgid: 消息UUID=请求消息UUID,客户端匹配使用 * [可选]HTTP消息体 承载业务数据 ### 监控管理 请求格式 * [可选]cmd: admin,不填写默认为admin * [可选]sub_cmd: create_mq * [可选*]msgid: 消息UUID, 客户端需要消息匹配时需指定 * [可选]token: 访问控制码 * [可选]HTTP消息体 承载业务数据 应答格式 * [可选*]msgid: 消息UUID=请求消息UUID,客户端匹配使用 * [可选]HTTP消息体 承载业务数据 ### URI格式 URI = / 监控首页 = /?cmd=admin&&method=index URI = /MyMQ 第一个?之前理解为消息队列 mq=MyMQ * /MyMQ?cmd=produce&&msgid=aed14-2343-1dea0-32&&body=xxxyyyzzz * /MyMQ?cmd=consume&&msgid=aed14-2343-1dea0-32 * /MyMQ?cmd=request&&msgid=aed14-2343-1dea0-32 第一个?之后理解为Key-Value, URI的KV优先级低于头部扩展