# zbus
**Repository Path**: rockjava/zbus
## Basic Information
- **Project Name**: zbus
- **Description**: 轻量级服务总线/消息队列,1)多种消息模式--支持生产者/消费者,发布订阅,RPC。2)丰富的API--C/C++/C#/JAVA/Python/Node.JS跨平台、多语言支持; 3)开放协议标准--原生兼容HTTP协议(长连接),头部动态扩展;4)支持TrackServer与ZbusServer高可用横向动态扩容机制。
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: http://zbus.org
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 914
- **Created**: 2015-04-20
- **Last Updated**: 2020-12-17
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# ZBUS--轻量级消息队列、服务总线
##**ZBUS** 特性
* **消息队列 -- 生产者消费者模式、发布订阅**
* **服务总线 -- 适配改造已有业务系统,使之具备跨平台与语言, RPC**
* **RPC -- 分布式远程方法调用,Java方法透明代理**
* **跨平台、多语言**
* **高可用、高并发**
##**ZBUS** SDK
* [Java SDK](http://git.oschina.net/rushmore/zbus "zbus-java")
* [C/C++ SDK](http://git.oschina.net/rushmore/zbus-c "zbus-c")
* [Python SDK](http://git.oschina.net/rushmore/zbus-python "zbus-python")
* [C# SDK](http://git.oschina.net/rushmore/zbus-csharp "zbus-csharp")
* [Node.JS SDK](http://git.oschina.net/rushmore/zbus-nodejs "zbus-nodejs")
##**ZBUS** 桥接
* [微软MSMQ](http://git.oschina.net/rushmore/zbus-msmq "zbus-msmq")
* [金证KCXP](http://git.oschina.net/rushmore/zbus-kcxp "zbus-kcxp")
## ZBUS 启动与监控
1. zbus-dist选择zbus.sh或者zbus.bat直接执行
2. 通过源码ZbusServer.java个性化控制启动

总线默认占用**15555**端口, [http://localhost:15555](http://localhost:15555 "默认监控地址") 可以直接进入监控,注意zbus因为原生兼容HTTP协议所以监控与消息队列使用同一个端口
**高可用模式启动总线**
分别启动ZbusServer与TrackServer,无顺序之分,默认ZbusServer占用15555端口,TrackServer占用16666端口。
## ZBUS 设计概要图

## ZBUS 示例
### Java Maven 依赖
org.zbus
zbus
5.2.0
### 生产者
//1)创建Broker代理【重量级对象,需要释放】
SingleBrokerConfig config = new SingleBrokerConfig();
config.setBrokerAddress("127.0.0.1:15555");
final Broker broker = new SingleBroker(config);
//2) 创建生产者 【轻量级对象,不需要释放,随便使用】
Producer producer = new Producer(broker, "MyMQ");
producer.createMQ(); //如果已经确定存在,不需要创建
Message msg = new Message();
msg.setBody("hello world");
Message res = producer.sendSync(msg, 1000);
System.out.println(res);
//3)销毁Broker
broker.close();
### 消费者
//1)创建Broker代表
SingleBrokerConfig brokerConfig = new SingleBrokerConfig();
brokerConfig.setBrokerAddress("127.0.0.1:15555");
Broker broker = new SingleBroker(brokerConfig);
MqConfig config = new MqConfig();
config.setBroker(broker);
config.setMq("MyMQ");
//2) 创建消费者
@SuppressWarnings("resource")
Consumer c = new Consumer(config);
c.onMessage(new MessageCallback() {
public void onMessage(Message msg, Session sess) throws IOException {
System.out.println(msg);
}
});
### RPC动态代理【各类复杂类型】
参考源码test目下的rpc部分
SingleBrokerConfig config = new SingleBrokerConfig();
config.setBrokerAddress("127.0.0.1:15555");
Broker broker = new SingleBroker(config);
RpcConfig rpcConfig = new RpcConfig();
rpcConfig.setBroker(broker);
rpcConfig.setMq("MyRpc");
//动态代理处Interface通过zbus调用的动态实现类
Interface hello = RpcProxy.getService (Interface.class, rpcConfig);
Object[] res = hello.objectArray();
for (Object obj : res) {
System.out.println(obj);
}
Object[] array = new Object[] { getUser("rushmore"), "hong", true, 1,
String.class };
int saved = hello.saveObjectArray(array);
System.out.println(saved);
Class> ret = hello.classTest(String.class);
System.out.println(ret);
### Spring集成--服务端(RPC示例)
**无任何代码侵入使得你已有的业务接口接入到zbus,获得跨平台和多语言支持**
### Spring集成--客户端
**Spring完成zbus代理透明化,zbus设施从你的应用逻辑中彻底消失**
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("ZbusSpringClient.xml");
Interface intf = (Interface) context.getBean("interface");
System.out.println(intf.listMap());
}
# ZBUS消息协议
## 协议概览
ZBUS协议继承于HTTP协议格式,主体采用HTTP头部协议扩展完成,HTTP协议由HTTP头部和HTTP包体组成,
ZBUS协议在HTTP头部KV值做了扩展,支持浏览器方式直接访问,但是ZBUS链接机制采用保持长连接方式。
原则上, 编写客户端SDK只需要遵循下面ZBUS协议扩展即可。
ZBUS扩展头部主要是完成
* 消息命令
* ZBUS消息队列寻址
* 异步消息匹配
* 安全控制
扩展头部Key-Value解释
###1. 消息命令
命令标识,决定Broker(ZbusServer|TrackServer)的处理
cmd: produce | consume | request | heartbeat | admin(默认值)
###2. 消息队列寻址
mq: 消息目标队列
mq-reply: 消息回复队列
###3. 异步消息匹配
msgid: 消息唯一UUID
msgid-raw: 原始消息唯一UUID, 消息消费路由后ID发生变化,该字段保留最原始的消息ID
###4. 安全控制
token: 访问控制码,不填默认空
###5. 其他可扩展
broker: 消息经过Broker的地址
topic: 消息主题,发布订阅时使用
ack: 是否需要对当前消息ACK,不填默认true
encoding: 消息体的编码格式
sub_cmd: 管理命令的二级命令
###6 HTTP头部第一行,ZBUS协议保持一致理解
请求:GET|POST URI
应答:200 OK
URI做扩展Key-Value的字符串理解
## 协议细节
### 生产者Produce
请求格式
* [必填]cmd: produce
* [必填]mq: 目标队列
* [可选*]msgid: 消息UUID, 需要ACK时[必填]
* [可选]mq-reply: 回复队列,默认为请求UUID。 需要应答的时候由mq_reply + msgid路由返回
* [可选]topic: 发布订阅时发布消息的主题
* [可选]token: 访问控制码
* [可选]HTTP消息体 承载业务数据
应答格式(在启用ack的时候才有应答)
* [可选]msgid: 消息UUID=请求消息UUID,客户端匹配使用
### 消费者Consume
请求格式
* [必填]cmd: consume
* [必填]mq: 目标队列
* [必填]msgid: 消息UUID
* [可选]topic: 发布订阅时订阅感兴趣消息的主题
* [可选]token: 访问控制码
应答格式
* [必填]msgid: 消息UUID,为了匹配消费请求
* [必填]broker: 消息路由经历的Broker地址
* [可选]mq-reply: 回复队列, 需要反馈结果的Consumer利用mq-reply指定目标消息队列
* [可选]msgid-raw: 原始消息UUID,需要反馈结果的Consumer利用msgid-raw指定回复消息ID
* [可选]HTTP消息体 承载业务数据
### 服务请求Request
请求格式
* [必填]cmd: request
* [必填]mq: 目标队列
* [必填]msgid: 消息UUID
* [必填]mq-reply: 回复队列,不制定的情况下默认为当前发送者的UUID
* [可选]token: 访问控制码
* [可选]HTTP消息体 承载业务数据
应答格式(在启用ack的时候才有应答)
* [必填]msgid: 消息UUID=请求消息UUID,客户端匹配使用
* [可选]HTTP消息体 承载业务数据
### 监控管理
请求格式
* [可选]cmd: admin,不填写默认为admin
* [可选]sub_cmd: create_mq
* [可选*]msgid: 消息UUID, 客户端需要消息匹配时需指定
* [可选]token: 访问控制码
* [可选]HTTP消息体 承载业务数据
应答格式
* [可选*]msgid: 消息UUID=请求消息UUID,客户端匹配使用
* [可选]HTTP消息体 承载业务数据
### URI格式
URI = /
监控首页 = /?cmd=admin&&method=index
URI = /MyMQ
第一个?之前理解为消息队列 mq=MyMQ
* /MyMQ?cmd=produce&&msgid=aed14-2343-1dea0-32&&body=xxxyyyzzz
* /MyMQ?cmd=consume&&msgid=aed14-2343-1dea0-32
* /MyMQ?cmd=request&&msgid=aed14-2343-1dea0-32
第一个?之后理解为Key-Value, URI的KV优先级低于头部扩展