diff --git a/README.md b/README.md index 26d821ad8fc45c0572d3a714997ef6bda3a1086b..da451f0eb042d3acf9f989e2beb31d2cbdbf0d77 100644 --- a/README.md +++ b/README.md @@ -25,4 +25,6 @@ SpringBoot应用集合 - [SpringBoot利用springsession实现session共享](https://gitee.com/superbutton/SpringBoot-Components/tree/develop/Springboot-Springsession) - [SpringBoot整合mail](https://gitee.com/superbutton/SpringBoot-Components/tree/develop/Springboot-Mail) - [SpringBoot整合netty](https://gitee.com/superbutton/SpringBoot-Components/tree/develop/Springboot-Netty-Parent) -- [SpringBoot整合webservice](https://gitee.com/superbutton/SpringBoot-Components/tree/develop/Springboot-Webservice) \ No newline at end of file +- [SpringBoot整合webservice](https://gitee.com/superbutton/SpringBoot-Components/tree/develop/Springboot-Webservice) +- [SpringBoot整合ActiveMq](https://gitee.com/superbutton/SpringBoot-Components/tree/develop/Springboot-ActiveMq) + diff --git a/Springboot-ActiveMq/README.md b/Springboot-ActiveMq/README.md new file mode 100644 index 0000000000000000000000000000000000000000..afe56664939976cd3215d5c32fba1bc2c0925798 --- /dev/null +++ b/Springboot-ActiveMq/README.md @@ -0,0 +1,5 @@ +swagger地址: +http://localhost:8080/activemq/swagger-ui.html + +ActiveMq安装 +https://blog.csdn.net/Future_LL/article/details/84898868 \ No newline at end of file diff --git a/Springboot-ActiveMq/pom.xml b/Springboot-ActiveMq/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..278fc2134938da68eebbd22e2a32e11e154d2187 --- /dev/null +++ b/Springboot-ActiveMq/pom.xml @@ -0,0 +1,92 @@ + + 4.0.0 + com.button + Springboot-ActiveMq + 0.0.1-SNAPSHOT + jar + + + UTF-8 + UTF-8 + 1.8 + + + org.springframework.boot + spring-boot-starter-parent + 2.0.1.RELEASE + + + + + + org.springframework.boot + spring-boot-starter-web + + + + com.alibaba + fastjson + 1.2.47 + + + net.sf.json-lib + json-lib + 2.2.2 + jdk15 + + + + + com.google.code.gson + gson + + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.boot + spring-boot-devtools + true + true + + + + org.springframework.boot + spring-boot-starter-activemq + + + org.apache.activemq + activemq-pool + + + + io.springfox + springfox-swagger2 + 2.7.0 + + + io.springfox + springfox-swagger-ui + 2.7.0 + + + + Springboot-ActiveMq + + + org.springframework.boot + spring-boot-maven-plugin + + -Dfile.encoding=UTF-8 + true + + + + + \ No newline at end of file diff --git a/Springboot-ActiveMq/src/main/java/com/button/boot/active/SpringbootActiveMqApplication.java b/Springboot-ActiveMq/src/main/java/com/button/boot/active/SpringbootActiveMqApplication.java new file mode 100644 index 0000000000000000000000000000000000000000..2189677dc472aeebaee58b07f0549c6203414263 --- /dev/null +++ b/Springboot-ActiveMq/src/main/java/com/button/boot/active/SpringbootActiveMqApplication.java @@ -0,0 +1,11 @@ +package com.button.boot.active; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class SpringbootActiveMqApplication { + public static void main(String[] args) { + SpringApplication.run(SpringbootActiveMqApplication.class, args); + } +} diff --git a/Springboot-ActiveMq/src/main/java/com/button/boot/active/comsumer/Consumer1.java b/Springboot-ActiveMq/src/main/java/com/button/boot/active/comsumer/Consumer1.java new file mode 100644 index 0000000000000000000000000000000000000000..c5196a7b0bb365f94211204e2672284557d83ee1 --- /dev/null +++ b/Springboot-ActiveMq/src/main/java/com/button/boot/active/comsumer/Consumer1.java @@ -0,0 +1,73 @@ +package com.button.boot.active.comsumer; + +import java.time.LocalDateTime; + +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jms.annotation.JmsListener; +import org.springframework.messaging.handler.annotation.SendTo; +import org.springframework.stereotype.Component; + +import com.button.boot.active.model.User; + +import net.sf.json.JSONObject; + +@Component +public class Consumer1 { + private static final Logger log = LoggerFactory.getLogger(Consumer1.class); + + @JmsListener(destination = "queue_string",containerFactory = "queueListenerFactory") + public void receiveQueue(String text) { + log.info("consumer1收到queue_string信息:{}", text); + } + + @JmsListener(destination = "queue_user",containerFactory = "queueListenerFactory") + public void receiveQueue(User user) { + log.info("consumer1收到queue_user信息:{}", JSONObject.fromObject(user)); + } + + @JmsListener(destination = "queue_return",containerFactory = "queueListenerFactory") + @SendTo("queue_return") //该注解的意思是将return回的值,再发送的"queue_return"队列中 + public String receive2WayQueue(String text) { + log.info("consumer1收到queue_return的信息:{}",text); + return "queue_return最后return的内容"; + } + + //接收queue_return的内容 + @JmsListener(destination = "queue_return", containerFactory = "queueListenerFactory") + public void receiveReturnQueue(String text) { + log.info("consumer1收到queue_return信息:{}",text); + } + + @SuppressWarnings("unused") + @JmsListener(destination = "queue_ack", containerFactory = "queueListenerACKFactory") + public void receiveQueue(TextMessage message, Session session) throws JMSException { + log.info("consumer1收到queue_string信息:{}", message.getText()); + try { + int a = 1 / 0; + message.acknowledge(); //消息确认 + } catch (Exception e) { + log.error("消费消息是出现异常,消息重发. e={}", e); + session.recover(); //消息重发,默认每秒重发1次,一共重发6次 + } + } + + @JmsListener(destination = "topic_string",containerFactory = "topicListenerFactory") + public void receiveTopic(String text) { + log.info("consumer1收到topic_string信息:{}", text); + } + + @JmsListener(destination = "topic_user",containerFactory = "topicListenerFactory") + public void receiveTopic(User user) { + log.info("consumer1收到topic_user信息:{}", JSONObject.fromObject(user)); + } + + @JmsListener(destination = "topic_delay",containerFactory = "topicListenerFactory") + public void receiveDelayTopic(String text) { + log.info("consumer1收到topic_delay_string延时信息:{},接收时间:{}", text, LocalDateTime.now()); + } +} diff --git a/Springboot-ActiveMq/src/main/java/com/button/boot/active/comsumer/Consumer2.java b/Springboot-ActiveMq/src/main/java/com/button/boot/active/comsumer/Consumer2.java new file mode 100644 index 0000000000000000000000000000000000000000..0c295834662bcd10540b54228e5899a2165893cf --- /dev/null +++ b/Springboot-ActiveMq/src/main/java/com/button/boot/active/comsumer/Consumer2.java @@ -0,0 +1,42 @@ +package com.button.boot.active.comsumer; + +import java.time.LocalDateTime; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jms.annotation.JmsListener; +import org.springframework.stereotype.Component; + +import com.button.boot.active.model.User; + +import net.sf.json.JSONObject; + +@Component +public class Consumer2 { + private static final Logger log = LoggerFactory.getLogger(Consumer2.class); + + @JmsListener(destination = "queue_string", containerFactory = "queueListenerFactory") + public void receiveQueue(String text) { + log.info("consumer2收到queue_string信息:{}", text); + } + + @JmsListener(destination = "queue_user", containerFactory = "queueListenerFactory") + public void receiveQueue(User user) { + log.info("consumer2收到queue_user信息:{}", JSONObject.fromObject(user)); + } + + @JmsListener(destination = "topic_string", containerFactory = "topicListenerFactory") + public void receiveTopic(String text) { + log.info("consumer2收到topic_string信息:{}", text); + } + + @JmsListener(destination = "topic_user", containerFactory = "topicListenerFactory") + public void receiveTopic(User user) { + log.info("consumer2收到topic_user信息:{}", JSONObject.fromObject(user)); + } + + @JmsListener(destination = "topic_delay", containerFactory = "topicListenerFactory") + public void receiveDelayTopic(String text) { + log.info("consumer2收到topic_delay_string延时信息:{},接收时间:{}", text, LocalDateTime.now()); + } +} diff --git a/Springboot-ActiveMq/src/main/java/com/button/boot/active/config/ActiveMqConfig.java b/Springboot-ActiveMq/src/main/java/com/button/boot/active/config/ActiveMqConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..9498482cb7d677b491333d5c56bb8047f4558eb1 --- /dev/null +++ b/Springboot-ActiveMq/src/main/java/com/button/boot/active/config/ActiveMqConfig.java @@ -0,0 +1,116 @@ +package com.button.boot.active.config; + +import javax.jms.ConnectionFactory; +import javax.jms.Queue; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jms.annotation.EnableJms; +import org.springframework.jms.config.DefaultJmsListenerContainerFactory; +import org.springframework.jms.config.JmsListenerContainerFactory; + +@Configuration +@EnableJms +public class ActiveMqConfig { + @Bean + public JmsListenerContainerFactory queueListenerFactory(ConnectionFactory connectionFactory) { + DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); + factory.setPubSubDomain(false); + factory.setConnectionFactory(connectionFactory); + return factory; + } + + @Bean //此处是结合springboot设置的redeliveryPolicy + public RedeliveryPolicy redeliveryPolicy(){ + RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + //设置初始化重发延迟时间,默认1000毫秒 + redeliveryPolicy.setInitialRedeliveryDelay(1000); + //设置最大重发次数,默认为6 + redeliveryPolicy.setMaximumRedeliveries(1); + //是否开启重发延迟大小倍数递增,默认false + redeliveryPolicy.setUseExponentialBackOff(true); + //重发延迟递增倍数,默认为5 + redeliveryPolicy.setBackOffMultiplier(2); + //这个参数主要是为了防止高并发下,消息重发会在同一时刻发生,让消息在时间上消费的更加均衡,默认为false + redeliveryPolicy.setUseCollisionAvoidance(true); + //设置下次重发延迟波动百分比,默认0.15,此处写成15是因为源码中对参数*0.01 + redeliveryPolicy.setCollisionAvoidancePercent((short)15); + //设置最大重发延迟,默认为-1,表示无限增大 + redeliveryPolicy.setMaximumRedeliveryDelay(10000); + return redeliveryPolicy; + } + @Bean + public ActiveMQConnectionFactory activeMQConnectionFactory(@Autowired RedeliveryPolicy redeliveryPolicy, @Value("${spring.activemq.broker-url}") String url){ + ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(url); + activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); + return activeMQConnectionFactory; + } + + @Bean + public JmsListenerContainerFactory queueListenerACKFactory(@Autowired ActiveMQConnectionFactory activeMQConnectionFactory) { + DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); + factory.setPubSubDomain(false); + activeMQConnectionFactory.setTrustAllPackages(true); + factory.setConnectionFactory(activeMQConnectionFactory); + /** + * ack设置为activemq独有的单条确认模式:4, + * 至于为什么不设置为客户端手动确认:2,因为客户端手动确认会失效, + * 原因为spring框架会判断是否是2,如果是2会spring框架会帮你确认, + * 可查看AbstractMessageListenerContainer[commitIfNecessary()] + */ + factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + return factory; + } + + @Bean + public JmsListenerContainerFactory topicListenerFactory(ConnectionFactory connectionFactory) { + DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); + factory.setPubSubDomain(true); + factory.setConcurrency("1"); + factory.setConnectionFactory(connectionFactory); + return factory; + } + + @Bean + public Queue queueString(){ + return new ActiveMQQueue("queue_string"); + } + + @Bean + public Queue queueUser(){ + return new ActiveMQQueue("queue_user"); + } + + @Bean + public Queue queueReturn(){ + return new ActiveMQQueue("queue_return"); + } + + @Bean + public Queue queueStringACK(){ + return new ActiveMQQueue("queue_ack"); + } + + @Bean + public Topic topicString(){ + return new ActiveMQTopic("topic_string"); + } + + @Bean + public Topic topicUser(){ + return new ActiveMQTopic("topic_user"); + } + + @Bean + public Topic delayTopicString(){ + return new ActiveMQTopic("topic_delay"); + } +} diff --git a/Springboot-ActiveMq/src/main/java/com/button/boot/active/config/SwaggerConf.java b/Springboot-ActiveMq/src/main/java/com/button/boot/active/config/SwaggerConf.java new file mode 100644 index 0000000000000000000000000000000000000000..ac31f4d849bf3ab8ed21ccca964227ebf0626c3f --- /dev/null +++ b/Springboot-ActiveMq/src/main/java/com/button/boot/active/config/SwaggerConf.java @@ -0,0 +1,34 @@ +package com.button.boot.active.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import springfox.documentation.builders.ApiInfoBuilder; +import springfox.documentation.builders.PathSelectors; +import springfox.documentation.builders.RequestHandlerSelectors; +import springfox.documentation.service.ApiInfo; +import springfox.documentation.spi.DocumentationType; +import springfox.documentation.spring.web.plugins.Docket; +import springfox.documentation.swagger2.annotations.EnableSwagger2; + +@Configuration +@EnableSwagger2 +public class SwaggerConf { + + @Bean + public Docket createRestApiButton() { + return new Docket(DocumentationType.SWAGGER_2).groupName("ActiveMq").apiInfo(apiInfo()) + .select().apis(RequestHandlerSelectors.basePackage("com.button.boot.active.controller")) + .paths(PathSelectors.any()).build(); + } + + private ApiInfo apiInfo() { + return new ApiInfoBuilder() + .title("ActiveMp开发测试文档") + .description("测试ActiveMq") + .termsOfServiceUrl("http://www.baidu.com") + .version("1.0") + .build(); + } + +} diff --git a/Springboot-ActiveMq/src/main/java/com/button/boot/active/controller/ActiveMqController.java b/Springboot-ActiveMq/src/main/java/com/button/boot/active/controller/ActiveMqController.java new file mode 100644 index 0000000000000000000000000000000000000000..e7eaa90f737f5285b06b0d439b07b157bce3ccf7 --- /dev/null +++ b/Springboot-ActiveMq/src/main/java/com/button/boot/active/controller/ActiveMqController.java @@ -0,0 +1,120 @@ +package com.button.boot.active.controller; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import com.button.boot.active.model.User; +import com.button.boot.active.service.ActiveMqService; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiParam; +import net.sf.json.JSONObject; + +@Api("ActiveMq相关的api") +@RestController +@RequestMapping(value = "/activemq") +public class ActiveMqController { + private static final Logger log = LoggerFactory.getLogger(ActiveMqController.class); + @Autowired + private ActiveMqService activeMqService; + + @GetMapping(value = "/sendQueueMsg") + public void sendQueueMsg(@RequestParam(value = "msg", required = true) String msg) { + try { + log.info("开始使用sendQueueMsg发送消息. msg={}", msg); + activeMqService.sendQueueMsg(msg); + log.info("使用sendQueueMsg发送消息完成. msg={}", msg); + } catch (Exception e) { + log.error("使用sendQueueMsg发送消息时出现异常. e={}", e); + } + } + @PostMapping(value = "/sendQueueUserMsg") + public void sendQueueUserMsg(@ApiParam("同步请求实体对象") User user) { + try { + log.info("开始使用sendQueueUserMsg发送消息. user={}", JSONObject.fromObject(user)); + activeMqService.sendQueueMsg(user); + log.info("使用sendQueueUserMsg发送消息完成. user={}", JSONObject.fromObject(user)); + } catch (Exception e) { + log.error("使用sendQueueUserMsg发送消息时出现异常. e={}", e); + } + } + @PostMapping(value = "/send2Return") + public void send2Return(@RequestParam(value = "msg", required = true) String msg) { + try { + log.info("开始使用send2Return发送消息. msg={}", msg); + activeMqService.send2Return(msg); + log.info("使用send2Return发送消息完成. msg={}", msg); + } catch (Exception e) { + log.error("使用send2Return发送消息时出现异常. e={}", e); + } + } + + @PostMapping(value = "/sendACKQueueMsg") + public void sendACKQueueMsg(String msg) { + try { + log.info("开始使用sendACKQueueMsg发送消息. msg={}", msg); + activeMqService.sendACKQueueMsg(msg); + log.info("使用sendACKQueueMsg发送消息完成. msg={}", msg); + } catch (Exception e) { + log.error("使用sendACKQueueMsg发送消息时出现异常. e={}", e); + } + } + + @PostMapping(value = "/sendTopicMsg") + public void sendTopicMsg(@RequestParam(value = "msg", required = true) String msg) { + try { + log.info("开始使用sendTopicMsg发送消息. msg={}", msg); + activeMqService.sendTopicMsg(msg); + log.info("使用sendTopicMsg发送消息完成. msg={}", msg); + } catch (Exception e) { + log.error("使用sendTopicMsg发送消息时出现异常. e={}", e); + } + } + + @PostMapping(value = "/sendTopicUserMsg") + public void sendTopicUserMsg(@ApiParam("同步请求实体对象") User user) { + try { + log.info("开始使用sendTopicUserMsg发送消息. user={}", JSONObject.fromObject(user)); + activeMqService.sendTopicMsg(user); + log.info("使用sendTopicUserMsg发送消息完成. user={}", JSONObject.fromObject(user)); + } catch (Exception e) { + log.error("使用sendTopicUserMsg发送消息时出现异常. e={}", e); + } + } + /** + * 使用延时队列需要在activemq.xml中的标签里添加schedulerSupport="true",如下所示: + * + * @return + */ + @PostMapping(value = "/sendDelayTopicMsg") + public void sendDelayTopicMsg(@RequestParam(value = "msg", required = true) String msg) { + try { + log.info("开始使用sendDelayTopicMsg发送消息. msg={}", msg); + activeMqService.sendDelayTopicMsg(msg); + log.info("使用sendDelayTopicMsg发送消息完成. msg={}", msg); + } catch (Exception e) { + log.error("使用sendDelayTopicMsg发送消息时出现异常. e={}", e); + } + } + + /** + * 使用延时队列需要在activemq.xml中的标签里添加schedulerSupport="true",如下: + * @return + */ + @PostMapping(value = "/sendDelayTopicTimeMsg") + public void sendDelayTopicTimeMsg(String msg, long time) { + try { + log.info("开始使用sendDelayTopicTimeMsg发送消息. msg={}, time={}", msg, time); + activeMqService.sendDelayTopicMsg(msg, time); + log.info("使用sendDelayTopicTimeMsg发送消息完成. msg={}, time={}", msg, time); + } catch (Exception e) { + log.error("使用sendDelayTopicTimeMsg发送消息时出现异常. msg={}, time={}, e={}", msg, time, e); + } + } +} diff --git a/Springboot-ActiveMq/src/main/java/com/button/boot/active/model/User.java b/Springboot-ActiveMq/src/main/java/com/button/boot/active/model/User.java new file mode 100644 index 0000000000000000000000000000000000000000..eedbb28f9f0948d8adfa1f478e62b90c6d35eaba --- /dev/null +++ b/Springboot-ActiveMq/src/main/java/com/button/boot/active/model/User.java @@ -0,0 +1,33 @@ +package com.button.boot.active.model; + +import java.io.Serializable; + +public class User implements Serializable { + private static final long serialVersionUID = 1L; + + private String name; + private Integer age; + + public User(String name, Integer age) { + super(); + this.name = name; + this.age = age; + } + public String getName() { + return name; + } + public void setName(String name) { + this.name = name; + } + public Integer getAge() { + return age; + } + public void setAge(Integer age) { + this.age = age; + } + + @Override + public String toString() { + return "User [name=" + name + ", age=" + age + "]"; + } +} diff --git a/Springboot-ActiveMq/src/main/java/com/button/boot/active/service/ActiveMqService.java b/Springboot-ActiveMq/src/main/java/com/button/boot/active/service/ActiveMqService.java new file mode 100644 index 0000000000000000000000000000000000000000..b82a1f10c8490e19ffdc8a8198d7da401a08cad6 --- /dev/null +++ b/Springboot-ActiveMq/src/main/java/com/button/boot/active/service/ActiveMqService.java @@ -0,0 +1,23 @@ +package com.button.boot.active.service; + +import javax.jms.JMSException; + +import com.button.boot.active.model.User; + +public interface ActiveMqService { + void sendQueueMsg(String msg) throws JMSException; + + void sendQueueMsg(User user) throws JMSException; + + void send2Return(String msg) throws JMSException; + + void sendACKQueueMsg(String msg) throws JMSException; + + void sendTopicMsg(String msg) throws JMSException; + + void sendTopicMsg(User user) throws JMSException; + + void sendDelayTopicMsg(String msg) throws JMSException; + + void sendDelayTopicMsg(String msg,long time) throws JMSException; +} diff --git a/Springboot-ActiveMq/src/main/java/com/button/boot/active/service/impl/ActiveMqServiceImpl.java b/Springboot-ActiveMq/src/main/java/com/button/boot/active/service/impl/ActiveMqServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..1f7039032c44100188ad91cebfdab49c11ea96d2 --- /dev/null +++ b/Springboot-ActiveMq/src/main/java/com/button/boot/active/service/impl/ActiveMqServiceImpl.java @@ -0,0 +1,105 @@ +package com.button.boot.active.service.impl; + +import java.time.LocalDateTime; + +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.ScheduledMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jms.core.JmsMessagingTemplate; +import org.springframework.stereotype.Service; + +import com.button.boot.active.model.User; +import com.button.boot.active.service.ActiveMqService; + +import net.sf.json.JSONObject; + +@Service +public class ActiveMqServiceImpl implements ActiveMqService { + private static final Logger log = LoggerFactory.getLogger(ActiveMqServiceImpl.class); + + @Autowired + private Queue queueString; + + @Autowired + private Queue queueUser; + + @Autowired + private Queue queueReturn; + + @Autowired + private Topic topicString; + + @Autowired + private Topic topicUser; + + @Autowired + private Topic delayTopicString; + + @Autowired + private Queue queueStringACK; + + @Autowired + private JmsMessagingTemplate jmsMessagingTemplate; + + @Override + public void sendQueueMsg(String msg) throws JMSException { + log.info("sendQueueMsg-msg开始发送信息:{} --> {}", msg, this.queueString.getQueueName()); + this.jmsMessagingTemplate.convertAndSend(this.queueString, msg); + } + @Override + public void sendQueueMsg(User user) throws JMSException { + log.info("sendQueueMsg-user开始发送信息:{} --> {}", JSONObject.fromObject(user), this.queueUser.getQueueName()); + this.jmsMessagingTemplate.convertAndSend(this.queueUser, user); + } + @Override + public void send2Return(String msg) throws JMSException { + log.info("send2WayQueueMsg-msg开始发送信息:{} --> {}", msg, this.queueReturn.getQueueName()); + this.jmsMessagingTemplate.convertAndSend(this.queueReturn,msg); + } + + @Override + public void sendACKQueueMsg(String msg) throws JMSException { + log.info("sendACKQueueMsg开始发送信息:{} --> {}", msg, this.queueStringACK.getQueueName()); + this.jmsMessagingTemplate.convertAndSend(this.queueStringACK,msg); + } + + + + @Override + public void sendTopicMsg(String msg) throws JMSException { + log.info("sendTopicMsg-msg发送信息:{} --> {}", msg, this.topicString.getTopicName()); + this.jmsMessagingTemplate.convertAndSend(this.topicString,msg); + } + + @Override + public void sendTopicMsg(User user) throws JMSException { + log.info("sendTopicMsg-user发送信息:{} --> {}", JSONObject.fromObject(user), this.topicUser.getTopicName()); + this.jmsMessagingTemplate.convertAndSend(this.topicUser,user); + } + + @Override + public void sendDelayTopicMsg(String msg) throws JMSException { + log.info("sendDelayTopicMsg发送延时信息:{} --> {},发送时间:{}", msg, this.delayTopicString.getTopicName(), LocalDateTime.now()); + this.jmsMessagingTemplate.getJmsTemplate().send(this.delayTopicString, session -> { + TextMessage tx = session.createTextMessage(msg); + tx.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10 * 1000); + return tx; + }); + } + + @Override + public void sendDelayTopicMsg(String msg,long time) throws JMSException { + log.info("sendDelayTopicMsg发送延时信息:{} --> {},发送时间:{}", msg, this.delayTopicString.getTopicName(), LocalDateTime.now()); + this.jmsMessagingTemplate.getJmsTemplate().send(this.delayTopicString, session -> { + TextMessage tx = session.createTextMessage(msg); + tx.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); + return tx; + }); + } +} diff --git a/Springboot-ActiveMq/src/main/resources/application.yml b/Springboot-ActiveMq/src/main/resources/application.yml new file mode 100644 index 0000000000000000000000000000000000000000..cedd6ab17601fbf47a4678b9557e627a34973042 --- /dev/null +++ b/Springboot-ActiveMq/src/main/resources/application.yml @@ -0,0 +1,23 @@ +server: + servlet: + context-path: /activemq + port: 8080 + uri-encoding: utf-8 + +logging: + file: logback.xml + +spring: + activemq: + broker-url: tcp://localhost:61616 + packages: + trust-all: true # 消息传输信任所有的包,传对象必备 + in-memory: true + pool: + enabled: true + max-connections: 5 # 最大连接数为5 + jms: + pub-sub-domain: true # 默认情况下该配置是false,activemq提供的是queue模式,若要使用topic模式需要配置为true # 如过系统要两个模式兼容,则需要自己配置JMS监听工厂【看配置类】 + + + \ No newline at end of file diff --git a/Springboot-ActiveMq/src/main/resources/logback.xml b/Springboot-ActiveMq/src/main/resources/logback.xml new file mode 100644 index 0000000000000000000000000000000000000000..fbfe1efe134ee57160324b7072d96980344c9a08 --- /dev/null +++ b/Springboot-ActiveMq/src/main/resources/logback.xml @@ -0,0 +1,48 @@ + + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} -%msg%n + + + + ${LOG_HOME}/springboot_activemq.log + + + ${LOG_HOME}/springboot_activemq.log.%d{yyyy-MM-dd}.%i.log + + + 100MB + + 10 + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} -%msg%n + + + + ${LOG_HOME}/springboot_activemq_error.log + + ${LOG_HOME}/springboot_activemq_error.%d{yyyy-MM-dd}.log + 10 + + + utf-8 + %X{req.remoteHost} %X{req.xForwardedFor} %d{HH:mm:ss.SSS} [%thread] %-5level %logger{135} - %msg%n + + + ERROR + ACCEPT + DENY + + + + + + + + + + \ No newline at end of file