# E-commerce-cloud **Repository Path**: HouByte/e-commerce-cloud ## Basic Information - **Project Name**: E-commerce-cloud - **Description**: 基于Spring Cloud Alibaba 的微服务电商平台 - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 3 - **Forks**: 0 - **Created**: 2022-03-02 - **Last Updated**: 2023-06-15 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # E-commerce-cloud [CSDN博客地址文章](https://blog.csdn.net/Vincent_Vic_/article/details/123221364) @[TOC](Spring Cloud Alibaba) # 一、简介 ## 1.1 官方描述 > [Spring Cloud Alibaba](https://spring.io/projects/spring-cloud-alibaba) provides a one-stop solution for distributed application development. It contains all the components required to develop distributed applications, making it easy for you to develop your applications using Spring Cloud. > With [Spring Cloud Alibaba](https://spring.io/projects/spring-cloud-alibaba), you only need to add some annotations and a small amount of configurations to connect Spring Cloud applications to the distributed solutions of Alibaba, and build a distributed application system with Alibaba middleware. ## 1.2 架构图  > 图片来源:[processon](https://www.processon.com/view/5fe94f3ce0b34d2934ee23d9?fromnew=1) ## 1.3 模块  # 二、项目搭建 ## 2.1 maven 创建maven项目,配置pom ```xml 4.0.0 org.springframework.boot spring-boot-starter-parent 2.4.2 cn.flowboot.e.commerce E-commerce-cloud 1.0-SNAPSHOT pom 1.0-SNAPSHOT Hoxton.SR12 2.2.7.RELEASE 1.16.18 3.11 4.4 5.6.0 0.11.2 1.2.78 3.4.2 1.2.6 1.21 3.0.0 2.3.2 2.1.4 1.3.1 5.8.0 5.8.0 2.11.0 1.4 3.2.2 4.1.2 1.7 org.springframework.cloud spring-cloud-dependencies ${spring-cloud.version} pom import com.alibaba.cloud spring-cloud-alibaba-dependencies ${spring-cloud-alibaba.version} pom import org.projectlombok lombok ${lombok.version} org.apache.commons commons-lang3 ${commons-lang3.version} org.apache.commons commons-collections4 ${commons-collections4.version} commons-collections commons-collections ${commons.collections.version} cn.hutool hutool-all ${hutool-all.version} io.jsonwebtoken jjwt-api ${jwt.version} io.jsonwebtoken jjwt-impl ${jwt.version} io.jsonwebtoken jjwt-jackson ${jwt.version} com.alibaba fastjson ${fastjson.version} com.baomidou mybatis-plus-boot-starter ${mybatis-plus.version} com.alibaba druid-spring-boot-starter ${druid.version} eu.bitwalker UserAgentUtils ${bitwalker.version} com.github.pagehelper pagehelper-spring-boot-starter ${pagehelper.boot.version} com.github.oshi oshi-core ${oshi.version} net.java.dev.jna jna ${jna.version} net.java.dev.jna jna-platform ${jna.version} io.springfox springfox-boot-starter ${swagger.version} io.swagger swagger-models commons-fileupload commons-fileupload ${commons.fileupload.version} org.apache.poi poi-ooxml ${poi.version} com.github.penggle kaptcha ${kaptcha.version} spring-milestones Spring Milestones https://repo.spring.io/milestone false ``` ## 2.2 创建基础模块及基础功能 ### 2.2.1 创建子模块 创建子模块e-commerce-common和e-commerce-mvc-config **e-commerce-common/pom.xml** ```xml E-commerce-cloud cn.flowboot.e.commerce 1.0-SNAPSHOT 4.0.0 e-commerce-common jar 8 8 org.projectlombok lombok ``` **e-commerce-mvc-config/pom.xml** ```xml E-commerce-cloud cn.flowboot.e.commerce 1.0-SNAPSHOT 4.0.0 e-commerce-mvc-config 8 8 cn.flowboot.e.commerce e-commerce-common org.springframework.boot spring-boot-starter-web ``` 在主pom.xml添加模块管理 ```xml cn.flowboot.e.commerce e-commerce-common ${E-commerce-cloud} cn.flowboot.e.commerce e-commerce-mvc-config ${E-commerce-cloud} ... ``` ### 2.2.2 基础项目代码准备 e-commerce-common/.../CommonResponse ```java package cn.flowboot.e.commerce.vo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; /** *
* This should do work that does not require network transport to produce. *
* In other words, this should be a static or cached result that can immediately be returned upon failure. *
* If network traffic is wanted for fallback (such as going to MemCache) then the fallback implementation should invoke another {@link HystrixCommand} instance that protects against that network * access and possibly has another level of fallback that does not involve network access. *
* DEFAULT BEHAVIOR: It throws UnsupportedOperationException. * * @return R or throw UnsupportedOperationException if not implemented */ @Override protected List> getFallback() { log.warn("trigger hystrix fallback: [{}]", Thread.currentThread().getName()); return Collections.emptyList(); } } ``` 测试,有四种调用方式 ```java /** * executeList - 测试编程方式服务熔断 * version: 1.0 - 2022/3/6 * @return {@link List< Map< String, Object>> } */ @GetMapping("/execute/list") public List> executeList(){ log.info("Hystrix Command test"); return orderHystrixCommand.execute(); } /** * queueList - 测试编程方式服务熔断 - 采用异步 * version: 1.0 - 2022/3/6 * @param * @return {@link List< Map< String, Object>> } */ @GetMapping("/queue/list") public List> queueList() throws ExecutionException, InterruptedException { log.info("queue Hystrix Command test"); Future>> future = orderHystrixCommand.queue(); //异步执行,这里可以进行其他操作 return future.get(); } /** * observeList - 测试编程方式服务熔断 - 热响应调用 * version: 1.0 - 2022/3/6 * @param * @return {@link List< Map< String, Object>> } */ @GetMapping("/observe/list") public List> observeList() throws ExecutionException, InterruptedException { log.info("observe Hystrix Command test"); Observable>> observe = orderHystrixCommand.observe(); //异步执行,这里可以进行其他操作 return observe.toBlocking().single(); } /** * toObservableList - 测试编程方式服务熔断 - 冷响应调用 * version: 1.0 - 2022/3/6 * @param * @return {@link List< Map< String, Object>> } */ @GetMapping("/observe/list") public List> toObservableList() throws ExecutionException, InterruptedException { log.info("toObservable Hystrix Command test"); Observable>> toObservable = orderHystrixCommand.toObservable(); //异步执行,这里可以进行其他操作 return toObservable.toBlocking().single(); } ``` 信号量方式处理 ```java package cn.flowboot.e.commerce.hystrix; import cn.flowboot.e.commerce.service.OrderService; import com.netflix.hystrix.*; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import rx.Observable; import java.util.Collections; import java.util.List; import java.util.Map; /** * OrderService 实现包装 信号量处理 * Hystrix舱壁模式: * 1。线程池 * 2.信号量 : 有限信号机 * @version 1.0 * @author: Vincent Vic * @since: 2022/03/06 */ @Slf4j @Service public class OrderHystrixObservableCommand extends HystrixObservableCommand>> { private final OrderService orderService; protected OrderHystrixObservableCommand(OrderService orderService) { super(HystrixObservableCommand.Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("OrderService")) .andCommandKey(HystrixCommandKey.Factory.asKey("OrderHystrixObservableCommand")) .andCommandPropertiesDefaults( HystrixCommandProperties.Setter() .withFallbackEnabled(true) .withCircuitBreakerEnabled(true) ) ); this.orderService = orderService; } /** * Implement this method with code to be executed when {@link #observe()} or {@link #toObservable()} are invoked. * * @return R response type */ @Override protected Observable>> construct() { log.info("Hystrix Observable Command construct to Get Order List: [{}]", Thread.currentThread().getName()); return Observable.create(subscriber->{ //Observable有三个关键的事件方法,分别是onNext、onCompleted、onError try { if (!subscriber.isUnsubscribed()){ log.info("subscriber command task : [{}]",Thread.currentThread().getName()); //这里可以批量操作 //仅为演示 // for (int i = 0; i < 10; i++) { // subscriber.onNext(orderService.list()); // } subscriber.onNext(orderService.list()); subscriber.onCompleted(); } } catch (Exception exception){ subscriber.onError(exception); } }); } /** * resumeWithFallback - 服务熔断后处理 * version: 1.0 - 2022/3/6 * @param * @return {@link Observable< List< Map< String, Object>>> } */ @Override protected Observable>> resumeWithFallback() { log.warn("trigger hystrix fallback: [{}]", Thread.currentThread().getName()); return Observable.create(subscriber->{ //Observable有三个关键的事件方法,分别是onNext、onCompleted、onError try { if (!subscriber.isUnsubscribed()){ log.info("[Fallback]subscriber command task : [{}]",Thread.currentThread().getName()); subscriber.onNext(Collections.emptyList()); subscriber.onCompleted(); } } catch (Exception exception){ subscriber.onError(exception); } }); } } ``` 测试接口 ```java /** * executeObsList - 测试编程方式服务熔断 信号量异步执行 * version: 1.0 - 2022/3/6 * @return {@link List< Map< String, Object>> } */ @GetMapping("/execute/obs/list") public List> executeObsList(){ log.info("Hystrix Observe Command test"); List> results = new ArrayList<>(); //异步执行 Observable>> observe = orderHystrixObservableCommand.observe(); observe.subscribe(new Observer>>() { @Override public void onCompleted() { log.info("subscriber onCompleted : [{}]",Thread.currentThread().getName()); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(List> list) { results.addAll(list); } }); log.info("observe command results : [{}],[{}]",JSON.toJSONString(results),Thread.currentThread().getName()); return results; } /** * executeToObsList - 测试编程方式服务熔断 * version: 1.0 - 2022/3/6 * @return {@link List< Map< String, Object>> } */ @GetMapping("/execute/to/obs/list") public List> executeToObsList(){ log.info("Hystrix toObservable Command test"); return orderHystrixObservableCommand.toObservable().toBlocking().single(); } ``` ## 8.3 Hystrix 请求缓存 实现过滤器 ```java package cn.flowboot.e.commerce.filter; import com.netflix.hystrix.strategy.HystrixPlugins; import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy; import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault; import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier; import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook; import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher; import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.servlet.*; import javax.servlet.annotation.WebFilter; import java.io.File; import java.io.FileFilter; import java.io.IOException; /** * * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/06 */ @Slf4j @Component @WebFilter( filterName = "HystrixRequestContextServletFilter", urlPatterns = "/*", asyncSupported = true ) public class HystrixRequestContextServletFilter implements Filter { @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { // 初始化 Hystrix 请求上下文 // 在不同的 context 中缓存是不共享的 // 这个初始化是必须的 HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { // 配置 hystrixConcurrencyStrategyConfig(); // 请求正常通过 chain.doFilter(request, response); } finally { // 关闭 Hystrix 请求上下文 context.shutdown(); } } /** * 配置 Hystrix 的并发策略 * */ public void hystrixConcurrencyStrategyConfig() { try { HystrixConcurrencyStrategy target = HystrixConcurrencyStrategyDefault.getInstance(); HystrixConcurrencyStrategy strategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); if (strategy instanceof HystrixConcurrencyStrategyDefault) { // 如果已经就是我们想要配置的 return; } // 将原来其他的配置保存下来 HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook(); HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier(); HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher(); HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy(); // 先重置, 再把我们自定义的配置与原来的配置写回去 HystrixPlugins.reset(); HystrixPlugins.getInstance().registerConcurrencyStrategy(target); HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook); HystrixPlugins.getInstance().registerEventNotifier(eventNotifier); HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher); HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy); log.info("config hystrix concurrency strategy success"); } catch (Exception ex) { log.error("Failed to register Hystrix Concurrency Strategy: [{}]", ex.getMessage(), ex); } } } ``` ### 8.3.1 编程的方式 ```java package cn.flowboot.e.commerce.hystrix; import cn.flowboot.e.commerce.service.OrderService; import com.google.common.collect.Maps; import com.netflix.hystrix.HystrixCommand; import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandKey; import com.netflix.hystrix.HystrixRequestCache; import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy; import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault; import lombok.extern.slf4j.Slf4j; import java.util.Collections; import java.util.List; import java.util.Map; /** * * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/06 */ @Slf4j public class CacheHystrixCommand extends HystrixCommand> { private final OrderService orderService; private static final HystrixCommandKey CACHE_KEY = HystrixCommandKey.Factory.asKey("CacheHystrixCommand"); private final String orderNo; public CacheHystrixCommand(OrderService orderService,String orderNo) { super( HystrixCommand.Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("CacheHystrixCommandGroup")) .andCommandKey(CACHE_KEY) ); this.orderService = orderService; this.orderNo = orderNo; } /** * Implement this method with code to be executed when {@link #execute()} or {@link #queue()} are invoked. * * @return R response type * @throws Exception if command execution fails */ @Override protected Map run() throws Exception { log.info("CacheHystrixCommand In Hystrix Command to get service instance:[{}], [{}]",orderNo,Thread.currentThread().getName()); return orderService.desc(orderNo); } @Override protected Map getFallback() { return Maps.newConcurrentMap(); } @Override protected String getCacheKey() { return orderNo; } /** * flushRequestCache - 根据缓存key清理在一次Hystrix请求上下文中的缓存 * version: 1.0 - 2022/3/6 * @param orderNo 订单号 */ public static void flushRequestCache(String orderNo){ HystrixRequestCache.getInstance(CACHE_KEY, HystrixConcurrencyStrategyDefault.getInstance()).clear(orderNo); log.info("flush request cacheIin hystrix command:[{}],[{}]",orderNo,Thread.currentThread().getName()); } } ``` 测试 ```java @Autowired private OrderService orderService; /** * queueList - 测试编程方式服务熔断 - 缓存测试 * version: 1.0 - 2022/3/6 * @param * @return {@link List< Map< String, Object>> } */ @GetMapping("/cache/desc") public CommonResponse cacheHystrixCommand(@RequestParam(defaultValue = "123456") String orderNo) throws ExecutionException, InterruptedException { log.info("cache Hystrix Command test"); CacheHystrixCommand command1 = new CacheHystrixCommand(orderService,orderNo); CacheHystrixCommand command2 = new CacheHystrixCommand(orderService,orderNo); Map result01 = command1.execute(); Map result02 = command2.execute(); log.info("result01 [{}] result02 [{}]",JSON.toJSONString(result01),JSON.toJSONString(result02)); //清除缓存 CacheHystrixCommand.flushRequestCache(orderNo); //继续发起 CacheHystrixCommand command3 = new CacheHystrixCommand(orderService,orderNo); CacheHystrixCommand command4 = new CacheHystrixCommand(orderService,orderNo); Map result03 = command3.execute(); Map result04 = command4.execute(); log.info("result03 [{}] result04 [{}]",JSON.toJSONString(result03),JSON.toJSONString(result04)); return CommonResponse.success(); } ``` ### 8.3.2 注解的方式 |注解 | 描述 | 属性| |--|--|--| @CacheResult | 该注解用来标记请求命令返回的结果应该被缓存,它必须与@HystrixCommand注解结合使用|cacheKeyMethod @CacheRemove | 该注解用来让请求命令的缓存失效,失效的缓存根据commandKey进行查找。|commandKey,cacheKeyMethod @CacheKey | 该注解用来在请求命令的参数上标记,使其作为cacheKey,如果没有使用此注解则会使用所有参数列表中的参数作为cacheKey |value ```java package cn.flowboot.e.commerce.hystrix; import cn.flowboot.e.commerce.service.OrderService; import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand; import com.netflix.hystrix.contrib.javanica.cache.annotation.CacheKey; import com.netflix.hystrix.contrib.javanica.cache.annotation.CacheRemove; import com.netflix.hystrix.contrib.javanica.cache.annotation.CacheResult; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.util.List; import java.util.Map; /** * * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/06 */ @Slf4j @RequiredArgsConstructor @Service public class CacheHystrixCommandAnnotation { private final OrderService orderService; // 第一种 Hystrix Cache 注解的使用方法 @CacheResult(cacheKeyMethod = "getCacheKey") @HystrixCommand(commandKey = "CacheHystrixCommandAnnotation") public Map useCacheByAnnotation01(String orderNo){ log.info("use cache01 to get order client info:[{}]",orderNo); return orderService.desc(orderNo); } @CacheRemove(commandKey = "CacheHystrixCommandAnnotation", cacheKeyMethod = "getCacheKey") @HystrixCommand public void flushCacheByAnnotation01(String cacheId) { log.info("flush hystrix cache key: [{}]", cacheId); } public String getCacheKey(String cacheId) { return cacheId; } // 第二种 Hystrix Cache 注解的使用方法 @CacheResult @HystrixCommand(commandKey = "CacheHystrixCommandAnnotation") public Map useCacheByAnnotation02(@CacheKey String orderNo) { log.info("use cache02 to get order client info: [{}]", orderNo); return orderService.desc(orderNo); } @CacheRemove(commandKey = "CacheHystrixCommandAnnotation") @HystrixCommand public void flushCacheByAnnotation02(@CacheKey String cacheId) { log.info("flush hystrix cache key: [{}]", cacheId); } // 第三种 Hystrix Cache 注解的使用方法 @CacheResult @HystrixCommand(commandKey = "CacheHystrixCommandAnnotation") public Map useCacheByAnnotation03(String orderNo) { log.info("use cache03 to get order info: [{}]", orderNo); return orderService.desc(orderNo); } @CacheRemove(commandKey = "CacheHystrixCommandAnnotation") @HystrixCommand public void flushCacheByAnnotation03(String cacheId) { log.info("flush hystrix cache key: [{}]", cacheId); } } ``` 测试接口 ```java /** * queueList - 测试编程方式服务熔断 - 缓存注解形式测试 * version: 1.0 - 2022/3/6 * @param * @return {@link List< Map< String, Object>> } */ @GetMapping("/cache/desc/annotation") public CommonResponse cacheHystrixCommandByAnnotation(@RequestParam(defaultValue = "123456") String orderNo) throws ExecutionException, InterruptedException { log.info("cache Annotation Hystrix Command test"); Map result01 = cacheHystrixCommandAnnotation.useCacheByAnnotation01(orderNo); Map result02 = cacheHystrixCommandAnnotation.useCacheByAnnotation01(orderNo); log.info("result01 [{}] result02 [{}]",JSON.toJSONString(result01),JSON.toJSONString(result02)); //清除缓存 cacheHystrixCommandAnnotation.flushCacheByAnnotation01(orderNo); //继续发起 Map result03 = cacheHystrixCommandAnnotation.useCacheByAnnotation02(orderNo); Map result04 = cacheHystrixCommandAnnotation.useCacheByAnnotation02(orderNo); log.info("result03 [{}] result04 [{}]",JSON.toJSONString(result03),JSON.toJSONString(result04)); //清除缓存 cacheHystrixCommandAnnotation.flushCacheByAnnotation02(orderNo); //继续发起 Map result05 = cacheHystrixCommandAnnotation.useCacheByAnnotation03(orderNo); Map result06 = cacheHystrixCommandAnnotation.useCacheByAnnotation03(orderNo); log.info("result05 [{}] result06 [{}]",JSON.toJSONString(result05),JSON.toJSONString(result06)); //清除缓存 cacheHystrixCommandAnnotation.flushCacheByAnnotation03(orderNo); return CommonResponse.success(); } ``` ## 8.4 请求合并 ### 8.4.1 简介 - 默认情况下,每一个请求都会占用一个线程和一次网络请求,高并发场景下效率不高 ```mermaid graph LR A1[Service A1] --> B[线程池] A2[Service A2 ] --> B A3[Service A3] --> B B --X3--> C(Service B Hystrix) ``` - 使用Hystrix的请求合并,将多个请求merge为一个,提高服务的并发能力 ```mermaid graph LR A1[Service A1] --> B[请求合并] A2[Service A2 ] --> B A3[Service A3] --> B B --X1--> C(Service B Hystrix) ``` **请求合并的适用场景与注意事项** - 适用场景︰单个对象的查询并发数很高,服务提供方负载较高,就可以考虑使用请求合并 - 注意事项 1. 请求在代码中人为的设置了延迟时间,会降低请求的响应速度 2. 可能会提高服务提供方的负载,因为返回List结果数据量偏大 3. 实现请求合并比较复杂 ### 8.4.2编程的方式 批量请求操作 ```java package cn.flowboot.e.commerce.hystrix.request.merge; import cn.flowboot.e.commerce.dto.Good; import cn.flowboot.e.commerce.service.OrderService; import com.alibaba.fastjson.JSON; import com.netflix.hystrix.HystrixCommand; import com.netflix.hystrix.HystrixCommandGroupKey; import lombok.extern.slf4j.Slf4j; import java.util.Collections; import java.util.List; /** * 批量请求 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/07 */ @Slf4j public class OrderBatchCommand extends HystrixCommand>> { private final OrderService orderService; private final List orderNos; public OrderBatchCommand(OrderService orderService, List orderNos) { super( HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey("OrderBatchCommand") ) ); this.orderService = orderService; this.orderNos = orderNos; } @Override protected List> run() throws Exception { log.info("user batch command to get result {}", JSON.toJSONString(orderNos)); return orderService.descs(orderNos); } @Override protected List> getFallback() { log.warn("user batch command to get result failure"); return Collections.emptyList(); } } ``` 请求合并器 ```java package cn.flowboot.e.commerce.hystrix.request.merge; import cn.flowboot.e.commerce.dto.Good; import cn.flowboot.e.commerce.service.OrderService; import com.netflix.hystrix.HystrixCollapser; import com.netflix.hystrix.HystrixCollapserKey; import com.netflix.hystrix.HystrixCollapserProperties; import com.netflix.hystrix.HystrixCommand; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.stream.Collectors; /** * 请求合并器 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/07 */ public class OrderCollapseCommand extends HystrixCollapser>,List,String> { private final OrderService orderService; private final String orderNo; public OrderCollapseCommand(OrderService orderService, String orderNo) { super( HystrixCollapser.Setter.withCollapserKey( HystrixCollapserKey.Factory.asKey("OrderCollapseCommand") ).andCollapserPropertiesDefaults( HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(300) ) ); this.orderService = orderService; this.orderNo = orderNo; } /** * getRequestArgument - 获取请求中的参数 * version: 1.0 - 2022/3/7 * @param * @return {@link String } */ @Override public String getRequestArgument() { return this.orderNo; } /** * createCommand - 创建批量请求 * version: 1.0 - 2022/3/7 * @param collapsedRequests * @return {@link HystrixCommand< List< List< Good>>> } */ @Override protected HystrixCommand>> createCommand(Collection, String>> collapsedRequests) { List orderNos = new ArrayList<>(collapsedRequests.size()); orderNos.addAll( collapsedRequests.stream() .map(CollapsedRequest::getArgument) .collect(Collectors.toList()) ); return new OrderBatchCommand(orderService,orderNos); } /** * mapResponseToRequests - 响应分发 * version: 1.0 - 2022/3/7 * @param batchResponse * @param collapsedRequests */ @Override protected void mapResponseToRequests(List> batchResponse, Collection, String>> collapsedRequests) { int count = 0; for (CollapsedRequest, String> collapsedRequest : collapsedRequests) { //从批量响应集合中按顺序取出结果 List goods = batchResponse.get(count++); //将结果返回原 Response中 collapsedRequest.setResponse(goods); } } } ``` 测试合并功能接口 ```java @GetMapping("/merge/good/desc") public CommonResponse descsMerge() throws Exception { OrderCollapseCommand collapseCommand01 = new OrderCollapseCommand(orderService,"1"); OrderCollapseCommand collapseCommand02 = new OrderCollapseCommand(orderService,"2"); OrderCollapseCommand collapseCommand03 = new OrderCollapseCommand(orderService,"3"); Future> queue01 = collapseCommand01.queue(); Future> queue02 = collapseCommand02.queue(); Future> queue03 = collapseCommand03.queue(); List goods01 = queue01.get(); List goods02 = queue02.get(); List goods03 = queue03.get(); log.info("result 01 {} ,result 02 {} , result 03 {} ",goods01,goods02,goods03); Thread.sleep(2000); OrderCollapseCommand collapseCommand04 = new OrderCollapseCommand(orderService,"4"); Future> queue04 = collapseCommand04.queue(); List goods04 = queue04.get(); log.info("result 04 {} ",goods04); return CommonResponse.success(); } ``` ### 8.4.3 注解的方式 注解方式配置 ```java /** * 订单详情 通过订单号列表 - 请求商品服务 * * @param orderNos * @return */ @Override public List> descs(List orderNos) { log.info("request order to get good infos: {}", JSON.toJSONString(orderNos)); List> result = new ArrayList<>(); orderNos.forEach(o ->{ List goods = goodFeignClient.listByOrderNo(o); if (goods != null){ result.add(goods); } } ); log.info("descs return result {}",JSON.toJSONString(result)); return result; } /** * 订单商品 - 单个重新使用注解合并请求 * * @param orderNo * @return */ @Override @HystrixCollapser( batchMethod = "descGoods", scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL, collapserProperties = { @HystrixProperty(name = "timerDelayInMilliseconds",value = "300") } ) public Future> descGood(String orderNo) { throw new RuntimeException("This method body should not be executed!"); } /** * 订单商品 - 合并请求 * * @param orderNos * @return */ @Override @HystrixCommand public List> descGoods(List orderNos) { return descs(orderNos); } ``` 测试接口 ```java @GetMapping("/merge/annotation/good/desc") public CommonResponse descsMergeByAnnotation() throws Exception { Future> future01 = orderService.descGood("1"); Future> future02 = orderService.descGood("2"); Future> future03 = orderService.descGood("3"); List goods01 = future01.get(); List goods02 = future02.get(); List goods03 = future03.get(); log.info("result 01 {} ,result 02 {} , result 03 {} ",goods01,goods02,goods03); Thread.sleep(2000); OrderCollapseCommand collapseCommand04 = new OrderCollapseCommand(orderService,"4"); Future> future04 = orderService.descGood("4"); List goods04 = future04.get(); log.info("result 04 {} ",goods04); return CommonResponse.success(); } ``` ## 8.5 OpenFeign集成Hystrix 开启后备模式 - 在配置文件中开启Hystrix 的熔断功能:feign.hystrix.enabled: true - @FeignClient注解的 fallback和fallbackFactory 属性 完整的hystrix配置 ```yaml #Feign的相关配置 feign: hystrix: enabled: true # 开启gzip压缩 compression: request: enabled: true mime-types: text/xml,application/xml,application/json min-request-size: 1024 response: enabled: true # 禁用黑犬认的http,启用 okhttp httpclient: enabled: false okhttp: enabled: true ``` 编写GoodFeignClientFallback实现于GoodFeignClient ```java package cn.flowboot.e.commerce.feign.fallback; import cn.flowboot.e.commerce.dto.Good; import cn.flowboot.e.commerce.dto.SearchGoodByIdsDto; import cn.flowboot.e.commerce.feign.GoodFeignClient; import cn.flowboot.e.commerce.vo.CommonResponse; import java.util.Collections; import java.util.List; /** * 后备回退 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/07 */ @Component public class GoodFeignClientFallback implements GoodFeignClient { /** * list - feign 访问 * version: 1.0 - 2022/3/5 * * @param searchGoodByIdsDto * @return {@link List< Good> } */ @Override public CommonResponse list(SearchGoodByIdsDto searchGoodByIdsDto) { return CommonResponse.fail(); } @Override public List listByOrderNo(String orderNo) { return Collections.emptyList(); } } ``` 添加注解 ```java package cn.flowboot.e.commerce.feign; import cn.flowboot.e.commerce.dto.Good; import cn.flowboot.e.commerce.dto.SearchGoodByIdsDto; import cn.flowboot.e.commerce.feign.fallback.GoodFeignClientFallback; import cn.flowboot.e.commerce.vo.CommonResponse; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.*; import java.util.List; /** * 与Good 服务通信的feign Client接口定义 * contextId 对定义同一个服务的区分 * @version 1.0 * @author: Vincent Vic * @since: 2022/03/05 */ @FeignClient(contextId = "GoodFeignClient",value = "e-commerce-demo-good",fallback = GoodFeignClientFallback.class) public interface GoodFeignClient { /** * list - feign 访问 * version: 1.0 - 2022/3/5 * @param searchGoodByIdsDto * @return {@link List< Good> } */ @RequestMapping(value = "/demo-good/good/search/list",method = RequestMethod.POST,consumes = "application/json",produces = "application/json") CommonResponse list(@RequestBody SearchGoodByIdsDto searchGoodByIdsDto); /** * listByOrderNo - 查询订单商品 * version: 1.0 - 2022/3/7 * @param orderNo 订单号 * @return {@link List< Good> } */ @RequestMapping(value = "/demo-good/good/list",method = RequestMethod.GET) List listByOrderNo(@RequestParam String orderNo); } ``` GoodFeignClientFallbackFactory 工厂模式 ```java package cn.flowboot.e.commerce.feign.fallback; import cn.flowboot.e.commerce.dto.Good; import cn.flowboot.e.commerce.dto.SearchGoodByIdsDto; import cn.flowboot.e.commerce.feign.GoodFeignClient; import cn.flowboot.e.commerce.vo.CommonResponse; import feign.hystrix.FallbackFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.util.Collections; import java.util.List; /** * 工厂模式 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/07 */ @Slf4j @Component public class GoodFeignClientFallbackFactory implements FallbackFactory { @Override public GoodFeignClient create(Throwable throwable) { log.info("good feign client get list by feign request (FallbackFactory) [{}]",throwable.getMessage()); return new GoodFeignClient() { @Override public CommonResponse list(SearchGoodByIdsDto searchGoodByIdsDto) { return CommonResponse.fail(throwable.getMessage()); } @Override public List listByOrderNo(String orderNo) { return Collections.emptyList(); } }; } } ``` 注解修改为使用fallbackFactory ```java @FeignClient(contextId = "GoodFeignClient",value = "e-commerce-demo-good",fallbackFactory = GoodFeignClientFallbackFactory.class ) ``` ## 8.6 监控面板 ### 8.6.1 创建监控项目 创建项目,添加maven ```java com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery org.springframework.boot spring-boot-starter-web org.springframework.cloud spring-cloud-starter-netflix-hystrix org.springframework.cloud spring-cloud-starter-netflix-hystrix-dashboard ``` 启动类,添加@EnableHystrixDashboard注解 ```java package cn.flowboot.e.commerce; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.netflix.hystrix.dashboard.EnableHystrixDashboard; /** * * http://localhost:9999/hystrix-dashboard/hystrix * http://localhost:8900/demo-order/actuator/hystrix.stream * @version 1.0 * @author: Vincent Vic * @since: 2022/03/07 */ @EnableDiscoveryClient @EnableHystrixDashboard @SpringBootApplication public class HystrixDashboardApplication { public static void main(String[] args) { SpringApplication.run(HystrixDashboardApplication.class,args); } } ``` 配置 ```yaml server: port: 9999 servlet: context-path: /hystrix-dashboard spring: application: name: e-commerce-hystrix-dashboard cloud: nacos: #服务发现 discovery: enabled: true server-addr: 127.0.0.1:8848 namespace: e-commerce-nacos-server metadata: management: context-path: ${server.servlet.context-path}/actuator hystrix: dashboard: proxy-stream-allow-list: "127.0.0.1" # 暴露端点 management: endpoints: web: exposure: include: '*' endpoint: health: show-details: always ``` ### 8.6.2 打开监控面板 http://localhost:9999/hystrix-dashboard/hystrix  http://127.0.0.1:8900/demo-order/actuator/hystrix.stream  # 九、分布式事务Seata ## 9.1 @Transactional注解 > @Transactional是Spring 事务管理提供的注解,在一个方法中加上了这个注解,那么这个方法就将是有事务的,方法内的操作要么一起提交、要么一起回滚 ```java @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface Transactional { /** * 当在配置文件中有多个 TransactionManager , 可以用该属性指定选择哪个事务管理器。 */ @AliasFor("transactionManager") String value() default ""; /** * 同上。 */ @AliasFor("value") String transactionManager() default ""; /** * 事务的传播行为,默认值为 REQUIRED。 */ Propagation propagation() default Propagation.REQUIRED; /** * 事务的隔离规则,默认值采用 DEFAULT。 */ Isolation isolation() default Isolation.DEFAULT; /** * 事务超时时间。 */ int timeout() default TransactionDefinition.TIMEOUT_DEFAULT; /** * 是否只读事务 */ boolean readOnly() default false; /** * 用于指定能够触发事务回滚的异常类型。 */ Class extends Throwable>[] rollbackFor() default {}; /** * 同上,指定类名。 */ String[] rollbackForClassName() default {}; /** * 用于指定不会触发事务回滚的异常类型 */ Class extends Throwable>[] noRollbackFor() default {}; /** * 同上,指定类名 */ String[] noRollbackForClassName() default {}; } ``` **@Transactional注解最常见的应用** - 可以标注在类、方法和接口(不要这样用)上;且方法上的注解会覆盖类上的注解 - 标注在方法上,标识开启事务功能,正常则提交、异常则回滚 - 自行指定rollbackFor属性,让Checked Exception 也能够实现回滚 - 让TestCase 也能够实现回滚 **测试用例@Transactional的使用** ```java /** * testTransactional01 - 测试事务时会回滚事务 - 执行后会回滚 * @Transactional 如果为提交则不会回滚 * version: 1.0 - 2022/3/8 */ @Transactional @Test public void testTransactional01(){ //数据库存储操作 } /** * testTransactional02 - 测试事务时会回滚事务 - 指定不回滚 - 即使发生异常也不会回滚 * version: 1.0 - 2022/3/8 */ @Rollback(value = false) @Transactional @Test public void testTransactional02(){ //数据库存储操作 throw new RuntimeException("error"); } ``` **@Transactional注解失效的场景** - 把注解标注在非public修饰的方法上 - propagation(传播行为)属性配置错误(不合理)rollbackFor属性设置错误 - 在同一个类中方法调用,导致事务失效 - 自己主动去catch,代表『没有出现』异常,导致事务失效 - 数据库引擎本身就不支持事务(例如MyISAM ),当然也不会生效 在**同一个类中方法**调用,如下列代码调用wrongRollbackFor会导致事务失效 ```java @Transactional @Override public void wrongRollbackFor() throws Exception { //数据上的操作 throw new RuntimeException("发生异常,测试@Transactional"); } @Override public void wrongInnweCall() throws Exception { wrongRollbackFor(); } ``` ## 9.2 分布式事务解决方案 分布式事务是来源于微服务的(或类似的场景),服务之间存在着调用,且整个调用链路上存在着多处(分布在不同的微服务上)写数据表的行为,那么,分布式事务就要保证这些操作要么全部成功,要么全部失败 ```mermaid graph LR A[Order Service] -- 创建订单--> ODB((订单数据库)) A --> B[Account Service] -- 扣除余额--> ADB((账号数据库)) A --> C[Goods Service] -- 扣除库存--> GDB((商品数据库)) A --> D[Logistics Service] -- 物流订单 --> LDB((物流数据库)) ``` **分布式事务可能追求的一致性条件不同(业务特性)** - 强一致性:任何一次读都能读到某个数据的最近一次写的数据(要求最高) - 弱一致性:数据更新后,如果能容忍后续的访问只能访问到部分或者全部访问不到,则是弱─致性(绝大多数的业务场景都不允许) - 最终━致性:不保证在任意时刻数据都是完整的(状态一致),但是,随时时间的推移(会有个度量),数据总是会达到一致的状态 **最常用的分布式事务的解决方案:两阶段提交** - 两阶段指的是分两步提交﹔存在一个中央协调器负责协调各个分支事务 第一阶段: ```mermaid sequenceDiagram 中央协调器->> 本地资源管理器A: 是否就绪 本地资源管理器A->>中央协调器: 就绪 中央协调器->> 本地资源管理器B: 是否就绪 本地资源管理器B->>中央协调器: 就绪 ``` 第二阶段: ```mermaid sequenceDiagram 中央协调器->> 本地资源管理器A: 提交 本地资源管理器A->>中央协调器: 成功 中央协调器->> 本地资源管理器B: 提交 本地资源管理器B->>中央协调器: 成功 ``` **最常用的分布式事务的解决方案:本地消息表** - 该方案的核心是将需要分布式处理的任务通过消息日志的方式来异步执行  ## 9.3 Seata AT ### 9.3.1 Seata AT 概述 [官方文档](https://seata.io/zh-cn/docs/overview/what-is-seata.html) **Seata 是什么?** Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。  AT 模式 **前提** - 基于支持本地 ACID 事务的关系型数据库。 - Java 应用,通过 JDBC 访问数据库。 **整体机制** 两阶段提交协议的演变: - 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。 - 二阶段: - 提交异步化,非常快速地完成。 - 回滚通过一阶段的回滚日志进行反向补偿。  **Seata术语** **TC (Transaction Coordinator)** - 事务协调者 > 维护全局和分支事务的状态,驱动全局事务提交或回滚。 **TM (Transaction Manager)** - 事务管理器 > 定义全局事务的范围:开始全局事务、提交或回滚全局事务。 **RM (Resource Manager)** - 资源管理器 > 管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。 ### 9.3.2 部署 [下载](https://seata.io/zh-cn/blog/download.html) | [官方部署指南](https://seata.io/zh-cn/docs/ops/deploy-guide-beginner.html) **目录结构**(采用版本1.4.2) ```shell +---bin +---conf | +---logback | \---META-INF | \---services +---lib | \---jdbc ``` 修改配置文件,这里使用MySQL作为存储 **(注意:修改之前记得先进行备份操作)** ```java ## transaction log store, only used in seata-server store { ## store mode: file、db、redis mode = "db" ## rsa decryption public key publicKey = "" ## file store property file { ## store location dir dir = "sessionStore" # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions maxBranchSessionSize = 16384 # globe session size , if exceeded throws exceptions maxGlobalSessionSize = 512 # file buffer size , if exceeded allocate new buffer fileWriteBufferCacheSize = 16384 # when recover batch read size sessionReloadReadSize = 100 # async, sync flushDiskMode = async } ## database store property db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc. datasource = "druid" ## mysql/oracle/postgresql/h2/oceanbase etc. dbType = "mysql" driverClassName = "com.mysql.jdbc.Driver" ## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param url = "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true" user = "root" password = "root" minConn = 5 maxConn = 100 globalTable = "global_table" branchTable = "branch_table" lockTable = "lock_table" queryLimit = 100 maxWait = 5000 } ## redis store property redis { ## redis mode: single、sentinel mode = "single" ## single mode property single { host = "127.0.0.1" port = "6379" } ## sentinel mode property sentinel { masterName = "" ## such as "10.28.235.65:26379,10.28.235.65:26380,10.28.235.65:26381" sentinelHosts = "" } password = "" database = "0" minConn = 1 maxConn = 10 maxTotal = 100 queryLimit = 100 } } ``` > 主要修改 > mode = "db" > db { url = "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true" user = "root" password = "root" } seata 所需要的数据表 > 1.4版本mysql数据库语句:https://github.com/seata/seata/blob/1.4.0/script/server/db/mysql.sql ```sql -- -------------------------------- The script used when storeMode is 'db' -------------------------------- -- the table to store GlobalSession data CREATE TABLE IF NOT EXISTS `global_table` ( `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `status` TINYINT NOT NULL, `application_id` VARCHAR(32), `transaction_service_group` VARCHAR(32), `transaction_name` VARCHAR(128), `timeout` INT, `begin_time` BIGINT, `application_data` VARCHAR(2000), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`xid`), KEY `idx_gmt_modified_status` (`gmt_modified`, `status`), KEY `idx_transaction_id` (`transaction_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; -- the table to store BranchSession data CREATE TABLE IF NOT EXISTS `branch_table` ( `branch_id` BIGINT NOT NULL, `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `resource_group_id` VARCHAR(32), `resource_id` VARCHAR(256), `branch_type` VARCHAR(8), `status` TINYINT, `client_id` VARCHAR(64), `application_data` VARCHAR(2000), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`branch_id`), KEY `idx_xid` (`xid`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; -- the table to store lock data CREATE TABLE IF NOT EXISTS `lock_table` ( `row_key` VARCHAR(128) NOT NULL, `xid` VARCHAR(96), `transaction_id` BIGINT, `branch_id` BIGINT NOT NULL, `resource_id` VARCHAR(256), `table_name` VARCHAR(32), `pk` VARCHAR(36), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`row_key`), KEY `idx_branch_id` (`branch_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; ``` 采用nacos作为注册中心 ```java registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "nacos" nacos { application = "seata-server" serverAddr = "127.0.0.1:8848" group = "SEATA_GROUP" namespace = "SEATA" cluster = "default" username = "nacos" password = "nacos" } eureka { serviceUrl = "http://localhost:8761/eureka" application = "default" weight = "1" } redis { serverAddr = "localhost:6379" db = 0 password = "" cluster = "default" timeout = 0 } zk { cluster = "default" serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } consul { cluster = "default" serverAddr = "127.0.0.1:8500" aclToken = "" } etcd3 { cluster = "default" serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" application = "default" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" cluster = "default" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } } config { # file、nacos 、apollo、zk、consul、etcd3 type = "file" nacos { serverAddr = "127.0.0.1:8848" namespace = "SEATA" group = "SEATA_GROUP" username = "nacos" password = "nacos" dataId = "seataServer.properties" } consul { serverAddr = "127.0.0.1:8500" aclToken = "" } apollo { appId = "seata-server" ## apolloConfigService will cover apolloMeta apolloMeta = "http://192.168.1.204:8801" apolloConfigService = "http://192.168.1.204:8080" namespace = "application" apolloAccesskeySecret = "" cluster = "seata" } zk { serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" nodePath = "/seata/seata.properties" } etcd3 { serverAddr = "http://localhost:2379" } file { name = "file.conf" } } ``` > 这里修改为nacos主要是这些属性 > type = "nacos" > 注册和配置下的nacos信息均为如下 > nacos { application = "seata-server" serverAddr = "127.0.0.1:8848" group = "SEATA_GROUP" namespace = "SEATA" cluster = "default" username = "nacos" password = "nacos" } 注意namespace 的值SEATA是在Nacos上创建的命名空间的ID,根据自己环境配置 **下载config.txt与nacos-config.sh文件** 1.4版本config.txt: [下载](https://github.com/seata/seata/blob/1.4.0/script/config-center/config.txt) 将config.txt放conf同级目录下 ```yaml # default_tx_group 后续配置会使用到,可以自定义 service.vgroupMapping.default_tx_group=default #配置数据库 store.mode=db store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true store.db.user=root store.db.password=root ``` 1.4版本nacos-config.sh:[下载](https://github.com/seata/seata/blob/1.4.0/script/config-center/nacos/nacos-config.sh) 将nacos-config.sh放conf下 输入命令行,将这些配置导入到nacos的seata命名空间中: ```shell sh nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t SEATA -u nacos -w nacos ``` > SEATA 是之前创建的命名空间ID 命令解析: - -h -p 指定nacos的端口地址; - -g 指定配置的分组,注意,是配置的分组; - -t 指定命名空间id; - -u -w指定nacos的用户名和密码,同样,这里开启了nacos注册和配置认证的才需要指定。 > 这两个文件的作用: config.txt就是seata各种详细的配置,执行 nacos-config.sh 即可将这些配置导入到nacos,这样就不需要将file.conf和registry.conf放到我们的项目中了,需要什么配置就直接从nacos中读取。 ### 9.3.3 系统集成 需要在每个微服务数据库创建undo_log表 ```sql -- for AT mode you must to init this sql for you business database. the seata server not need it. -- 注意此处0.7.0+ 增加字段 context CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; ``` 添加依赖 ```xml com.alibaba.cloud spring-cloud-starter-alibaba-seata 2.2.7.RELEASE io.seata seata-spring-boot-starter 1.4.2 com.alibaba druid-spring-boot-starter 1.2.6 ``` 启动类添加@EnableAutoDataSourceProxy注解 yaml 添加配置 ```yaml seata: tx-service-group: default_tx_group registry: type: nacos nacos: application: seata-server server-addr: localhost:8848 group: SEATA_GROUP username: nacos password: nacos namespace: SEATA config: type: nacos nacos: server-addr: localhost:8848 group: SEATA_GROUP username: nacos password: nacos namespace: SEATA ``` ### 9.3.4 业务测试 创建订单业务  ```java /** * 创建订单业务接口 * @return */ @GetMapping("createOrder") public StoreOrder createOrder(@RequestParam(defaultValue = "1") Integer goodId){ try { log.info("create order by good id : {}",goodId); return storeOrderService.createOrder(goodId); } catch (Exception e) { log.error("create order fail ",e); return null; } } ``` 服务中添加@GlobalTransactional(rollbackFor = Exception.class)分布式微服务事务注解 ```java /** * 创建订单 减少库存 * 仅为演示回滚,非实际业务 * @param goodId * @return * @throws Exception */ @Override @GlobalTransactional(rollbackFor = Exception.class) public StoreOrder createOrder(Integer goodId) throws Exception { //远程调用减少库存 goodFeignClient.deductStock(goodId, 1); //模拟订单创建 StoreOrder order = new StoreOrder(); order.setOrderNo(UUID.randomUUID().toString().replace("-","")); //仅为演示否则此处需要先获取商品信息,这里演示回滚,非实际业务 order.setPrice(new BigDecimal(100*goodId)); order.setGoodId(goodId); order.setCreateTime(new Date()); order.setUpdateTime(new Date()); save(order); //模拟发生错误 throw new Exception("error"); //return order; } ``` > **`需要注意的是1.4.2 中有一个问题`** > SEATA 1.4.x版本在MySQL8.0中执行undo时报错Cannot construct instance of `java.time.LocalDateTime` 来自 https://blog.csdn.net/richie696/article/details/116896511 解决方法,其中第三点似乎并不能解决问题,如果是新项目可以使用时间戳解决  > 测试接口后查看数据库是否正常回滚 # 十、消息驱动 ## 10.1 Kafka > [Apache Kafka](https://kafka.apache.org/) 是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序。  > 环境搭建在参考 6.4 Kafka数据传输 ### 10.1.1 Spring Boot 集成 > 单独创建模块(仅演示demo) ```xml cn.flowboot.e.commerce e-commerce-common org.springframework.boot spring-boot-starter-web org.springframework.kafka spring-kafka ``` 配置可以直接在配置文件中配置也可以代码进行配置 yaml配置如下 ```yaml server: port: 9191 spring: kafka: bootstrap-servers: 127.0.0.1:9092 ``` 代码方式配送(注:非必要,可直接在yml配置) ```java package cn.flowboot.e.commerce.config; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; import java.util.HashMap; import java.util.Map; /** * 通过代码自定义Kafka配置 * 也可以通过yml文件配置-非必须 * @version 1.0 * @author: Vincent Vic * @since: 2022/03/10 */ //@Configuration public class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; /** * producerFactory - Kafka Producer 工厂配置 * version: 1.0 - 2022/3/10 * @return {@link ProducerFactory< String, String> } */ @Bean public ProducerFactory producerFactory(){ Map configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class); return new DefaultKafkaProducerFactory<>(configs); } /** * kafkaTemplate - 客户端 * version: 1.0 - 2022/3/10 * @return {@link KafkaTemplate< String, String> } */ @Bean public KafkaTemplate kafkaTemplate(){ return new KafkaTemplate<>(producerFactory()); } /** * consumerFactory - Kafka Consumer 工厂配置 * version: 1.0 - 2022/3/10 * @return {@link ConsumerFactory< String, String> } */ @Bean public ConsumerFactory consumerFactory(){ Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,50); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } /** * kafkaListenerContainerFactory - 监听器工厂配置 * version: 1.0 - 2022/3/10 * @param * @return {@link ConcurrentKafkaListenerContainerFactory< String, String> } */ @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(){ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); //并发数 factory.setConcurrency(3); factory.setConsumerFactory(consumerFactory()); return factory; } } ``` > 启动类略 消息VO ```java package cn.flowboot.e.commerce.vo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * 自定义Kafka 传递的消息对象 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/10 */ @AllArgsConstructor @NoArgsConstructor @Data public class KafkaMessage { private Integer id; private String projectName; } ``` Kafaka生产者 ```java package cn.flowboot.e.commerce.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import java.util.concurrent.TimeUnit; /** * Kafaka生产者 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/10 */ @Slf4j @RequiredArgsConstructor @Component public class KafkaProducer { private final KafkaTemplate kafkaTemplate; /** * sendMessage - 发送Kafka消息 * version: 1.0 - 2022/3/10 * @param key 键 * @param value 值 * @param topic 主题 */ public void sendMessage(String key,String value,String topic){ if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)){ throw new IllegalArgumentException("value or topic is null or empty"); } ListenableFuture> future = StringUtils.isBlank(key)? kafkaTemplate.send(topic, value):kafkaTemplate.send(topic,key, value); // 异步回调的方式获取通知 future.addCallback( success ->{ assert null != success && null != success.getRecordMetadata(); // 发送到 kafka 的 topic String _topic = success.getRecordMetadata().topic(); // 消息发送到的分区 int partition = success.getRecordMetadata().partition(); // 消息在分区内的 offset long offset = success.getRecordMetadata().offset(); log.info("send kafka message success: [{}], [{}], [{}]", _topic, partition, offset); }, failure ->{ log.error("send kafka message failure: [{}], [{}], [{}]", key, value, topic); } ); // 同步等待的方式获取通知 try { //SendResult sendResult = future.get(); SendResult sendResult = future.get(5, TimeUnit.SECONDS); // 发送到 kafka 的 topic String _topic = sendResult.getRecordMetadata().topic(); // 消息发送到的分区 int partition = sendResult.getRecordMetadata().partition(); // 消息在分区内的 offset long offset = sendResult.getRecordMetadata().offset(); log.info("send kafka message success: [{}], [{}], [{}]", _topic, partition, offset); } catch (Exception ex) { log.error("send kafka message failure: [{}], [{}], [{}]", key, value, topic); } } } ``` Kafaka消费者 ```java package cn.flowboot.e.commerce.kafka; import cn.flowboot.e.commerce.vo.KafkaMessage; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.Optional; /** * Kafaka消费者 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/10 */ @Slf4j @RequiredArgsConstructor @Component public class KafkaConsumer { private final ObjectMapper mapper; /** * 监听 Kafka 消息并消费 * */ @KafkaListener(topics = {"kafka-springboot"}, groupId = "springboot-kafka") public void listener01(ConsumerRecord record) throws Exception { String key = record.key(); String value = record.value(); KafkaMessage kafkaMessage = mapper.readValue(value, KafkaMessage.class); log.info("in listener01 consume kafka message: [{}], [{}]", key, mapper.writeValueAsString(kafkaMessage)); } /** * 监听 Kafka 消息并消费 * */ @KafkaListener(topics = {"kafka-springboot"}, groupId = "springboot-kafka-1") public void listener02(ConsumerRecord, ?> record) throws Exception { Optional> _kafkaMessage = Optional.ofNullable(record.value()); if (_kafkaMessage.isPresent()) { Object message = _kafkaMessage.get(); KafkaMessage kafkaMessage = mapper.readValue(message.toString(), KafkaMessage.class); log.info("in listener02 consume kafka message: [{}]", mapper.writeValueAsString(kafkaMessage)); } } } ``` kafka 测试API ```java package cn.flowboot.e.commerce.controller; import cn.flowboot.e.commerce.kafka.KafkaProducer; import cn.flowboot.e.commerce.vo.KafkaMessage; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * kafka 测试控制层 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/10 */ @Slf4j @RequiredArgsConstructor @RestController @RequestMapping("kafka") public class KafkaController { private final ObjectMapper mapper; private final KafkaProducer kafkaProducer; /** * 发送 kafka 消息 * */ @GetMapping("/send-message") public void sendMessage(@RequestParam(required = false) String key, @RequestParam String topic) throws Exception { KafkaMessage message = new KafkaMessage(1, "kafka"); kafkaProducer.sendMessage(key, mapper.writeValueAsString(message), topic); } } ``` ## 10.2 RocketMQ ### 10.2.1 搭建 [官网](https://rocketmq.apache.org/) | [rocketmq-all-4.9.3-bin-release 下载](https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.3/rocketmq-all-4.9.3-bin-release.zip) 当前环境 > win10 jdk1.8 rocketMQ安装版本 4.9.3 系统环境变量配置 > 变量名:ROCKETMQ_HOME 变量值:MQ解压路径\MQ文件夹名 (如:D:\RocketMQ) 启动 在解压目录\bin下通过cmd分别启动 > 1. NameServer : start mqnamesrv.cmd > 2. Borker :start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true 启动NameServer > 假如弹出提示框提示‘错误: 找不到或无法加载主类 xxxxxx’。打开runbroker.cmd,然后将‘%CLASSPATH%’加上英文双引号。保存并重新执行start语句。 Linux 下 > 64bit OS, Linux/Unix/Mac is recommended;(Windows user see guide below) 64bit JDK 1.8+; Maven 3.2.x; Git; 4g+ free disk for Broker server 下载源码版本 ```shell > unzip rocketmq-all-4.9.3-source-release.zip > cd rocketmq-all-4.9.3/ > mvn -Prelease-all -DskipTests clean install -U > cd distribution/target/rocketmq-4.9.3/rocketmq-4.9.3 ``` 启动名称服务器 ```shell > nohup sh bin/mqnamesrv & > tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success... ``` 启动代理 ```shell > nohup sh bin/mqbroker -n localhost:9876 & > tail -f ~/logs/rocketmqlogs/broker.log The broker[%s, 172.30.30.233:10911] boot success... ``` ### 10.2.2 Spring Boot 集成 新建项目 ```xml cn.flowboot.e.commerce e-commerce-common org.springframework.boot spring-boot-starter-web org.apache.rocketmq rocketmq-spring-boot-starter 2.2.1 ``` yaml配置 ```yaml server: port: 9291 rocketmq: name-server: 127.0.0.1:9876 producer: group: rocket-mq-group ``` 消息VO ```java package cn.flowboot.e.commerce.vo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @AllArgsConstructor @NoArgsConstructor @Data public class RocketMQMessage { private Integer id; private String projectName; } ``` RocketMQ 生产者 ```java package cn.flowboot.e.commerce.rocket; import cn.flowboot.e.commerce.vo.RocketMQMessage; import com.alibaba.fastjson.JSON; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import java.time.LocalDateTime; /** * 通过RocketMQ发送消息 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @Slf4j @RequiredArgsConstructor @Component public class RocketMQProducer { private static final String TOPIC = "rocket-mq-topic"; private final RocketMQTemplate rocketMQTemplate; /** * sendMessageWithValue -使用同步的方式发送消息,不指定key 和tag * version: 1.0 - 2022/3/11 * @param value 值 */ public void sendMessageWithValue(String value){ // 随机选择一个Topic的Message Queue 发送消息 SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, value); log.info("sendMessageWithValue result : [{}]", JSON.toJSONString(sendResult)); SendResult syncSendOrderly = rocketMQTemplate.syncSendOrderly(TOPIC, value, "RocketMQ-Key"); log.info("sendMessageWithValue orderly result : [{}]", JSON.toJSONString(syncSendOrderly)); } /** * sendMessageWithValue -使用异步的方式发送消息,指定key * version: 1.0 - 2022/3/11 * @param key * @param value 值 */ public void sendMessageWithKey(String key,String value){ Message message = MessageBuilder.withPayload(value) .setHeader(RocketMQHeaders.KEYS,key).build(); //异步发送消息,设定回调 rocketMQTemplate.asyncSend(TOPIC, message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("sendMessageWithKey onSuccess result : [{}]", JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { log.info("sendMessageWithKey onException result : [{}]", throwable.getMessage(),throwable); } }); } /** * sendMessageWithTag - 使用同步的方式发送消息, 带有 tag, 且发送的是 Java Pojo * version: 1.0 - 2022/3/11 * @param tag * @param value */ public void sendMessageWithTag(String tag, String value) { RocketMQMessage rocketMQMessage = JSON.parseObject(value, RocketMQMessage.class); SendResult sendResult = rocketMQTemplate.syncSend(String.format("%s:%s", TOPIC, tag), rocketMQMessage); log.info("sendMessageWithTag result: [{}]", JSON.toJSONString(sendResult)); } /** * sendMessageWithAll - 使用同步的方式发送消息, 带有 key 和 tag * version: 1.0 - 2022/3/11 * @param key * @param tag * @param value */ public void sendMessageWithAll(String key, String tag, String value) { Message message = MessageBuilder.withPayload(value) .setHeader(RocketMQHeaders.KEYS, key).build(); SendResult sendResult = rocketMQTemplate.syncSend(String.format("%s:%s", TOPIC, tag), message); log.info("sendMessageWithAll result: [{}]", JSON.toJSONString(sendResult)); } private static final String DELAY_TOPIC = "rocket-mq-delay-topic"; private SendCallback callback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { } @Override public void onException(Throwable throwable) { } }; /** * 发送延时消息 * 延时消息等级分为18个:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h * @param message 消息 * @param delayLevel 延迟等级 * @author tyg * @date 2021-03-24 14:55 * @return void */ public void sendDelayMessage(String message,Integer delayLevel){ log.info("sendDelayMessage now time : {}", LocalDateTime.now()); rocketMQTemplate.asyncSend(RocketMQProducer.DELAY_TOPIC,MessageBuilder.withPayload(message).build(),callback,3000,delayLevel); } } ``` 服务消费者继承于RocketMQListener 简单消费者 ```java package cn.flowboot.e.commerce.rocket; import cn.flowboot.e.commerce.vo.RocketMQMessage; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * RocketMQ 消费者 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @Slf4j @Component @RocketMQMessageListener( topic = "rocket-mq-topic", consumerGroup = "springboot-rocketmq-string" ) public class RocketMQConsumerString implements RocketMQListener { @Override public void onMessage(String message) { RocketMQMessage rocketMQMessage = JSON.parseObject(message, RocketMQMessage.class); log.info("consumer message in RocketMQConsumerString : [{}]",message); } } ``` RocketMQ 消费者 指定带有tag的消息 ```java package cn.flowboot.e.commerce.rocket; import cn.flowboot.e.commerce.vo.RocketMQMessage; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * RocketMQ 消费者 指定带有tag的消息 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @Slf4j @Component @RocketMQMessageListener( topic = "rocket-mq-topic", consumerGroup = "springboot-rocketmq-tag-string", selectorExpression = "rocket" ) public class RocketMQConsumerTagString implements RocketMQListener { @Override public void onMessage(String message) { RocketMQMessage rocketMQMessage = JSON.parseObject(message, RocketMQMessage.class); log.info("consumer message in RocketMQConsumerString : [{}]",message); } } ``` RocketMQ 消费者 使用自定义对象 ```java package cn.flowboot.e.commerce.rocket; import cn.flowboot.e.commerce.vo.RocketMQMessage; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * RocketMQ 消费者 Object * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @Slf4j @Component @RocketMQMessageListener( topic = "rocket-mq-topic", consumerGroup = "springboot-rocketmq-tag-object", selectorExpression = "rocket" ) public class RocketMQConsumerObject implements RocketMQListener { @Override public void onMessage(RocketMQMessage message) { log.info("consume message in RocketMQConsumerObject : [{}]", JSON.toJSONString(message)); } } ``` RocketMQ 消费者 消息扩展 可获取KEY ```java package cn.flowboot.e.commerce.rocket; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * RocketMQ 消费者 消息扩展 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @Slf4j @Component @RocketMQMessageListener( topic = "rocket-mq-topic", consumerGroup = "springboot-rocketmq-message-ext" ) public class RocketMQConsumerMessageExt implements RocketMQListener { @Override public void onMessage(MessageExt messageExt) { String value = new String(messageExt.getBody()); log.info("consumer message in RocketMQConsumerMessageExt : [{}],[{}]",messageExt.getKeys(),value); log.info("MessageExt :[{}]", JSON.toJSONString(messageExt)); } } ``` RocketMQ 消费者 -对应延迟消费者 ```java package cn.flowboot.e.commerce.rocket; import cn.flowboot.e.commerce.vo.RocketMQMessage; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.time.LocalDateTime; /** * RocketMQ 消费者 -对应延迟消费者 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @Slf4j @Component @RocketMQMessageListener( topic = "rocket-mq-delay-topic", consumerGroup = "springboot-rocketmq-delay" ) public class RocketMQConsumerDelay implements RocketMQListener { @Override public void onMessage(RocketMQMessage message) { log.info("Now time : {} receive delay message: [{}] ", LocalDateTime.now(),JSON.toJSONString(message)); } } ``` 用于测试的API ```java package cn.flowboot.e.commerce.controller; import cn.flowboot.e.commerce.rocket.RocketMQProducer; import cn.flowboot.e.commerce.vo.RocketMQMessage; import com.alibaba.fastjson.JSON; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/10 */ @Slf4j @RequiredArgsConstructor @RestController @RequestMapping("rocket-mq") public class RocketMQController { private final RocketMQProducer rocketMQProducer; private static final RocketMQMessage message = new RocketMQMessage(1, "RocketMQ"); /** * 发送 RocketMQ 消息 * */ @GetMapping("/send-message/value") public void sendMessageWithValue(){ rocketMQProducer.sendMessageWithValue(JSON.toJSONString(message)); } /** * 发送 RocketMQ 消息 * */ @GetMapping("/send-message/key") public void sendMessageWithKey(){ rocketMQProducer.sendMessageWithKey("RocketMQ",JSON.toJSONString(message)); } /** * 发送 RocketMQ 消息 * */ @GetMapping("/send-message/tag") public void sendMessageWithTag(){ rocketMQProducer.sendMessageWithTag("rocket",JSON.toJSONString(message)); } /** * 发送 RocketMQ 消息 * */ @GetMapping("/send-message/all") public void sendMessageWithAll(){ rocketMQProducer.sendMessageWithAll("RocketMQ","rocket",JSON.toJSONString(message)); } /** * 发送延迟消息 * 延时消息等级分为18个:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h * delayLevel对应延迟等级就是上面的时间 共18个等级 * rocketmq只能指定延迟等级而不能自定义延迟时间,如果想自定义需要阿里巴巴提供的企业版rocketmq要收费 */ @GetMapping("sendDelay") public void sendDelay(){ // 延时消息等级分为18个:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h rocketMQProducer.sendDelayMessage(JSON.toJSONString(message),4); } } ``` ## 10.3 RabbitMQ ### 10.3.1 环境搭建 直接看这: [RabbitMQ安装教程](https://www.cnblogs.com/chensisi/p/13203111.html) ### 10.3.2 Spring Boot 集成 Kafka和RocketMQ,RabbitMQ 搭建思路基本一致、API和依赖些许不同 ```xml cn.flowboot.e.commerce e-commerce-common org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-amqp ``` 消息VO ```java package cn.flowboot.e.commerce.vo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @AllArgsConstructor @NoArgsConstructor @Data public class RabbitMQMessage { private Integer id; private String projectName; } ``` RabbitMQ 生产者 ```java package cn.flowboot.e.commerce.rabbitmq; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; /** * RabbitMQ 生产者 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @Slf4j @RequiredArgsConstructor @Component public class RabbitMQProducer { private final RabbitTemplate rabbitTemplate; private static final String TOPIC = "rabbit-mq-topic"; private static final String FANOUT_EXCHANGE = "fanout-rabbit-mq-exchange"; private static final String ROUTING_EXCHANGE = "routing-rabbit-mq-exchange"; /** * sendMessage - workqueue模式 * version: 1.0 - 2022/3/11 * @param message */ public void sendMessage(String message){ log.info("send message rabbit mq : {}" ,message); rabbitTemplate.convertAndSend(TOPIC,message); } /** * testfanout - 发布订阅模式 * version: 1.0 - 2022/3/11 * @param message */ public void sendMessageByExchange(String message){ rabbitTemplate.convertAndSend(FANOUT_EXCHANGE, "",message); } /** * sendMessageByRouting - Routing(静态路由模型) * version: 1.0 - 2022/3/11 * @param message */ public void sendMessageByRouting(String message,String routingKey){ rabbitTemplate.convertAndSend(ROUTING_EXCHANGE, routingKey,message); } } ``` RabbitMQ 消费者 ```java package cn.flowboot.e.commerce.rabbitmq; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @Slf4j @Component public class RabbitMQConsumer { /** * workqueue模式(拿到消息即销毁) * receiveA,receiveB 可以分流队列,在多实例当中可以被其中一个消费 * @param message */ @RabbitListener(queuesToDeclare = @Queue("rabbit-mq-topic")) public void receiveA(String message) { log.info("receive message by tipic rabbit-mq-topic [Client A] : {}",message); } /** * workqueue模式(拿到消息即销毁) * receiveA,receiveB 可以分流队列,在多实例当中可以被其中一个消费 * @param message */ @RabbitListener(queuesToDeclare = @Queue("rabbit-mq-topic")) public void receivelB(String message) { log.info("receive message by tipic rabbit-mq-topic [Client B] : {}",message); } /** * Publish模型(发布订阅/fanout模型) * receiveC,receiveD 可以分流队列,在多实例当中可以被其中一个消费 * @param message */ @RabbitListener(bindings = { @QueueBinding( value = @Queue,//声明临时队列 exchange = @Exchange(value = "fanout-rabbit-mq-exchange",type = "fanout") ) }) public void receiveC(String message) { log.info("receive message by fanout [Client C] : {}",message); } /** * Publish模型(发布订阅/fanout模型) * receiveC,receiveD 可以分流队列,在多实例当中可以被其中一个消费 * @param message */ @RabbitListener(bindings = { @QueueBinding( value = @Queue,//声明临时队列 exchange = @Exchange(value = "fanout-rabbit-mq-exchange",type = "fanout") ) }) public void receiveD(String message) { log.info("receive message by fanout [Client D] : {}",message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue,//声明临时队列 exchange = @Exchange(value = "routing-rabbit-mq-exchange", type = "direct"), key ={"error"} ) }) public void receiveE(String message) { log.info("receive message by fanout [Client E] : {}",message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue,//声明临时队列 exchange = @Exchange(value = "routing-rabbit-mq-exchange",type = "direct"), key ={"error","info"} ) }) public void receiveF(String message) { log.info("receive message by fanout [Client F] : {}",message); } } ``` 测试使用API ```java package cn.flowboot.e.commerce.controller; import cn.flowboot.e.commerce.rabbitmq.RabbitMQProducer; import cn.flowboot.e.commerce.vo.RabbitMQMessage; import com.alibaba.fastjson.JSON; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/10 */ @Slf4j @RequiredArgsConstructor @RestController @RequestMapping("rabbit-mq") public class RabbitMQController { private final RabbitMQProducer rabbitMQProducer; private static final RabbitMQMessage message = new RabbitMQMessage(1, "RabbitMQ"); /** * 发送 RabbitMQ 消息 - workqueue模式(拿到消息即销毁) * */ @GetMapping("/send-message") public void sendMessage(){ rabbitMQProducer.sendMessage(JSON.toJSONString(message)); } /** * 发送 RabbitMQ 消息 - Publish模型(发布订阅/fanout模型) * */ @GetMapping("/send-message/publish") public void sendMessageByPublish(){ rabbitMQProducer.sendMessageByExchange(JSON.toJSONString(message)); } /** * 发送 RabbitMQ 消息 - Routing(静态路由模型) * */ @GetMapping("/send-message/routing") public void sendMessageByRouting(String routing){ rabbitMQProducer.sendMessageByRouting(JSON.toJSONString(message),routing); } } ``` ## 10.4 Stream ## 10.4.1 简介 项目直接继承消息队列会与项目形成耦合,如果需要修改消息队列又需要修改代码,Spring Cloud Stream 是消息中间件组件,它集成了 kafka 和 rabbitmq 。Spring Cloud Stream是一个用于构建消息驱动的微服务应用程序的框架,是一个基于Spring Boot 创建的独立生产级的,使用Spring Integration提供连接到消息代理的Spring应用。  应用模型 - Topic可以认为就是Kafka中的Topic概念 - Producer 通过Input信道发布消息到Topic 上 - Consumer通过Output信道消费Topic 上的消息  ## 10.4.2 Stream Kafka maven ```xml org.springframework.cloud spring-cloud-starter-stream-kafka org.springframework.cloud spring-cloud-stream ``` 配置 ```yaml server: port: 9491 spring: cloud: stream: kafka: binder: brokers: 127.0.0.1:9092 auto-create-topics: true #如果设置为false,就不会自动创建Topic,你在使用之前需要手动创建好 # SpringCloud Stream + RocketMQ # rocketmq: # binder: # name-server: 127.0.0.1:9876 # 开启 stream 分区支持 instanceCount: 1 # 消费者的总数 instanceIndex: 0 # 当前消费者的索引 bindings: # 默认发送方 output: # 这里用 Stream 给我们提供的默认 output 信道 destination: stream-kafka-client-default # 消息发往的目的地, Kafka 中就是 Topic content-type: text/plain # 消息发送的格式, 接收端不用指定格式, 但是发送端要 # 消息分区 producer: # partitionKeyExpression: payload.author # 分区关键字, payload 指的是发送的对象, author 是对象中的属性 partitionCount: 1 # 分区大小 # 使用自定义的分区策略, 注释掉 partitionKeyExpression partitionKeyExtractorName: streamPartitionKeyExtractorStrategy partitionSelectorName: streamPartitionSelectorStrategy # 默认接收方 input: # 这里用 Stream 给我们提供的默认 input 信道 destination: stream-kafka-client-default group: stream-kafka-client-default # 消费者开启分区支持 consumer: partitioned: true # custom 发送方 customOutput: destination: stream-kafka-client content-type: text/plain # custom 接收方 customInput: destination: stream-kafka-client group: stream-kafka-client # spring-kafka 的配置 kafka: bootstrap-servers: 127.0.0.1:9092 producer: retries: 3 consumer: auto-offset-reset: lates ``` 消息VO ```java package cn.flowboot.e.commerce.vo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @AllArgsConstructor @NoArgsConstructor @Data public class StreamMessage { private Integer id; private String projectName; private String org; private String author; private String version; public static StreamMessage defaultMessage(){ return new StreamMessage( 1, "stream-kafka-client", "flowboot.cn", "Vincent Vic", "1.0" ); } } ``` 使用默认通信信道发送 ```java package cn.flowboot.e.commerce.stream; import cn.flowboot.e.commerce.vo.StreamMessage; import com.alibaba.fastjson.JSON; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 使用默认通信信道发送 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @Slf4j @RequiredArgsConstructor @EnableBinding(Source.class) @Component public class DefaultSendService { private final Source source; /** * sendMessage - 使用默认的输出信道发送消息 * version: 1.0 - 2022/3/11 * @param message */ public void sendMessage(StreamMessage message){ String _message = JSON.toJSONString(message); log.info("in DefaultSendService send message : [{}]",_message); source.output().send(MessageBuilder.withPayload(_message).build()); } } ``` 使用默认通信信道接收 ```java package cn.flowboot.e.commerce.stream; import cn.flowboot.e.commerce.vo.StreamMessage; import com.alibaba.fastjson.JSON; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Component; /** * 使用默认通信信道接收 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @Slf4j @RequiredArgsConstructor @EnableBinding(Sink.class) @Component public class DefaultReceiveService { /** * receiveMessage - 使用默认的输入信道发送消息 * version: 1.0 - 2022/3/11 * @param payload 消息 */ @StreamListener(Sink.INPUT) public void receiveMessage(Object payload){ log.info("in DefaultReceiveService receive message start"); StreamMessage message = JSON.parseObject(payload.toString(), StreamMessage.class); log.info("in DefaultReceiveService receive message success: [{}]",JSON.toJSONString(message)); } } ``` **自定义策略** 自定义从 Message 中提取 partition key 的策略 ```java package cn.flowboot.e.commerce.partition; import cn.flowboot.e.commerce.vo.StreamMessage; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; /** * 自定义从 Message 中提取 partition key 的策略 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @Slf4j @Component public class StreamPartitionKeyExtractorStrategy implements PartitionKeyExtractorStrategy { @Override public Object extractKey(Message> message) { StreamMessage qinyiMessage = JSON.parseObject( message.getPayload().toString(), StreamMessage.class ); // 自定义提取 key String key = qinyiMessage.getProjectName(); log.info("SpringCloud Stream Partition Key: [{}]", key); return key; } } ``` 决定 message 发送到哪个分区的策略 ```java package cn.flowboot.e.commerce.partition; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.binder.PartitionSelectorStrategy; import org.springframework.stereotype.Component; /** * 决定 message 发送到哪个分区的策略 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @Slf4j @Component public class StreamPartitionSelectorStrategy implements PartitionSelectorStrategy { /** * 选择分区的策略 * */ @Override public int selectPartition(Object key, int partitionCount) { int partition = key.toString().hashCode() % partitionCount; log.info("SpringCloud Stream Selector info: [{}], [{}], [{}]", key.toString(), partitionCount, partition); return partition; } } ``` **自定义信道** 自定义输出信道 ```java package cn.flowboot.e.commerce.stream.custom; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * 自定义输出信道 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ public interface CustomSource { String OUTPUT = "customOutput"; /** 输出信道的名称是 customOutput, 需要使用 Stream 绑定器在 yml 文件中声明 */ @Output(CustomSource.OUTPUT) MessageChannel customOutput(); } ``` 自定义输入信道 ```java package cn.flowboot.e.commerce.stream.custom; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; /** * 自定义输入信道 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ public interface CustomSink { String INPUT = "customInput"; /** 输入信道的名称是 customInput, 需要使用 Stream 绑定器在 yml 文件中配置*/ @Input(CustomSink.INPUT) SubscribableChannel customInput(); } ``` 使用自定义的通信输出信道实现消息的发送 ```java package cn.flowboot.e.commerce.stream.custom; import cn.flowboot.e.commerce.vo.StreamMessage; import com.alibaba.fastjson.JSON; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 使用自定义的通信信道 CustomSource 实现消息的发送 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @Slf4j @RequiredArgsConstructor @EnableBinding(CustomSource.class) @Component public class CustomSendService { private final CustomSource customSource; /** * 使用自定义的输出信道发送消息 * */ public void sendMessage(StreamMessage message) { String _message = JSON.toJSONString(message); log.info("in CustomSendService send message: [{}]", _message); customSource.customOutput().send(MessageBuilder.withPayload(_message).build()); } } ``` 使用自定义的输入信道实现消息的接收 ```java package cn.flowboot.e.commerce.stream.custom; import cn.flowboot.e.commerce.vo.StreamMessage; import com.alibaba.fastjson.JSON; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; /** * 使用自定义的输入信道实现消息的接收 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/11 */ @Slf4j @RequiredArgsConstructor @EnableBinding(CustomSink.class) @Component public class CustomReceiveService { /** * receiveMessage - 使用自定义的输入信道接收消息 * version: 1.0 - 2022/3/11 * @param payload 消息 */ @StreamListener(CustomSink.INPUT) public void receiveMessage(@Payload Object payload) { log.info("in CustomReceiveService consume message start"); StreamMessage message = JSON.parseObject(payload.toString(), StreamMessage.class); log.info("in CustomReceiveService consume message success: [{}]", JSON.toJSONString(message)); } } ``` 测试接口 ```java package cn.flowboot.e.commerce.controller; import cn.flowboot.e.commerce.stream.DefaultSendService; import cn.flowboot.e.commerce.stream.custom.CustomSendService; import cn.flowboot.e.commerce.vo.StreamMessage; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/10 */ @Slf4j @RequiredArgsConstructor @RestController @RequestMapping("stream") public class StreamController { private final DefaultSendService defaultSendService; private final CustomSendService customSendService; /** * 发送 Steam 消息 * */ @GetMapping("/send-message") public void sendMessage(){ defaultSendService.sendMessage(StreamMessage.defaultMessage()); } /** * 使用自定义 发送 Steam 消息 * */ @GetMapping("/send-custom-message") public void sendCustomMessage(){ customSendService.sendMessage(StreamMessage.defaultMessage()); } } ``` ## 10.4.3 Stream RocketMQ maven ```xml org.springframework.cloud spring-cloud-stream com.alibaba.cloud spring-cloud-starter-stream-rocketmq ``` ```yaml server: port: 9591 spring: cloud: stream: #kafka: # binder: # brokers: 127.0.0.1:9092 # auto-create-topics: true #如果设置为false,就不会自动创建Topic,你在使用之前需要手动创建好 # SpringCloud Stream + RocketMQ rocketmq: binder: name-server: 127.0.0.1:9876 # 生成者group名称 group: rocketmq # 开启 stream 分区支持 instanceCount: 1 # 消费者的总数 instanceIndex: 0 # 当前消费者的索引 bindings: # 默认发送方 output: # 这里用 Stream 给我们提供的默认 output 信道 destination: stream-rocketmq-client-default # 消息发往的目的地, Kafka 中就是 Topic content-type: text/plain # 消息发送的格式, 接收端不用指定格式, 但是发送端要 # 消息分区 producer: # partitionKeyExpression: payload.author # 分区关键字, payload 指的是发送的对象, author 是对象中的属性 partitionCount: 1 # 分区大小 # 使用自定义的分区策略, 注释掉 partitionKeyExpression partitionKeyExtractorName: streamPartitionKeyExtractorStrategy partitionSelectorName: streamPartitionSelectorStrategy # 默认接收方 input: # 这里用 Stream 给我们提供的默认 input 信道 destination: stream-rocketmq-client-default group: stream-rocketmq-client-default # 消费者开启分区支持 consumer: partitioned: true # custom 发送方 customOutput: destination: stream-rocketmq-client content-type: text/plain # custom 接收方 customInput: destination: stream-rocketmq-client group: stream-rocketmq-client ``` # 十一、Sentinel 流量控制 Sentinel是面向分布式服务架构的流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统自适应保护等多个维度来帮助您保障微服务的稳定性 SpringCloud Alibaba Sentinel流量控制方向 - 资源的调用关系,例如资源的调用链路,资源和资源之间的关系 - 运行指标,例如QPS、线程池、系统负载等 - 控制的效果,例如直接限流、冷启动、排队等 - ## 11.1 搭建SpringCloud Alibaba Sentinel控制台 > Sentinel提供一个轻量级的开源控制台,它提供机器发现以及健康情况管理、监控(单机和集群),规则管理和推送的功能 获取并启动Sentinel Dashboard(控制台) > 1.下载控制台Jar包: https://github.com/alibaba/Sentinel/releases > 2. java -Dserver.port=7777 -Dcsp.sentinel.dashboard.server=localhost:7777 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard-1.8.1.jar 3.从 Sentinel 1.6.0起,Sentinel Dashboard引入了基本的登录功能,默认的用 户名密码都是sentinel ## 11.2 项目集成 创建子模块,配置maven ```xml e-commerce-sentinel-client jar 8 8 e-commerce-sentinel-client Sentinel Client com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery com.alibaba.cloud spring-cloud-starter-alibaba-sentinel org.springframework.cloud spring-cloud-starter-openfeign com.alibaba.csp sentinel-datasource-nacos cn.flowboot.e.commerce e-commerce-common org.springframework.boot spring-boot-starter-web ${artifactId} org.springframework.boot spring-boot-maven-plugin repackage ``` ## 11.3 降级 SpringCloud Alibaba Sentinel对降级功能的支持 ### 11.3.1 接口降级 #### 11.3.1.2 硬编码限流 FlowRuleCodeController ```java package cn.flowboot.e.commerce.controller; import cn.flowboot.e.commerce.block.handler.MyBlockHandler; import cn.flowboot.e.commerce.vo.CommonResponse; import com.alibaba.csp.sentinel.annotation.SentinelResource; import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.slots.block.RuleConstant; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; /** * 流控规则硬编码 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/17 */ @Slf4j @RestController @RequestMapping("/code") public class FlowRuleCodeController { /** * 初始化流控规则 */ @PostConstruct public void init(){ //流控规则集合 List flowRules = new ArrayList<>(); //创建流控规则 FlowRule flowRule = new FlowRule(); //设置流控规则QPS,限流阈值类型CQPS,并发线程数 flowRule.setGrade(RuleConstant.FLOW_GRADE_QPS); //流量控制手段 flowRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT); //设置受保护的资源 flowRule.setResource("flowRuleCode"); //设置受保护的资源的阈值 flowRule.setCount(1); flowRules.add(flowRule); //加载配置好的规则 FlowRuleManager.loadRules(flowRules); } /** * 采用硬编码的限流规则的Controller方法 * @return */ @GetMapping("/flow-rule") //@SentinelResource(value = "flowRuleCode") //@SentinelResource(value = "flowRuleCode",blockHandler = "handleException") @SentinelResource(value = "flowRuleCode",blockHandler = "myHandleException",blockHandlerClass = MyBlockHandler.class) public CommonResponse flowRuleCode(){ log.info("request flowRuleCode"); return CommonResponse.success(); } /** * 当限流异常抛出时,指定调用的方法 * 是一个兜底策略 */ public CommonResponse handleException(BlockException e){ log.error("has block exception : [{}]", JSONObject.toJSONString(e.getRule())); return CommonResponse.fail("flow rule exception",e.getClass().getCanonicalName()); } } ``` 上文flowRuleCode方法注解使用到通用限流处理MyBlockHandler ,可切换注释使用不同使用策略 ```java package cn.flowboot.e.commerce.block.handler; import cn.flowboot.e.commerce.vo.CommonResponse; import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; /** * 自定义通用处理逻辑 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/17 */ @Slf4j public class MyBlockHandler { /** * 通用限流处理方法 * 这个方法必须是static的 **/ public static CommonResponse myHandleException(BlockException e){ log.error("has block exception : [{}], [{}]", JSONObject.toJSONString(e.getRule()),e.getRuleLimitApp()); return CommonResponse.fail("trigger flow rule exception",e.getClass().getCanonicalName()); } } ``` #### 11.3.1.2 控制面板限流 RateLimitController ```java package cn.flowboot.e.commerce.controller; import cn.flowboot.e.commerce.block.handler.MyBlockHandler; import cn.flowboot.e.commerce.vo.CommonResponse; import com.alibaba.csp.sentinel.annotation.SentinelResource; import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.slots.block.RuleConstant; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; /** * 基于Sentinel 控制台配置流控规则 * Sentinel 是懒加载的,先去访问一下,就可以在 Sentinel Dashboard看到了 * @version 1.0 * @author: Vincent Vic * @since: 2022/03/17 */ @Slf4j @RestController @RequestMapping("/rate") public class RateLimitController { /** * 在 dashboard 中“流控规则”中按照资源名称新增流控规则 * @return */ @GetMapping("/by-resource") @SentinelResource(value = "byResource",blockHandler = "myHandleException",blockHandlerClass = MyBlockHandler.class) public CommonResponse byResource(){ log.info("coming in rate limit controller by resource"); return CommonResponse.success(); } /** * 在 "触点链路" 中给URL添加流控规则 * @return */ @GetMapping("/by-url") @SentinelResource(value = "byUrl") public CommonResponse byUrl(){ log.info("coming in rate limit controller by Url"); return CommonResponse.success("byUrl"); } } ``` 添加流控规则,两个相同名称,一个是硬编码指定,一个是动态添加(保存JVM内存中)  ### 11.3.2 RestTemplate 降级 Sentinel支持对RestTemplate 服务调用进行保护,实现流控降级和异常降级  ```yaml # 开启或关闭 @SentinelRestTemplate resttemplate: sentinel: enabled: true ``` SentinelFallbackController :通过@SentinelResource配置异常处理 ```java package cn.flowboot.e.commerce.controller; import cn.flowboot.e.commerce.conf.RestTemplateExceptionHandler; import cn.flowboot.e.commerce.fallback.MyFallbackHandler; import cn.flowboot.e.commerce.vo.CommonResponse; import com.alibaba.csp.sentinel.annotation.SentinelResource; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.client.RestTemplate; import java.util.Map; /** * 使用Sentinel保护RestTemplate服务调用 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/18 */ @Slf4j @RequiredArgsConstructor @RestController @RequestMapping("/fallback") public class SentinelFallbackController { //注入普通 private final RestTemplate restTemplate; /** * remoteConsumer - * 容错降级:对于服务不可用时不能生效 * version: 1.0 - 2022/3/18 * @return {@link Map< String, Object> } */ @GetMapping("/remote/consumer") @SentinelResource( value = "remoteConsumerFallback", fallback = "remoteConsumerFallback", fallbackClass = { MyFallbackHandler.class } ) public CommonResponse remoteConsumer(){ String requestUrL = "http://localhost:8500/sentinel-client/rest/remote/producer"; log.info("RestTemplate request url [{}] ",requestUrL); return restTemplate.getForObject(requestUrL, CommonResponse.class); } /** * 让 Sentinel 忽略一些异常 * */ @GetMapping("/ignore-exception") @SentinelResource( value = "ignoreException", fallback = "ignoreExceptionFallback", fallbackClass = { MyFallbackHandler.class }, exceptionsToIgnore = { NullPointerException.class } ) public CommonResponse ignoreException(@RequestParam(defaultValue = "1") Integer code) { if (code % 2 == 0) { throw new NullPointerException("yout input code is: " + code); } else if ( code % 3 == 0){ throw new RuntimeException("yout input code is: " + code); } return CommonResponse.success(); } } ``` MyFallbackHandler ```java package cn.flowboot.e.commerce.fallback; import cn.flowboot.e.commerce.vo.CommonResponse; import lombok.extern.slf4j.Slf4j; /** * * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/18 */ @Slf4j public class MyFallbackHandler { /** * remoteConsumer 方法的 fallback * */ public static CommonResponse remoteConsumerFallback() { log.error("remote consumer service fallback"); return CommonResponse.fail("fallback"); } /** * ignoreException 方法的 fallback * */ public static CommonResponse ignoreExceptionFallback(Integer code) { log.error("ignore exception input code: [{}] has trigger exception", code); return CommonResponse.fail("ignoreExceptionFallback"); } } ``` 配置统一处理异常 RestTemplateExceptionHandler ```java package cn.flowboot.e.commerce.conf; import cn.flowboot.e.commerce.vo.CommonResponse; import com.alibaba.cloud.sentinel.rest.SentinelClientHttpResponse; import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpRequest; import org.springframework.http.client.ClientHttpRequest; import org.springframework.http.client.ClientHttpRequestExecution; /** * RestTemplate 在限流或异常的兜底方法 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/18 */ @Slf4j public class RestTemplateExceptionHandler { /** * handleBlock - 限流处理方法 * version: 1.0 - 2022/3/18 * @param request 请求 * @param body 数据 * @param execution 请求链路 * @param ex 限流处理方法 * @return {@link SentinelClientHttpResponse } */ public static SentinelClientHttpResponse handleBlock(HttpRequest request, byte[] body, ClientHttpRequestExecution execution,BlockException ex){ log.error("handler restTemplate block exception : [{}], [{}]", request.getURI().getPath(),request.getClass().getCanonicalName()); return new SentinelClientHttpResponse( JSON.toJSONString(CommonResponse.fail("服务限流",request.getClass().getCanonicalName())) ); } /** * handleFallback - 异常处理方法 * version: 1.0 - 2022/3/18 * @param request 请求 * @param body 数据 * @param execution 请求链路 * @param ex 限流处理方法 * @return {@link SentinelClientHttpResponse } */ public static SentinelClientHttpResponse handleFallback(HttpRequest request, byte[] body, ClientHttpRequestExecution execution,BlockException ex){ log.error("handler restTemplate block exception : [{}], [{}]", request.getURI().getPath(),request.getClass().getCanonicalName()); return new SentinelClientHttpResponse( JSON.toJSONString(CommonResponse.fail("服务异常",request.getClass().getCanonicalName())) ); } } ``` SentinelConfig ```java package cn.flowboot.e.commerce.conf; import com.alibaba.cloud.sentinel.annotation.SentinelRestTemplate; import org.springframework.cloud.client.loadbalancer.LoadBalanced; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.client.RestTemplate; /** * 开启服务间的调用保护,需要给RestTemplate做一些包装 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/18 */ @Configuration public class SentinelConfig { /** * restTemplate - 包装RestTemplate * version: 1.0 - 2022/3/18 * @param * @return {@link RestTemplate } */ @Bean @SentinelRestTemplate( fallback = "handleFallback",fallbackClass = RestTemplateExceptionHandler.class, blockHandler = "handleBlock",blockHandlerClass = RestTemplateExceptionHandler.class ) public RestTemplate restTemplate(){ return new RestTemplate(); } } ``` SentinelRestTemplateController 使用统一配置的RestTemplate ```java package cn.flowboot.e.commerce.controller; import cn.flowboot.e.commerce.block.handler.MyBlockHandler; import cn.flowboot.e.commerce.dto.SearchGoodByIdsDto; import cn.flowboot.e.commerce.vo.CommonResponse; import com.alibaba.csp.sentinel.annotation.SentinelResource; import com.alibaba.fastjson.JSON; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.client.RestTemplate; import java.util.HashMap; import java.util.List; import java.util.Map; /** * 使用Sentinel保护RestTemplate服务调用 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/18 */ @Slf4j @RequiredArgsConstructor @RestController @RequestMapping("/rest") public class SentinelRestTemplateController { private final RestTemplate restTemplate; /** * remoteConsumer - * 容错降级:对于服务不可用时不能生效 * version: 1.0 - 2022/3/18 * @return {@link Map< String, Object> } */ @GetMapping("/remote/consumer") public CommonResponse remoteConsumer(){ String requestUrL = "http://localhost:8500/sentinel-client/rest/remote/producer"; log.info("RestTemplate request url [{}] ",requestUrL); return restTemplate.getForObject(requestUrL, CommonResponse.class); } /** * 在 dashboard 中“流控规则”中按照资源名称新增流控规则 * @return */ @GetMapping("/remote/producer") public CommonResponse producer(){ log.info("coming in rate limit controller by resource"); return CommonResponse.success("producer"); } } ``` ### 11.3.3 Feign 降级 @SentinelResource 中fallback、fallbackClass指定异常降级的类和方法Sentinel还对 Feign 实现了适配,支持Feign的容错降级 ```yaml feign: sentinel: enabled: true ``` SentinelFeignController ```java package cn.flowboot.e.commerce.controller; import cn.flowboot.e.commerce.fallback.MyFallbackHandler; import cn.flowboot.e.commerce.feign.SentinelFeignClient; import cn.flowboot.e.commerce.vo.CommonResponse; import com.alibaba.csp.sentinel.annotation.SentinelResource; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.client.RestTemplate; import java.util.Map; /** * 使用Sentinel保护RestTemplate服务调用 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/18 */ @Slf4j @RequiredArgsConstructor @RestController @RequestMapping("/feign") public class SentinelFeignController { //注入普通 private final SentinelFeignClient sentinelFeignClient; /** * remoteConsumer - * 容错降级:对于服务不可用时不能生效 * version: 1.0 - 2022/3/18 * @return {@link Map< String, Object> } */ @GetMapping("/remote/consumer") public CommonResponse remoteConsumer(){ log.info("Sentinel feign client request "); return sentinelFeignClient.producer(); } } ``` SentinelFeignClient 其中微服务定义不存在会抛出异常 ```java package cn.flowboot.e.commerce.feign; import cn.flowboot.e.commerce.feign.fallback.SentinelFeignClientFallback; import cn.flowboot.e.commerce.vo.CommonResponse; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; /** * 通过 Sentinel 对 OpenFeign 实现熔断降级 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/18 */ @FeignClient( value = "e-commerce-sentinel-client-1", fallback = SentinelFeignClientFallback.class ) public interface SentinelFeignClient { /** * 在 dashboard 中“流控规则”中按照资源名称新增流控规则 * @return */ @GetMapping("/sentinel-client/rest/remote/producer") CommonResponse producer(); } ``` SentinelFeignClientFallback Sentinel 对 OpenFeign 接口的降级策略 ```java package cn.flowboot.e.commerce.feign.fallback; import cn.flowboot.e.commerce.feign.SentinelFeignClient; import cn.flowboot.e.commerce.vo.CommonResponse; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** * Sentinel 对 OpenFeign 接口的降级策略 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/18 */ @Slf4j @Component public class SentinelFeignClientFallback implements SentinelFeignClient { /** * 在 dashboard 中“流控规则”中按照资源名称新增流控规则 * * @return */ @Override public CommonResponse producer() { return CommonResponse.fail("服务错误"); } } ``` ## 11.4 Sentinel 存储 ### 11.4.1 SpringCloud Alibaba Sentinel 结合 Nacos - Sentinel Dashboard将规则保存在内存中,重启之后就会丢失,所以,考虑使用外部持久化方案 - 在Nacos中创建规则,Nacos会推送到客户端 - Sentinel Dashboard也会从Nacos 去获取配置信息 - Sentinel存储在 Nacos 中的限流数据结构  需要添加依赖(上述新建项目依赖包含) ```xml com.alibaba.csp sentinel-datasource-nacos ``` 配置 ```yaml spring: application: name: e-commerce-sentinel-client cloud: nacos: #服务发现 discovery: enabled: true server-addr: 127.0.0.1:8848 namespace: e-commerce-nacos-server metadata: management: context-path: ${server.servlet.context-path}/actuator sentinel: # 配置 sentinel dashboard 地址 transport: dashboard: 127.0.0.1:7777 port: 8719 # 会在应用对应的机器上启动一个 Http Server, 该 Server 会与 Sentinel 控制台做交互 datasource: # 名称任意, 代表数据源 ds: nacos: # NacosDataSourceProperties.java 中定义 server-addr: ${spring.cloud.nacos.discovery.server-addr} dataId: ${spring.application.name}-sentinel namespace: ${spring.cloud.nacos.discovery.namespace} groupId: DEFAULT_GROUP data-type: json # 规则类型: com.alibaba.cloud.sentinel.datasource.RuleType # FlowRule 就是限流规则 rule-type: flow # 服务启动直接建立心跳连接 eager: true # 开启或关闭 @SentinelRestTemplate resttemplate: sentinel: enabled: true feign: sentinel: enabled: true ``` nacos 中添加配置 e-commerce-sentinel-client-sentinel  ```json [ { "resource": "byResource", "limitApp": "default", "grade": 1, "count": 5, "strategy": 0, "controlBehavior": 0, "clusterMode": false } ] ``` ## 11.5 Gateway 结合 Sentinel ### 11.5.1 添加依赖 ```xml com.alibaba.cloud spring-cloud-starter-alibaba-sentinel com.alibaba.cloud spring-cloud-alibaba-sentinel-gateway com.alibaba.csp sentinel-datasource-nacos ``` ### 11.5.2 硬编码配置 ```java package cn.flowboot.e.commerce.config; import com.alibaba.csp.sentinel.adapter.gateway.common.SentinelGatewayConstants; import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiDefinition; import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiPathPredicateItem; import com.alibaba.csp.sentinel.adapter.gateway.common.api.ApiPredicateItem; import com.alibaba.csp.sentinel.adapter.gateway.common.api.GatewayApiDefinitionManager; import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayFlowRule; import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayRuleManager; import com.alibaba.csp.sentinel.adapter.gateway.sc.SentinelGatewayFilter; import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.BlockRequestHandler; import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.GatewayCallbackManager; import com.alibaba.csp.sentinel.adapter.gateway.sc.exception.SentinelGatewayBlockExceptionHandler; import com.alibaba.csp.sentinel.slots.block.RuleConstant; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.ObjectProvider; import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.codec.ServerCodecConfigurer; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.server.ServerResponse; import org.springframework.web.reactive.result.view.ViewResolver; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; import javax.annotation.PostConstruct; import java.util.*; /** * Gateway 集成 Sentinel 实现限流 * * @version 1.0 * @author: Vincent Vic * @since: 2022/03/18 */ @Slf4j @RequiredArgsConstructor @Configuration public class SentinelGatewayConfiguration { /** 视图解析器 */ private final List viewResolvers; /** HTTP 请求和响应数据的编解码配置 */ private final ServerCodecConfigurer serverCodecConfigurer; /** * 限流异常处理器, 限流异常出现时, 执行到这个 handler * */ @Bean @Order(Ordered.HIGHEST_PRECEDENCE) public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler() { // 默认会返回错误 message, code 429 return new SentinelGatewayBlockExceptionHandler( this.viewResolvers, this.serverCodecConfigurer ); } /** * 限流过滤器, 是 Gateway 全局过滤器, 优先级定义为最高 * */ @Bean @Order(Ordered.HIGHEST_PRECEDENCE) public GlobalFilter sentinelGatewayFilter() { return new SentinelGatewayFilter(); } /** * 初始化限流规则 * */ @PostConstruct public void doInit() { log.info("---------------------------------------------------"); // 加载网关限流规则 log.info("load sentinel gateway rules (code define)"); initGatewayRules(); // 加载自定义限流异常处理器 initBlockHandler(); log.info("---------------------------------------------------"); } /** * 硬编码网关限流规则 * */ private void initGatewayRules() { Set rules = new HashSet<>(); GatewayFlowRule rule = new GatewayFlowRule(); // 指定限流模式, 根据 route_id 做限流, 默认的模式 rule.setResourceMode(SentinelGatewayConstants.RESOURCE_MODE_ROUTE_ID); // 指定 route_id -> service id rule.setResource("e-commerce-nacos-client"); // 按照 QPS 限流 rule.setGrade(RuleConstant.FLOW_GRADE_QPS); // 统计窗口和限流阈值 rule.setIntervalSec(3); rule.setCount(1); rules.add(rule); // 加载到网关中 GatewayRuleManager.loadRules(rules); } /** * 自定义限流异常处理器 * */ private void initBlockHandler() { // 自定义 BlockRequestHandler BlockRequestHandler blockRequestHandler = new BlockRequestHandler() { @Override public Mono handleRequest(ServerWebExchange serverWebExchange, Throwable throwable) { log.error("------------- trigger gateway sentinel rule -------------"); Map result = new HashMap<>(); result.put("code", String.valueOf(HttpStatus.TOO_MANY_REQUESTS.value())); result.put("message", HttpStatus.TOO_MANY_REQUESTS.getReasonPhrase()); result.put("route", "e-commerce-nacos-client"); return ServerResponse .status(HttpStatus.TOO_MANY_REQUESTS) .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromValue(result)); } }; // 设置自定义限流异常处理器 GatewayCallbackManager.setBlockHandler(blockRequestHandler); } } ``` SentinelGatewayConfiguration 修改为api分组 ```java public class SentinelGatewayConfiguration { // 相同代码略... /** * 硬编码网关限流规则 * */ private void initGatewayRules() { Set rules = new HashSet<>(); // GatewayFlowRule rule = new GatewayFlowRule(); // ... 注释之前单个规则 // rules.add(rule); // 限流分组, Sentinel 先去找规则定义, 再去找规则中定义的分组 rules.add( new GatewayFlowRule("nacos-client-api-1") .setCount(3).setIntervalSec(60) ); rules.add( new GatewayFlowRule("nacos-client-api-2") .setCount(1).setIntervalSec(60) ); // 加载到网关中 GatewayRuleManager.loadRules(rules); // 加载限流分组 initCustomizedApis(); } /** * 硬编码网关限流分组 * 1. 最大限制 - 演示 * 2. 具体的分组 * */ private void initCustomizedApis() { Set definitions = new HashSet<>(); // nacos-client-api 组, 最大的限制 ApiDefinition api = new ApiDefinition("nacos-client-api") .setPredicateItems(new HashSet() {{ // 模糊匹配 /ecommerce-nacos-client/ 及其子路径的所有请求 add(new ApiPathPredicateItem() .setPattern("/ecommerce-nacos-client/**") // 根据前缀匹配 .setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX)); }}); // nacos-client-api-1 分组 ApiDefinition api1 = new ApiDefinition("nacos-client-api-1") .setPredicateItems(new HashSet() {{ add(new ApiPathPredicateItem() // 精确匹配 /n/service/instance/e-commerce-nacos-client .setPattern("/s/nacos-client/search/service/instance")); }}); // nacos-client-api-2 分组 ApiDefinition api2 = new ApiDefinition("nacos-client-api-2") .setPredicateItems(new HashSet() {{ add(new ApiPathPredicateItem() // 精确匹配 /imooc/ecommerce-nacos-client/nacos-client/project-config .setPattern("/ecommerce-nacos-client" + "/nacos-client/project-config")); }}); definitions.add(api1); definitions.add(api2); // 加载限流分组 GatewayApiDefinitionManager.loadApiDefinitions(definitions); } } ``` ### 11.5.3 JSON文件配置方式 > 注释掉SentinelGatewayConfiguration中的@PostConstruct注解 在项目资源文件新建如下两个文件: gateway-flow-rule-api-sentinel.json ```json [ { "apiName": "nacos-client-api", "predicateItems": [ { "pattern": "/s/nacos-client/search/service/instance" }, { "pattern": "/n/service/instance/**", "matchStrategy": 1 } ] } ] ``` gateway-flow-rule-sentinel.json ```json [ { "resource": "e-commerce-nacos-client", "resourceMode": 0, "count": 3, "intervalSec": 60 }, { "resource": "nacos-client-api", "resourceMode": 1, "count": 1, "intervalSec": 60 } ] ``` > 注意:maven clean 当前网关子项目,防止文件在target不存在 通过本地文件方式 配置 ```yaml spring: cloud: sentinel: eager: true transport: port: 8720 dashboard: 127.0.0.1:7777 datasource: # 通过本地文件方式, 基于服务级别的配置 dsl.file: file: classpath:gateway-flow-rule-sentinel.json # 代表服务级别的限流, 一步步点进去看, 文件类型 ruleType: gw-flow # 通过本地文件方式, 细粒度对指定 api 进行配置 ds2.file: file: classpath:gateway-flow-rule-api-sentinel.json # 代表 API 分组, 一步步点进去看, 文件类型 ruleType: gw-api-group ``` ### 11.5.3 Nacos存储配置方式 ```yaml spring: cloud: sentinel: eager: true transport: port: 8720 dashboard: 127.0.0.1:7777 datasource: # 集成 Nacos ds1: nacos: server-addr: ${spring.cloud.nacos.discovery.server-addr} namespace: ${spring.cloud.nacos.discovery.namespace} # 测试时, 看看 Nacos 中修改是否能让 dashboard 生效, 就把第二个 count 也修改为 3 data-id: gateway-flow-rule-sentinel group-id: DEFAULT_GROUP data-type: json rule-type: gw-flow ds2: nacos: server-addr: ${spring.cloud.nacos.discovery.server-addr} namespace: ${spring.cloud.nacos.discovery.namespace} data-id: gateway-flow-rule-sentinel group-id: DEFAULT_GROUP data-type: json rule-type: gw-api-group ``` Nacos 配置添加 gateway-flow-rule-sentinel ```java [ { "resource": "e-commerce-nacos-client", "resourceMode": 0, "count": 3, "intervalSec": 60 }, { "resource": "nacos-client-api", "resourceMode": 1, "count": 1, "intervalSec": 60 } ] ``` gateway-flow-rule-sentinel ```java [ { "apiName": "nacos-client-api", "predicateItems": [ { "pattern": "/s/nacos-client/search/service/instance" }, { "pattern": "/n/service/instance/**", "matchStrategy": 1 } ] } ] ``` > 均为json,上述三种需要分开测试,还有其他存储方式 Spring Cloud Alibaba 占时只有这么多了 完整项目:[Gitee](https://gitee.com/Vincent-Vic/e-commerce-cloud),学习笔记,仅为组件学习,并没有完整项目