# Spring Cloud 实战源码
**Repository Path**: bluebug2016/springcloud_actual_source_code
## Basic Information
- **Project Name**: Spring Cloud 实战源码
- **Description**: Spring Cloud 实战源码和学习笔记
- **Primary Language**: Java
- **License**: GPL-3.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 1
- **Forks**: 1
- **Created**: 2018-11-07
- **Last Updated**: 2024-06-18
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# Spring Cloud 实战源码
#### 项目介绍
Spring Cloud 实战源码和学习笔记
#### Spring Cloud组件导图

#### Spring Boot
#### Spring Cloud Eureka
**组件功能:主要完成微服务架构中的服务治理功能。服务治理是微服务架构中最为核心和基础的模块,主要用来实现各个微服务实例的自动化注册和发现。**
- 服务注册中心 Eureka Server
1. 搭建服务注册中心步骤
1. 创建Spring Boot工程eureka-server,在pom.xml引入必要的依赖内容:
```xml
org.springframework.boot
spring-boot-starter-parent
2.0.6.RELEASE
UTF-8
UTF-8
1.8
Finchley.SR2
org.springframework.cloud
spring-cloud-starter-netflix-eureka-server
org.springframework.boot
spring-boot-starter-test
test
org.springframework.cloud
spring-cloud-dependencies
${spring-cloud.version}
pom
import
```
2. 在启动类上加上**@EnableEurekaServer**注解
```java
@EnableEurekaServer
@SpringBootApplication
public class BhMicroserviceEurekaApplication {
public static void main(String[] args) {
SpringApplication.run(BhMicroserviceEurekaApplication.class, args);
}
}
```
3. 在**application.yml**文件加入EurekaServer配置
```yaml
server:
port: 6868
eureka:
client:
register-with-eureka: false #代表不向注册中心注册自己
fetch-registry: false #不需要检索服务
service-url:
defaultZone: http://127.0.0.1:${server.port}/eureka/
```
- 服务注册与发现 Eureka Client
1. Spring boot项目pom.xml加入Spring Cloud Eureka依赖
```xml
org.springframework.boot
spring-boot-starter-parent
2.0.6.RELEASE
UTF-8
UTF-8
1.8
Finchley.SR2
org.springframework.cloud
spring-cloud-starter-netflix-eureka-server
org.springframework.boot
spring-boot-starter-test
test
org.springframework.cloud
spring-cloud-dependencies
${spring-cloud.version}
pom
import
org.springframework.boot
spring-boot-maven-plugin
```
2. 在启动类增加**@EnableDiscoveryClient**注解
```java
@EnableDiscoveryClient
@SpringBootApplication
public class BhMicroserviceOrderApplication {
//注册一个RestTemplate模板,并且使用OkHttp做为请求类库
@Bean
public RestTemplate createResttemplate(){
return new RestTemplate(new OkHttp3ClientHttpRequestFactory());
}
public static void main(String[] args) {
SpringApplication.run(BhMicroserviceOrderApplication.class, args);
}
}
```
3. 在application.yml增加相关配置
```yaml
spring:
application:
name: bh-microservice-order
eureka:
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://127.0.0.1:6868/eureka/
instance:
prefer-ip-address: true
```
4. 在服务类中使用发现服务并调用
```java
public class ItemService {
@Autowired
private RestTemplate restTemplate;
//注册发现客户端
@Autowired
private DiscoveryClient discoveryClient;
public Item queryItemById(Long id) {
//使用service name发现服务与spring.application.name名称对应
String serviceId = "bh-microservice-item";
List instances = this.discoveryClient.getInstances(serviceId);
if(instances.isEmpty()){
return null;
}
ServiceInstance serviceInstance = instances.get(0);
String url = serviceInstance.getHost()+":"+serviceInstance.getPort();
return this.restTemplate.getForObject("http://"+url+"/item/"+id,Item.class);
}
}
```
- 启用Eureka Server安全性
- Eureka Server高可用
-
#### Spring Cloud Ribbon
实现服务的负载均衡,解决多个服务如何透明调用问题
1. Ribbon有助于控制HTTP和TCP客户端的行为。
2. 步骤:
1. 引入Ribbon
```xml
org.springframework.cloud
spring-cloud-starter-ribbon
```
在Eureka Server已经引入了Ribbon了,所以不需要单儿引用
2. 在消费端注入负载均衡
```java
@Bean
@LoadBalanced
public RestTemplate createRestTemplate(){
return new RestTemplate(new OkHttp3ClientHttpRequestFactory());
}
```
@LoadBalanced注解
3. 使用
```java
public Item queryItemById(Long id) {
String serviceId = "bh-microservice-item";
return this.restTemplate.getForObject("http://"+serviceId+"/item/"+id,Item.class);
}
```
使用服务调用
4.
3.
#### Spring Cloud Hystrix
断路器:Netflix的创造了一个调用的库[Hystrix](https://github.com/Netflix/Hystrix)实现了[断路器图案](http://martinfowler.com/bliki/CircuitBreaker.html)。在微服务架构中,通常有多层服务调用。

较低级别的服务中的服务故障可能导致用户级联故障。当对特定服务的呼叫达到一定阈值时(Hystrix中的默认值为5秒内的20次故障),电路打开,不进行通话。在错误和开路的情况下,开发人员可以提供后备

使用步骤:
1. 引入Spring Hystrix
```xml
org.springframework.cloud
spring-cloud-starter-netflix-hystrix
```
2. 启用Hystrix
```java
@EnableHystrix
@EnableDiscoveryClient
@SpringBootApplication
public class BhMicroserviceOrderApplication {
@Bean
@LoadBalanced
public RestTemplate createRestTemplate(){
return new RestTemplate(new OkHttp3ClientHttpRequestFactory());
}
public static void main(String[] args) {
SpringApplication.run(BhMicroserviceOrderApplication.class, args);
}
}
```
3. 在服务调用处加入回调
使用**@HystrixCommand**增加出错时回调方法,起到断路保护的作用。
```java
@HystrixCommand(fallbackMethod = "queryItemByIdFailCallback")
public Item queryItemById(Long id) {
String serviceId = "bh-microservice-item";
return this.restTemplate.getForObject("http://" + serviceId + "/item/" + id, Item.class);
}
public Item queryItemByIdFailCallback(Long id) {
return new Item(1L,"商品信息失败",null,null,1L);
}
```
> # 1 雪崩效应
>
> 简单是来说,在分布式系统中,假如有一个请求需要调用A服务,但A服务出现了问题,则这个请求就会阻塞,那么只要调用服务A的请求都会阻塞,当阻塞的请求越来越多,占用的计算机资源就越来越多。进一步来说,就是一个服务出现问题,可能导致所有的请求都不可用,从而导致整个分布式系统都不可用,这就是“雪崩效应”。
>
>
>
>
>
> 
>
> > 如果要是考虑服务与服务之间的依赖关系,则连带作用更强,系统崩坏的速度更快!
>
> 举例来说:火车购票系统提供两种服务,查票服务和订票服务
>
> 1. 如果不考虑两个服务之间的依赖关系:查票服务出现问题,则查票的请求全部阻塞,占用大量的系统资源,久而久之,导致整个系统无法响应请求
> 2. 如果考虑依赖关系:订票服务依赖查票服务,查票服务出现问题,不仅仅查票的请求会全部阻塞,订票的服务也会阻塞,整个系统迅速崩坏!
>
> # 2 Hystrix如何去解决雪崩
>
> ## 2.1 隔离技术
>
> 例如,货船为了进行防止漏水和火灾的扩散,会将货仓分隔为多个,如下图所示:
>
> 
>
> - 线程池隔离
>
> 例如,淘宝的一个商品页面至少包含三方面信息,商品基本信息、商品价格、买家评论。一个商品页面的请求依赖于三个服务,基本信息服务A、价格服务B、评论服务C。因为是一个请求(线程)调用三个服务,调用顺序为:A->B->C,如果其中A服务出现问题,则另外两个服务都无法调用;如果B服务出现问题,则C服务无法调用。【横向调用】
>
> 
>
> 线程隔离主要解决的问题就是,A、B、C服务之间无调用顺序的限制,不论哪个服务出现问题,都不会影响其他服务的调用。但是前提是A、B、C服务之间相互独立。**基本原理是,为每个服务维护一个线程池,用户的请求将不再直接访问服务,而是通过线程池中的空闲线程来访问服务。**【纵向调用】
>
> 
>
> - 信号量隔离
>
> 该隔离技术,是限制某个服务的并发数量,对服务的并发数量设置一个阈值,超过该阈值则服务暂停接受新的请求。
>
> ## 2.2 熔断机制
>
> 如果某个目标服务调用慢或者有大量超时,如5秒内20次调用失败,此时,熔断该服务的调用,对于后续调用请求,不在继续调用目标服务,直接返回,快速释放资源。如果目标服务情况好转则恢复调用。熔断机制在Hystrix中,位于隔离技术之前。
>
> > 1、判断是否进行熔断的依据是:计算错误率,当错误率超过预设的值(默认是50%)且10秒内超过20个请求,则开启熔断。
> > 2、 对于被熔断的请求,并不是永久被切断,而是被暂停一段时间之后,允许部分请求通过,若请求都是健康的,则对请求健康恢复(取消熔断),如果不是健康的,则继续熔断。
>
> ## 2.3 总执行流程
>
> 
>
> 1. 首先判断是否有缓存,如果有则直接返回结果
> 2. 无缓存,则判断断路器是否打开,如果打开则执行回退方法,返回结果
> 3. 熔断器未打开,则判断线程池/信号量是否到底阈值,如果到达阈值,则执行回退方法,返回结果
> 4. 若未达到阈值,则执行命令,如果成功返回结果;失败返回回退方法的执行结果
#### Spring Cloud Feign
声明式服务调用
> [Feign](https://github.com/Netflix/feign)是一个声明式的Web服务客户端。这使得Web服务客户端的写入更加方便 要使用Feign创建一个界面并对其进行注释。它具有可插入注释支持,包括Feign注释和JAX-RS注释。Feign还支持可插拔编码器和解码器。Spring Cloud增加了对Spring MVC注释的支持,并使用Spring Web中默认使用的`HttpMessageConverters`。Spring Cloud集成Ribbon和Eureka以在使用Feign时提供负载均衡的http客户端。
1. 引入Feign
```xml
org.springframework.cloud
spring-cloud-starter-openfeign
```
2. 启用Feign
```java
@EnableFeignClients
@EnableHystrix
@EnableDiscoveryClient
@SpringBootApplication
public class BhMicroserviceOrderApplication {
@Bean
@LoadBalanced
public RestTemplate createRestTemplate(){
return new RestTemplate(new OkHttp3ClientHttpRequestFactory());
}
public static void main(String[] args) {
SpringApplication.run(BhMicroserviceOrderApplication.class, args);
}
}
```
3. 定义Feign接口
```java
@FeignClient("bh-microservice-item")
public interface ItemFeignClient {
@GetMapping("/item/{id}")
Item queryItemById(@PathVariable("id") Long id);
}
```
4. 注入接口改造调用
```java
@Autowired
private ItemFeignClient itemFeignClient;
@HystrixCommand(fallbackMethod = "queryItemByIdFailCallback")
public Item queryItemById(Long id) {
return this.itemFeignClient.queryItemById(id);
}
```
#### Spring Cloud Zuul
>## 1 什么是网关
>
>网关是一种可提供路由资源统一管理的工具, 将”1对N”问题 转换成了”1对1”问题。通过服务路由的功能,可以在对外提供服务时,只暴露 网关中配置的调用地址,而调用方就不需要了解后端具体的微服务主机。
>
>
>
>
>
>> 有点Facade设计模式的感觉
>
>
>
>服务网关是微服务架构中一个不可或缺的部分。通过服务网关统一向外系统提供REST API的过程中,除了具备服务路由、均衡负载功能之外,它还具备了权限控制等功能。Spring Cloud Netflix中的Zuul就担任了这样的一个角色,为微服务架构提供了前门保护的作用。
>
>Zuul通过与Eureka的整合,将自身注册到服务中心,从而可以获到所有其他微服务实例信息。
>
>**作用:**
>
>1. 服务路由
>2. 过滤器
>3. 负载均衡
>4. 权限控制
#### Spring Cloud Config

Spring Cloud Config.png
在前文[服务注册Eureka原理及集群配置](https://www.jianshu.com/p/ee14bbee732b)中,我们配置Eureka集群时,可以通过`--spring.profiles.active=peer1`来指定微服务在启动时使用哪段配置。但往往在微服务架构中,需要维护大量的配置文件,在开发、测试、生产环境,这些配置文件又有所不同,同时还希望能做到修改配置文件时,微服务能够不停止服务。
也就是说在微服务架构中,对于配置文件,通常有如下的需求
- 配置文件集中管理
- 不同环境不同配置
- 运行期间可动态调整配置
而Spring Cloud Config正是解决这问题的组件。
本文讲从基本配置步骤,动态刷新配置,以及Spring cloud config高可用三个三面讲解如何配置Spring cloud config.
##### 配置步骤
如上图所示,用户将配置文件push到git仓库,配置文件按照`{application}-{profile}.yml`或者`{application}-{profile}.properties`格式命名。spring cloud config server连接git仓库,为所有config client(具体的微服务应用)提供配置服务。
config client 在配置文件中指向配置中心的地址。
**准备配置文件**
创建一个 `spring-cloud-config`文件夹,文件夹下创建子文件夹`config`,在子文件夹下创建以下四个文件,并上传本地gitlab仓库中`http://gitlab.bill.com/billjiang/spring-cloud-config`
- hello.yml
- hello-dev.yml
- hello-test.yml
- hello-production.yml
**创建config server**
分别对应默认、开发阶段、测试阶段、生成环境的配置文件,并在里面写入相应的内容,比如依次写入`profile:defualt-1.0`、`profile:dev-1.0`、`profile:test-1.0`、`profile:production-1.0`内容,以便后续测试。
下面configserver项目为例,演示spring cloud config的配置,本文示例代码参考[springcloud-demo](https://link.jianshu.com?t=https://github.com/bill1012/microservice/tree/master/springcloud-demo/common)下的`configserver`项目
- 创建configserver项目,引入如下依赖
```
org.springframework.cloud
spring-cloud-config-server
org.springframework.boot
spring-boot-starter-security
```
上面的pom.xml不仅引入了config-server还引入了security,主要是配置文件的信息比较重要,加上安全认证多个保障。
- 启动类上加上`@EnableConfigServer`和`@EnableDiscoveryClient`注解
这个注解声明是config server,第二个注解是将其注册到注册中心,方便管理和横向扩展为集群。
- 在配置文件中,加上如下配置
```
spring:
cloud:
config:
server:
git:
uri: http://gitlab.bill.com/billjiang/spring-cloud-config.git
search-paths: config
username: billjiang
password: '{cipher}2c50a112807ec405695dac19c15cc6da280d8d70e9998b82a9f11d202a6fb7b4'
health:
repositories:
a-hello:
label: master
name: hello
profiles: dev
security:
basic:
enabled: true
user:
name: user
password: 123456
encrypt:
key: billjiang
```
这里对配置文件进行了对称加密,也可以对放入git仓库(我在本地搭建了个gitlab)的敏感信息进行加密,对于密文的生成可以在项目启动后,使用`curl http://user:123456@localhost:8090/encrypt -d [明文]` 这样可以在控制台生成密文,比如`2c50a112807ec405695dac19c15cc6da280d8d70e9998b82a9f11d202a6fb7b4`,把生成的密文加上`{cipher}`前缀即可作为对称加密的信息。其中`encrypt.key`是对称加密的密钥。
**需要注意的是:**如果使用`Dalston.SR2`版本的Spring.cloud 在使用`curl http://user:123456@localhost:8090/encrypt -d [明文]`会报错,把Spring cloud换成`Dalston.SR1`就能解决,这是`Dalston.SR2`的一个bug。
启动项目后,使用`http://localhost:8090/hello/dev`可返回如下信息,说明config server已经从git仓库中读取了`hello-dev.yml`的配置信息

启动spring cloud config.png
在浏览器输入`http://localhost:8090/hello-dev.yml`还可以在界面输入配置文件的具体内容。
**创建config client**
下面以`hello`项目改造为例,说下client端如何从config server中获取配置。
- 添加依赖
```
org.springframework.cloud
spring-cloud-starter-config
```
- 在配置文件`bootstrap.yml`中添加如下配置
```
spring:
application:
name: hello
cloud:
config:
uri: http://user:123456@localhost:8090/
profile: dev
label: master
```
之所以不在application.yml中配置,spring boot会优先加载bootstrap.yml,不然上下文中无法读取到配置而报错。这里指向了config server的地址,并且说明读取`hello-dev.yml`的配置,读取的是master分支。
- 写一个测试的列子
```
@Value("${profile}")
private String profile;
@GetMapping("/profile")
public String profile(){
return this.profile;
}
```
- 启动`hello`项目,输入`http://localhost:8000/profile`即可在界面上看到`dev-1.0`的输出。说明客户端正常读取了指定的配置文件内容。
# 配置刷新
要在微服务运行期间动态刷新配置,可以通过调用`/refresh`实现,但这样只能针对单个服务,而且要手动操作;如果通过消息中间件,可以将刷新事件广播到所有相关的微服务,从而做到自动刷新。
##### 调用/refresh 刷新
- 引入依赖actuator
```
org.springframework.boot
spring-boot-starter-actuator
1.5.6.RELEASE
`
```
- 配置文件中增加
```
endpoints:
refresh:
enabled: true
sensitive: false
```
- 修改配置文件`hello-dev.yml`内容,改为`dev-1.0-refresh`,提交到gitlab中
- 执行`curl -X POST http://localhost:8000/refresh` 或者用webhook提交该请求
- 浏览器输入`http://localhost:8000/profile`查看输出,发现内容已经更改为`dev-1.0-refresh`
##### 使用Spring Cloud Bus自动刷新
借助Spring Cloud Bus,可以将配置变更事件广播到相关微服务,从而使得相关微服务能够自动刷新配置。
那么刷新的事件可以从单个的服务发起,这样可以将配置更新事件广播到同类服务集群,如果N个微服务集群要更新服务,那么也要操作N次。而从config server发起刷新,则所有相关集群自动刷新配置,后一种明显更好。
**config-server 配置spring-cloud-bus**
- 引入 spring-cloud-bus
```
org.springframework.cloud
spring-cloud-starter-bus-amqp
```
- 配置文件bootstrap.yml新增
```
endpoints:
bus:
enabled: true
sensitive: false
spring:
rabbitmq:
host: localhost
port: 5673
username: guest
password: guest
```
- 启动项目`configserver`
- 修改配置文件`hello-dev.yml`内容为`dev-1.0-bus`并提交到gitlab
- 执行`curl -X POST http://user:123456@localhost:8090/bus/refresh`刷新配置
- 浏览器输入`http://localhost:8000/profile`
发现内容变为`dev-1.0-bus`,说明自动化刷新已生效。
为了更好地测试上述效果,可以通过`java -ar hello.jar --server.port=8001`,`java -jar hello.jar --server.port=8002`命令将单个微服务配置成集群,同时配置其他集群。变更多个配置文件后,刷新配置,看看是否都更新了。
大家可以按照以上步骤,在单个服务节点上配置spring cloud bus,看看刷新配置后,集群中其他节点的配置是否同步更新了。
##### 局部刷新
可以通过`/bus/refresh?destination=customers:8000`,`customers:8000`是注册在Eureka Server上的微服务ID,即ApplicationContextID.
##### Spring cloud config高可用
作为微服务架构中一个十分重要的服务节点,spring cloud config要保证高可用,就要避免单点。这里要分两种情况,如果cloud server已经注册到Eureka Server,则仅仅需要多部署几个cloud server形成集群。如果没有注册到Eureka Server,就需要通过负载均衡器,讲请求转发到cloud server集群。
当然,也要保证消息中间件RabbitMQ的高可用。
以上就是Spring Cloud Config的配置。
作者:billJiang
链接:https://www.jianshu.com/p/e48de30aab76
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
#### Spring Cloud Bus
##### 前言
在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都能连接上来,由于该主题中产生的消息会被所有实例监听和消费,所以我们称它为消息总线。在总线上的各个实例都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息,例如配置信息的变更或者其他一些管理操作等。
由于消息总线在微服务架构系统的广泛使用,所以它同配置中心一样,几乎是微服务架构中的必备组件。`spring cloud`作为微服务架构综合性的解决方案,对此自然也有自己的实现,这就是`spring cloud bus`。通过`spring cloud bus`,可以非常容易的搭建起消息总线,同时实现了一些消息总线中的常用功能,比如配合`spring cloud config`实现微服务应用配置信息的动态更新等。
##### 消息代理
消息代理(message broker)是一种消息验证,传输,路由的架构模式。它在应用程序之间起到通信并最小化应用之间的依赖的作用,使得应用程序可以高效地解耦通信过程。消息代理是一个中间件产品,它的核心是一个消息的路由程序,用来实现接收和分发消息,并根据设定好的消息处理流来转发给正确的应用。它包括独立的通信和消息传递协议,能够实现组织内部和组织间的网络通信。设计代理的目的就是为了能够从应用程序中传入消息,并执行一些特别的操作,下面这些是企业应用中,我们经常使用消息代理的场景:
- 将消息路由到一个或多个目的地。
- 消息转化为其他的表现方式。
- 执行消息的聚集,消息的分解,并将结果发送到它们的目的地,然后重新组合响应返回给消息用户。
- 调用web服务来检索数据。
- 响应事件或错误。
- 使用发布-订阅模式来提供内容和基于主题的消息路由。
目前已经有非常多的开源产品可以供大家使用,比如:
- activemq
- kafka
- rabbitmq
- rocketmq
- ...
当前版本的`spring cloud bus`仅支持两款中间件产品:`rabbitmq`和`kafka`。
##### rabbitmq实现消息总线
`rabbitmq`是实现了高级消息队列协议(AMQP)的开源消息代理软件,也称为面向消息的中间件。Rabbitmq服务是高性能,可伸缩性而闻名的Erlang语言编写而成的,其集群和故障转移是构建在开放电信平台框架的。
AMQP是`Advanced Message Queuing Protocol`的简称,它是一个面向消息中间件的开发式标准应用层协议,它定义了以下这些特性:
- 消息方向
- 消息队列
- 消息路由(包括点到点和发布-订阅模式)
- 可靠性
- 安全性
AMQP要求消息的提供者和客户端接收者的行为要实现对不同的供应商可以用相同的方式(比如SMTP,HTTP,FTP等)进行互相操作。在以往的中间件标准中,主要还是建立在api级别的,比如jms,集中于通过不同的中间件实现来建立标准化的程序间的互操作性,而不是在多个中间件产品间实现互操作性。
AMQP与JMS不同,JMS定义了一个API和一组消息收发必须要实现的行为,而AMQP是一个线路级协议。线路级协议描述的是通过网络发送的数据传输格式。因此,任何符合该数据格式的消息发送和接收工具都能互相兼容和进行操作,这样就能轻易实现跨技术平台的架构方案。
RabbitMQ以AMQP协议实现,所以它可以支持多种操作系统,多种编程语言,几乎可以覆盖所有主流的企业级技术平台。在微服务架构消息中间件的选型中,它是一个非常适合且优秀的选择。因此,在`spring cloud bus`中包含了对`rabbit`的自动化默认配置。
##### 基本概念
介绍一些Rabbitmq的基本概念,
- Broker:可以理解成消息队列服务器的实体,它是一个中间件应用,负责接收消息生产者的消息,然后将消息发送到消息接收者或者其他的Broker。
- Exchange:消息交换机,是消息第一个到达的地方,消息通过它指定的路由规则,分发到不同的消息队列中去。
- Queue:消息队列,消息通过发发送和路由之后最终到达的地方,到达Queue的消息即进入逻辑上等待消费的状态。每个消息都会被发送到一个或多个队列。
- Binding:绑定,它的作用就是把Exchange和Queue按照路由规则绑定起来,也就是Exchange和Queue之间的虚拟连接。
- Routing Key:路由关键字,Exchange根据这个关键字进行消息投递。
- Virtual host:虚拟主机,它是对Broker的虚拟划分,将消费者,生产者和它们的依赖的AMQP相关结构进行隔离,一般都是为了安全考虑。比如,我们可以在一个Broker中设置多个虚拟主机,对不同用户进行权限的分离。
- Connection:连接,代表生产者,消费者,Broker之间进行通信的物理网络。
- Channel:消息通道,用于连接生产者和消费者的逻辑结构。在客户端的每个连接里,可建立多个Channel,每个Channel代表一个会话任务,通过Channel可以隔离同一个连接中的不同交互内容。
- Producer:消息生产者,制造消息并发送消息的程序。
- Consumer:消息消费者,接收消息并处理消息的程序。
消息投递到队列的整个过程大致如下:
1.客户端连接到消息队列服务器,打开一个Channel。
2.客户端声明一个Exchange,并设置相关属性。
3.客户端声明一个Queue,并设置相关属性。
4.客户端使用Routing Key,在Exchange和Queue之间建立好绑定关系。
5.客户端投递消息到Exchange。
1. Exchange接收到消息后,根据消息的key和已经设置的Binding,进行消息路由,将消息投递到一个或多个Queue里。
Exchange也有几种类型。
1.Direct交互机:完全根据Key进行投递。比如,绑定时设置了Routing Key为abc,那么客户端提交的消息,只有设置了key为Routing Key的才会被投递到队列。
2.Topic交互机:对Key进行模式匹配后进行投递,可以使用符号#匹配一个或多个词,符号*匹配正好一个词。比如,abc.#匹配abc.def.ghi, abc.*只匹配abc.def.
3.Fanout交互机:不需要任何Key,它采用广播的模式,一个消息进来时,投递到与该交互机绑定的所有队列。
Rabbitmq支持消息持久化,也就是将数据写在磁盘上。为了数据安全考虑,大多数情况下都会选择持久化。消息队列持久化包括三个部分:
1. Exchange持久化,在声明时指定durable >=1.
2. Queue持久化,在声明时指定durable => 1.
3. 消息持久化,在投递时指定delivery_mode => 2(1是非持久化)。
如果Exchange和Queue都是持久化,那么它们之间的Binding也是持久化的。如果Exchange和Queue两者之间有一个是持久化的,一个是非持久化的,就不允许建立绑定。
##### 安装
##### 快速入门
在`springboot`中整合`Rabbitmq`是一个非常容易的事情,
- 新建一个`spring boot`工程,命名为`springboot-rabbitmq`
- 在pom文件中引入依赖,其中
```
org.springframework.boot
spring-boot-starter-parent
1.4.5.RELEASE
org.springframework.boot
spring-boot-starter-amqp
org.springframework.boot
spring-boot-starter-test
```
- 在`application.properties`中配置关于`Rabbitmq`的连接和用户信息,
```
spring:
application:
name: springboot-rabbitmq
rabbitmq:
host:
port: 5672
username:
password:
```
- 创建生产者`Sender`。通过注入`AmqpTemplate`接口的实例来实现消息的发送,`AmqpTemplate`接口定义了一套针对`AMQP`协议的基础操作,在`spring boot`中会根据配置来注入其具体的实现。
我们发送一字符串到`zhihao.miao.order`队列中,
```
@Component
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String context = "hello "+ LocalDateTime.now().toString();
System.out.println("Sender: "+context);
this.amqpTemplate.convertAndSend("zhihao.miao.order",context);
}
}
```
- 创建消息消费者`Receiver`。通过`@RabbitListener`注解定义该类对指定队列的监听,并用`@RabbitHandler`注解来指定对消息的处理方法(不同的消息格式,`@RabbitHandler`配置的方法的入参就不用,默认是byte[] 类型)。所以,该消费者实现了对`zhihao.miao.order`队列的消费,消费操作作为输出消息的字符串内容。
```
@Component
@RabbitListener(queues = "zhihao.miao.order")
public class Receiver {
@RabbitHandler
public void process(String hello){
System.out.println("Receiver: "+hello);
}
}
```
- 创建`RabbitMQ`的配置类`RabbitConfig`,用来配置队列,交换机,路由等高级信息。这里我们只配置队列,已完成一个基本的生产消费过程。
这一步相当于自动创建的过程,如果在控制台上已经创建了该队列,此步骤可以省略。
```
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public Queue queue(){
return new Queue("zhihao.miao.order");
}
}
```
- 创建启动主类
```
@SpringBootApplication
public class RabbitMQApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMQApplication.class,args);
}
}
```
- 创建单元测试类,用来调用消息生产
```
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = RabbitMQApplication.class)
public class RabbitMQApplicationTest {
@Autowired
private Sender sender;
@Test
public void setSender() throws Exception{
sender.send();
}
}
```
- 启动应用主类,在控制台看到创建了一个连接rabbitmq的连接
作者:二月_春风
链接:https://www.jianshu.com/p/a5012222d01e
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
#### Spring Cloud Stream
#### Spring Cloud Sleuth