From 424cac59e5ce5c0e6772ac028a79f22cd247f979 Mon Sep 17 00:00:00 2001 From: wangguodong Date: Tue, 28 Jul 2020 15:35:21 +0800 Subject: [PATCH] ribbitmq client demo --- .../ribbitmq/example/common/Constants.java | 12 ++++++ .../ribbitmq/example/common/RibbitMqUtil.java | 2 +- .../fanout/FanoutExchangeConsumer.java | 39 +++++++++++++++++ .../fanout/FanoutExchangeConsumer2.java | 39 +++++++++++++++++ .../fanout/FanoutExchangeConsumer3.java | 39 +++++++++++++++++ .../fanout/FanoutExchangeProducer.java | 25 +++++++++++ .../com/ribbitmq/example/exchange/readme.txt | 5 +++ .../exchange/topic/TopicExchangeConsumer.java | 39 +++++++++++++++++ .../topic/TopicExchangeConsumer2.java | 39 +++++++++++++++++ .../topic/TopicExchangeConsumer3.java | 39 +++++++++++++++++ .../exchange/topic/TopicExchangeProducer.java | 25 +++++++++++ .../ribbitmq/example/message/Producer.java | 25 +++++++++++ .../example/message/PullConsumer.java | 42 +++++++++++++++++++ .../example/message/PushConsumer.java | 39 +++++++++++++++++ .../com/ribbitmq/example/message/readme.txt | 3 ++ .../example/transaction/RibbitConsumer.java | 42 +++++++++++++++++++ .../example/transaction/RibbitProducer.java | 39 +++++++++++++++++ 17 files changed, 492 insertions(+), 1 deletion(-) create mode 100644 rocketmq-example/src/main/java/com/ribbitmq/example/exchange/fanout/FanoutExchangeConsumer.java create mode 100644 rocketmq-example/src/main/java/com/ribbitmq/example/exchange/fanout/FanoutExchangeConsumer2.java create mode 100644 rocketmq-example/src/main/java/com/ribbitmq/example/exchange/fanout/FanoutExchangeConsumer3.java create mode 100644 rocketmq-example/src/main/java/com/ribbitmq/example/exchange/fanout/FanoutExchangeProducer.java create mode 100644 rocketmq-example/src/main/java/com/ribbitmq/example/exchange/readme.txt create mode 100644 rocketmq-example/src/main/java/com/ribbitmq/example/exchange/topic/TopicExchangeConsumer.java create mode 100644 rocketmq-example/src/main/java/com/ribbitmq/example/exchange/topic/TopicExchangeConsumer2.java create mode 100644 rocketmq-example/src/main/java/com/ribbitmq/example/exchange/topic/TopicExchangeConsumer3.java create mode 100644 rocketmq-example/src/main/java/com/ribbitmq/example/exchange/topic/TopicExchangeProducer.java create mode 100644 rocketmq-example/src/main/java/com/ribbitmq/example/message/Producer.java create mode 100644 rocketmq-example/src/main/java/com/ribbitmq/example/message/PullConsumer.java create mode 100644 rocketmq-example/src/main/java/com/ribbitmq/example/message/PushConsumer.java create mode 100644 rocketmq-example/src/main/java/com/ribbitmq/example/message/readme.txt create mode 100644 rocketmq-example/src/main/java/com/ribbitmq/example/transaction/RibbitConsumer.java create mode 100644 rocketmq-example/src/main/java/com/ribbitmq/example/transaction/RibbitProducer.java diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/common/Constants.java b/rocketmq-example/src/main/java/com/ribbitmq/example/common/Constants.java index 4941e4d..814192c 100644 --- a/rocketmq-example/src/main/java/com/ribbitmq/example/common/Constants.java +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/common/Constants.java @@ -5,4 +5,16 @@ public class Constants { public static final String EXCHANGE_NAME = "quick-start"; public static final String ROUTING_KEY = "test"; + + public static final String TRANSACTION_EXCHANGE_NAME = "transaction-ex"; + + public static final String TRANSACTION_ROUTING_KEY = "transaction"; + + public static final String FANOUT_EXCHANGE_NAME = "fanout-exchange"; + + public static final String FANOUT_ROUTING_KEY = "fanout"; + + public static final String TOPIC_EXCHANGE_NAME = "topic-exchange"; + + public static final String TOPIC_ROUTING_KEY = "topic"; } diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/common/RibbitMqUtil.java b/rocketmq-example/src/main/java/com/ribbitmq/example/common/RibbitMqUtil.java index 87eb139..0892d37 100644 --- a/rocketmq-example/src/main/java/com/ribbitmq/example/common/RibbitMqUtil.java +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/common/RibbitMqUtil.java @@ -15,7 +15,7 @@ public class RibbitMqUtil { public static Connection createConnection() { ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.setHost("127.0.0.1"); + connectionFactory.setHost("192.168.152.137"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/fanout/FanoutExchangeConsumer.java b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/fanout/FanoutExchangeConsumer.java new file mode 100644 index 0000000..ddd736d --- /dev/null +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/fanout/FanoutExchangeConsumer.java @@ -0,0 +1,39 @@ +package com.ribbitmq.example.exchange.fanout; + +import com.rabbitmq.client.*; +import com.ribbitmq.example.common.Constants; +import com.ribbitmq.example.common.RibbitMqUtil; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +@Slf4j +public class FanoutExchangeConsumer { + + public static void main(String[] args) throws IOException, TimeoutException { + Connection connection = RibbitMqUtil.createConnection(); + Channel channel = connection.createChannel(); + channel.exchangeDeclare(Constants.FANOUT_EXCHANGE_NAME, BuiltinExchangeType.FANOUT); + + String queueName = "queue2"; + /** + * 5.声明(创建)一个队列 + * queue: 队列名字 + * durable: 是否持久化 + * exclusive: 是否独占 + * autoDelete: 队列脱离exchange,自动删除 + * arguments: 扩展参数 + */ + channel.queueDeclare(queueName, false, false, false, null); + channel.queueBind(queueName, Constants.FANOUT_EXCHANGE_NAME, Constants.FANOUT_ROUTING_KEY); + + Consumer consumer = new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + log.info("exchange:{}, routingKey:{}, delivaryTag:{}, message:{}", envelope.getExchange(), envelope.getRoutingKey(), envelope.getDeliveryTag(), new String(body)); + } + }; + channel.basicConsume(queueName, true, consumer); + } +} diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/fanout/FanoutExchangeConsumer2.java b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/fanout/FanoutExchangeConsumer2.java new file mode 100644 index 0000000..cf0c7dd --- /dev/null +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/fanout/FanoutExchangeConsumer2.java @@ -0,0 +1,39 @@ +package com.ribbitmq.example.exchange.fanout; + +import com.rabbitmq.client.*; +import com.ribbitmq.example.common.Constants; +import com.ribbitmq.example.common.RibbitMqUtil; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +@Slf4j +public class FanoutExchangeConsumer2 { + + public static void main(String[] args) throws IOException, TimeoutException { + Connection connection = RibbitMqUtil.createConnection(); + Channel channel = connection.createChannel(); + channel.exchangeDeclare(Constants.FANOUT_EXCHANGE_NAME, BuiltinExchangeType.FANOUT); + + String queueName = "queue1"; + /** + * 5.声明(创建)一个队列 + * queue: 队列名字 + * durable: 是否持久化 + * exclusive: 是否独占 + * autoDelete: 队列脱离exchange,自动删除 + * arguments: 扩展参数 + */ + channel.queueDeclare(queueName, false, false, false, null); + channel.queueBind(queueName, Constants.FANOUT_EXCHANGE_NAME, Constants.FANOUT_ROUTING_KEY); + + Consumer consumer = new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + log.info("exchange:{}, routingKey:{}, delivaryTag:{}, message:{}", envelope.getExchange(), envelope.getRoutingKey(), envelope.getDeliveryTag(), new String(body)); + } + }; + channel.basicConsume(queueName, true, consumer); + } +} diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/fanout/FanoutExchangeConsumer3.java b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/fanout/FanoutExchangeConsumer3.java new file mode 100644 index 0000000..e3029f3 --- /dev/null +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/fanout/FanoutExchangeConsumer3.java @@ -0,0 +1,39 @@ +package com.ribbitmq.example.exchange.fanout; + +import com.rabbitmq.client.*; +import com.ribbitmq.example.common.Constants; +import com.ribbitmq.example.common.RibbitMqUtil; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +@Slf4j +public class FanoutExchangeConsumer3 { + + public static void main(String[] args) throws IOException, TimeoutException { + Connection connection = RibbitMqUtil.createConnection(); + Channel channel = connection.createChannel(); + channel.exchangeDeclare(Constants.FANOUT_EXCHANGE_NAME, BuiltinExchangeType.FANOUT); + + String queueName = "queue3"; + /** + * 5.声明(创建)一个队列 + * queue: 队列名字 + * durable: 是否持久化 + * exclusive: 是否独占 + * autoDelete: 队列脱离exchange,自动删除 + * arguments: 扩展参数 + */ + channel.queueDeclare(queueName, false, false, false, null); + channel.queueBind(queueName, Constants.FANOUT_EXCHANGE_NAME, "route-example"); + + Consumer consumer = new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + log.info("exchange:{}, routingKey:{}, delivaryTag:{}, message:{}", envelope.getExchange(), envelope.getRoutingKey(), envelope.getDeliveryTag(), new String(body)); + } + }; + channel.basicConsume(queueName, true, consumer); + } +} diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/fanout/FanoutExchangeProducer.java b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/fanout/FanoutExchangeProducer.java new file mode 100644 index 0000000..f673e48 --- /dev/null +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/fanout/FanoutExchangeProducer.java @@ -0,0 +1,25 @@ +package com.ribbitmq.example.exchange.fanout; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.ribbitmq.example.common.Constants; +import com.ribbitmq.example.common.RibbitMqUtil; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +public class FanoutExchangeProducer { + + public static void main(String[] args) throws IOException, TimeoutException { + Connection connection = RibbitMqUtil.createConnection(); + Channel channel = connection.createChannel(); + channel.exchangeDeclare(Constants.FANOUT_EXCHANGE_NAME, BuiltinExchangeType.FANOUT); + String message = "hello,it's a message"; + channel.basicPublish(Constants.FANOUT_EXCHANGE_NAME, Constants.FANOUT_ROUTING_KEY, null, message.getBytes()); + channel.basicPublish(Constants.FANOUT_EXCHANGE_NAME, "route-example", null, message.getBytes()); + channel.close(); + RibbitMqUtil.close(connection); + } + +} diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/readme.txt b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/readme.txt new file mode 100644 index 0000000..4c7c9fd --- /dev/null +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/readme.txt @@ -0,0 +1,5 @@ +fanout-exchange:广播消息给所有绑定交换机的队列 + +direct-exchange:路由routingKey匹配的队列中 + +topic-exchange:订阅不同的topic,rotingKey形如:com.ribbit.client,通过形式如:#.ribbit.# com.# \ No newline at end of file diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/topic/TopicExchangeConsumer.java b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/topic/TopicExchangeConsumer.java new file mode 100644 index 0000000..e2bd2c2 --- /dev/null +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/topic/TopicExchangeConsumer.java @@ -0,0 +1,39 @@ +package com.ribbitmq.example.exchange.topic; + +import com.rabbitmq.client.*; +import com.ribbitmq.example.common.Constants; +import com.ribbitmq.example.common.RibbitMqUtil; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +@Slf4j +public class TopicExchangeConsumer { + + public static void main(String[] args) throws IOException, TimeoutException { + Connection connection = RibbitMqUtil.createConnection(); + Channel channel = connection.createChannel(); + channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC); + + String queueName = "topic-queue1"; + /** + * 5.声明(创建)一个队列 + * queue: 队列名字 + * durable: 是否持久化 + * exclusive: 是否独占 + * autoDelete: 队列脱离exchange,自动删除 + * arguments: 扩展参数 + */ + channel.queueDeclare(queueName, false, false, false, null); + channel.queueBind(queueName, Constants.TOPIC_EXCHANGE_NAME, "com.#"); + + Consumer consumer = new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + log.info("exchange:{}, routingKey:{}, delivaryTag:{}, message:{}", envelope.getExchange(), envelope.getRoutingKey(), envelope.getDeliveryTag(), new String(body)); + } + }; + channel.basicConsume(queueName, true, consumer); + } +} diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/topic/TopicExchangeConsumer2.java b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/topic/TopicExchangeConsumer2.java new file mode 100644 index 0000000..20e2447 --- /dev/null +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/topic/TopicExchangeConsumer2.java @@ -0,0 +1,39 @@ +package com.ribbitmq.example.exchange.topic; + +import com.rabbitmq.client.*; +import com.ribbitmq.example.common.Constants; +import com.ribbitmq.example.common.RibbitMqUtil; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +@Slf4j +public class TopicExchangeConsumer2 { + + public static void main(String[] args) throws IOException, TimeoutException { + Connection connection = RibbitMqUtil.createConnection(); + Channel channel = connection.createChannel(); + channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC); + + String queueName = "topic-queue2"; + /** + * 5.声明(创建)一个队列 + * queue: 队列名字 + * durable: 是否持久化 + * exclusive: 是否独占 + * autoDelete: 队列脱离exchange,自动删除 + * arguments: 扩展参数 + */ + channel.queueDeclare(queueName, false, false, false, null); + channel.queueBind(queueName, Constants.TOPIC_EXCHANGE_NAME, "org.*"); + + Consumer consumer = new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + log.info("exchange:{}, routingKey:{}, delivaryTag:{}, message:{}", envelope.getExchange(), envelope.getRoutingKey(), envelope.getDeliveryTag(), new String(body)); + } + }; + channel.basicConsume(queueName, true, consumer); + } +} diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/topic/TopicExchangeConsumer3.java b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/topic/TopicExchangeConsumer3.java new file mode 100644 index 0000000..ecfaa53 --- /dev/null +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/topic/TopicExchangeConsumer3.java @@ -0,0 +1,39 @@ +package com.ribbitmq.example.exchange.topic; + +import com.rabbitmq.client.*; +import com.ribbitmq.example.common.Constants; +import com.ribbitmq.example.common.RibbitMqUtil; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +@Slf4j +public class TopicExchangeConsumer3 { + + public static void main(String[] args) throws IOException, TimeoutException { + Connection connection = RibbitMqUtil.createConnection(); + Channel channel = connection.createChannel(); + channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC); + + String queueName = "topic-queue3"; + /** + * 5.声明(创建)一个队列 + * queue: 队列名字 + * durable: 是否持久化 + * exclusive: 是否独占 + * autoDelete: 队列脱离exchange,自动删除 + * arguments: 扩展参数 + */ + channel.queueDeclare(queueName, false, false, false, null); + channel.queueBind(queueName, Constants.TOPIC_EXCHANGE_NAME, "org.#"); + + Consumer consumer = new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + log.info("exchange:{}, routingKey:{}, delivaryTag:{}, message:{}", envelope.getExchange(), envelope.getRoutingKey(), envelope.getDeliveryTag(), new String(body)); + } + }; + channel.basicConsume(queueName, true, consumer); + } +} diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/topic/TopicExchangeProducer.java b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/topic/TopicExchangeProducer.java new file mode 100644 index 0000000..054917a --- /dev/null +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/exchange/topic/TopicExchangeProducer.java @@ -0,0 +1,25 @@ +package com.ribbitmq.example.exchange.topic; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.ribbitmq.example.common.Constants; +import com.ribbitmq.example.common.RibbitMqUtil; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +public class TopicExchangeProducer { + + public static void main(String[] args) throws IOException, TimeoutException { + Connection connection = RibbitMqUtil.createConnection(); + Channel channel = connection.createChannel(); + channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC); + channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME, "com.ribbit.client", null, "hello,ribbit".getBytes()); + channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME, "org.apache.rocketmq", null, "hello,rocket".getBytes()); + channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME, "org.struts", null, "hello,struts".getBytes()); + channel.close(); + RibbitMqUtil.close(connection); + } + +} diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/message/Producer.java b/rocketmq-example/src/main/java/com/ribbitmq/example/message/Producer.java new file mode 100644 index 0000000..096c61a --- /dev/null +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/message/Producer.java @@ -0,0 +1,25 @@ +package com.ribbitmq.example.message; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.ribbitmq.example.common.Constants; +import com.ribbitmq.example.common.RibbitMqUtil; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +public class Producer { + + public static void main(String[] args) throws IOException, TimeoutException { + Connection connection = RibbitMqUtil.createConnection(); + Channel channel = connection.createChannel(); + channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC); + channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME, "com.ribbit.client", null, "hello,ribbit".getBytes()); + channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME, "org.apache.rocketmq", null, "hello,rocket".getBytes()); + channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME, "org.struts", null, "hello,struts".getBytes()); + channel.close(); + RibbitMqUtil.close(connection); + } + +} diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/message/PullConsumer.java b/rocketmq-example/src/main/java/com/ribbitmq/example/message/PullConsumer.java new file mode 100644 index 0000000..18ac795 --- /dev/null +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/message/PullConsumer.java @@ -0,0 +1,42 @@ +package com.ribbitmq.example.message; + +import com.rabbitmq.client.*; +import com.ribbitmq.example.common.Constants; +import com.ribbitmq.example.common.RibbitMqUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.GetMapping; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +@Slf4j +public class PullConsumer { + + public static void main(String[] args) throws IOException, TimeoutException { + Connection connection = RibbitMqUtil.createConnection(); + Channel channel = connection.createChannel(); + channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC); + + String queueName = "topic-queue3"; + /** + * 5.声明(创建)一个队列 + * queue: 队列名字 + * durable: 是否持久化 + * exclusive: 是否独占 + * autoDelete: 队列脱离exchange,自动删除 + * arguments: 扩展参数 + */ + channel.queueDeclare(queueName, false, false, false, null); + channel.queueBind(queueName, Constants.TOPIC_EXCHANGE_NAME, "org.*"); + + GetResponse response = channel.basicGet(queueName,true); + if (response != null) { + // 未消费消息条数,0时最后一条 + log.info("获取消息条数:{}", response.getMessageCount()); + log.info("获取消息:{}", new String(response.getBody())); + } + + channel.close(); + RibbitMqUtil.close(connection); + } +} diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/message/PushConsumer.java b/rocketmq-example/src/main/java/com/ribbitmq/example/message/PushConsumer.java new file mode 100644 index 0000000..013e5c9 --- /dev/null +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/message/PushConsumer.java @@ -0,0 +1,39 @@ +package com.ribbitmq.example.message; + +import com.rabbitmq.client.*; +import com.ribbitmq.example.common.Constants; +import com.ribbitmq.example.common.RibbitMqUtil; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +@Slf4j +public class PushConsumer { + + public static void main(String[] args) throws IOException, TimeoutException { + Connection connection = RibbitMqUtil.createConnection(); + Channel channel = connection.createChannel(); + channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC); + + String queueName = "topic-queue2"; + /** + * 5.声明(创建)一个队列 + * queue: 队列名字 + * durable: 是否持久化 + * exclusive: 是否独占 + * autoDelete: 队列脱离exchange,自动删除 + * arguments: 扩展参数 + */ + channel.queueDeclare(queueName, false, false, false, null); + channel.queueBind(queueName, Constants.TOPIC_EXCHANGE_NAME, "org.*"); + + Consumer consumer = new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + log.info("exchange:{}, routingKey:{}, delivaryTag:{}, message:{}", envelope.getExchange(), envelope.getRoutingKey(), envelope.getDeliveryTag(), new String(body)); + } + }; + channel.basicConsume(queueName, true, consumer); + } +} diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/message/readme.txt b/rocketmq-example/src/main/java/com/ribbitmq/example/message/readme.txt new file mode 100644 index 0000000..061de46 --- /dev/null +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/message/readme.txt @@ -0,0 +1,3 @@ +消费消息方式: + 拉取消息:basicGet 应用主动拉取 + 推送消息:basicConsume 由ribbitServer推送消息 \ No newline at end of file diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/transaction/RibbitConsumer.java b/rocketmq-example/src/main/java/com/ribbitmq/example/transaction/RibbitConsumer.java new file mode 100644 index 0000000..c56b62c --- /dev/null +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/transaction/RibbitConsumer.java @@ -0,0 +1,42 @@ +package com.ribbitmq.example.transaction; + +import com.rabbitmq.client.*; +import com.ribbitmq.example.common.Constants; +import com.ribbitmq.example.common.RibbitMqUtil; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +@Slf4j +public class RibbitConsumer { + + public static void main(String[] args) throws IOException, TimeoutException { + Connection connection = RibbitMqUtil.createConnection(); + Channel channel = connection.createChannel(); + channel.exchangeDeclare(Constants.TRANSACTION_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); + + String queueName = "transactionQueue"; + /** + * 5.声明(创建)一个队列 + * queue: 队列名字 + * durable: 是否持久化 + * exclusive: 是否独占 + * autoDelete: 队列脱离exchange,自动删除 + * arguments: 扩展参数 + */ + channel.queueDeclare(queueName, true, false, false, null); + channel.queueBind(queueName, Constants.TRANSACTION_EXCHANGE_NAME, Constants.TRANSACTION_ROUTING_KEY); + + Consumer consumer = new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + log.info("exchange:{}, routingKey:{}, delivaryTag:{}, message:{}", envelope.getExchange(), envelope.getRoutingKey(), envelope.getDeliveryTag(), new String(body)); + } + }; + + channel.basicConsume(queueName, true, consumer); + channel.close(); + RibbitMqUtil.close(connection); + } +} diff --git a/rocketmq-example/src/main/java/com/ribbitmq/example/transaction/RibbitProducer.java b/rocketmq-example/src/main/java/com/ribbitmq/example/transaction/RibbitProducer.java new file mode 100644 index 0000000..6dc939c --- /dev/null +++ b/rocketmq-example/src/main/java/com/ribbitmq/example/transaction/RibbitProducer.java @@ -0,0 +1,39 @@ +package com.ribbitmq.example.transaction; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.ribbitmq.example.common.Constants; +import com.ribbitmq.example.common.RibbitMqUtil; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +@Slf4j +public class RibbitProducer { + + public static void main(String[] args) throws IOException, TimeoutException { + Connection connection = RibbitMqUtil.createConnection(); + Channel channel = connection.createChannel(); + channel.exchangeDeclare(Constants.TRANSACTION_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); + + channel.txSelect(); + try { + for (int i=0;i<5;i++) { + if (i == 4) { + i = i / 0; + } + String message = "hello,it's a transaction message " + i; + channel.basicPublish(Constants.TRANSACTION_EXCHANGE_NAME, Constants.TRANSACTION_ROUTING_KEY, null, message.getBytes()); + } + channel.txCommit(); + } catch (Exception e) { + log.error("出现异常,事务回滚:{}", e); + channel.txRollback(); + } + channel.close(); + RibbitMqUtil.close(connection); + } + +} -- Gitee