# CQRS-Demo **Repository Path**: com_developer/CQRS-Demo ## Basic Information - **Project Name**: CQRS-Demo - **Description**: CQRS的模拟demo - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 2 - **Created**: 2021-11-13 - **Last Updated**: 2021-11-13 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README *Martin Fowler* 我们不应该使用既能修改数据也能返回数据的方法,这样我们就有了两种类型的方法: 1. 查询:返回数据但不修改数据,因此没有副作用 2. 命令:修改数据但不返回数据 CQRS: Command Query Responsibility Segregation # 1. CRUD 围绕关系数据库构建而成的“创建、读取、更新、删除”系统(即CRUD系统),此类系统在一些业务逻辑简单的项目中可能没有什么问题,但是随着系统逻辑变得复杂,用户增多,这种设计就会出现一些性能问题。 ![在这里插入图片描述](https://img-blog.csdnimg.cn/20200806100325584.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MTkxNTMxNA==,size_16,color_FFFFFF,t_70) # 2. CQRS 简单的说,**CQRS** 就是一个系统,从架构上把 CRUD 系统拆分为两部分:命令(Command)处理和查询(Query)处理。其中命令处理包括**增、删、改**。 ![在这里插入图片描述](https://img-blog.csdnimg.cn/20200806100506522.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MTkxNTMxNA==,size_16,color_FFFFFF,t_70) 然后命令与查询两边可以用不同的架构实现,以实现CQ两端(即Command Side,简称C端;Query Side,简称Q端)的分别优化。两边所涉及到的实体对象也可以不同,从而继续演变成下面这样。 ![在这里插入图片描述](https://img-blog.csdnimg.cn/202008061005451.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MTkxNTMxNA==,size_16,color_FFFFFF,t_70) CQRS 强调的是 Command & Query 访问的数据模型不同,分别根据 Command & Query 需求的不同特性设计数据模型。比如 Command 更强调模型的范式化、完整性约束等。适用与查询的模型更强调性能,可以不过多地受范式的约束、又更多的数据冗余。或者 Command 用关系数据库,查询用NoSQL数据库。当然 Command & Query 使用同一个物理数据库,Query 使用View也是可以的,这是与读写分离的区别。 ## 2.1 CQRS 实现方式 ### 2.1.1 Command & Query 共享同一个数据库 两端数据库共享,只是在上层代码上分离。这样做的好处是可以让我们的代码读写分离,更容易维护,而且不存在 Command & Query 两端的数据一致性问题,因为是共享一个数据库的。 ### 2.1.2 Command & Query 数据库分离 两端不仅代码分离,数据库也分离,然后Q端数据由C端同步过来。同步方式有两种:**同步或异步**。 - 如果需要 CQ 两端数据的**强一致性**,则需要用**同步**; - 如果能接受 CQ 两端数据的**最终一致性**,则可以使用**异步**。 C端可以采用**Event Sourcing(简称ES)**模式,所有C端的最新数据全部用 Domain Event 表达即可;而要查询显示用的数据,则从Q端的DB查询即可。 ## 2.2 CQRS 适用于什么场景? 1. 应用的 **读模型** 和 **写模型** 差别比较大 2. 单一的存储模型无法同时满足**高性能**的读和写需求 ## 2.3 CQRS 带来的问题 ### 事务 保持 CQ 两端数据一致性有两种方式:**同步 & 异步**。 **同步** 在Command端,除了维护自身的 **写数据库** 以外,还需要维护 **读数据库**,并且可能需要维护读数据库中的多张表。(感觉还不如一开始的CRUD方式) **异步** 1. 数据完整性的实时性 2. 如何保证两个数据源之间的数据保持一致? ## 相关框架 - Axon # x. CQRS 示例模拟 模拟CQRS的运行机制,加深理解 ## 3.1 示例架构图 ![在这里插入图片描述](https://img-blog.csdnimg.cn/20200806170058838.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MTkxNTMxNA==,size_16,color_FFFFFF,t_70) 或者,看这个 ![在这里插入图片描述](https://img-blog.csdnimg.cn/20200806153902399.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MTkxNTMxNA==,size_16,color_FFFFFF,t_70) ## 3.2 代码 ### domain 作用:“数据库”的存储 ```java @Data @RequiredArgsConstructor public class User { @NonNull private String userId; @NonNull private String firstName; @NonNull private String lastName; private Set contacts = new HashSet<>(); private Set
addresses = new HashSet<>(); } ``` ```java @Data @AllArgsConstructor @NoArgsConstructor public class Address { private String city; private String state; private String postcode; } ``` ```java @Data @AllArgsConstructor @NoArgsConstructor public class Contact { private String type; private String detail; } ``` ```java @Data public class UserAddress { private Map> addressByRegion = new HashMap<>(); } ``` ```java @Data public class UserContact { private Map> contactByType = new HashMap<>(); } ``` ### command 作用:UI发起的增、删、改请求命令的对象 ```java @Data @AllArgsConstructor public class CreateUserCommand { private String userId; private String firstName; private String lastName; } ``` ```java @Data @AllArgsConstructor public class UpdateUserCommand { private String userId; private Set
addresses; private Set contacts; } ``` ### Query 作用:UI发起的查询请求命令的对象 ```java @Data @AllArgsConstructor public class AddressByRegionQuery { private String userId; private String state; } ``` ```java @Data @AllArgsConstructor public class ContactByTypeQuery { private String userId; private String contactType; } ``` ### Repository 作用:操作数据库 ```java public class UserReadRepository { private Map userAddress = new HashMap<>(); private Map userContact = new HashMap<>(); public void addUserAddress(String id, UserAddress user) { userAddress.put(id, user); } public UserAddress getUserAddress(String id) { return userAddress.get(id); } public void addUserContact(String id, UserContact user) { userContact.put(id, user); } public UserContact getUserContact(String id) { return userContact.get(id); } } ``` ```java public class UserWriteRepository { private Map store = new HashMap<>(); public void addUser(String id, User user) { store.put(id, user); } public User getUser(String id) { return store.get(id); } } ``` ### Aggregate 作用:处理增、删、改的请求命令 ```java public class UserAggregate { private UserWriteRepository writeRepository; public UserAggregate(UserWriteRepository repository) { this.writeRepository = repository; } public User handleCreateUserCommand(CreateUserCommand command) { User user = new User(command.getUserId(), command.getFirstName(), command.getLastName()); writeRepository.addUser(user.getUserId(), user); return user; } public User handleUpdateUserCommand(UpdateUserCommand command) { User user = writeRepository.getUser(command.getUserId()); user.setAddresses(command.getAddresses()); user.setContacts(command.getContacts()); writeRepository.addUser(user.getUserId(), user); return user; } } ``` ### Projection 作用:处理查询的请求命令 ```java public class UserProjection { private UserReadRepository readRepository; public UserProjection(UserReadRepository readRepository) { this.readRepository = readRepository; } public Set handle(ContactByTypeQuery query) { UserContact userContact = readRepository.getUserContact(query.getUserId()); return userContact.getContactByType() .get(query.getContactType()); } public Set
handle(AddressByRegionQuery query) { UserAddress userAddress = readRepository.getUserAddress(query.getUserId()); return userAddress.getAddressByRegion() .get(query.getState()); } } ``` ### Projector 作用:同步 写数据库 至 读数据库 ```java public class UserProjection { private UserReadRepository readRepository; public UserProjection(UserReadRepository readRepository) { this.readRepository = readRepository; } public Set handle(ContactByTypeQuery query) { UserContact userContact = readRepository.getUserContact(query.getUserId()); return userContact.getContactByType() .get(query.getContactType()); } public Set
handle(AddressByRegionQuery query) { UserAddress userAddress = readRepository.getUserAddress(query.getUserId()); return userAddress.getAddressByRegion() .get(query.getState()); } } ``` ### 测试 ```java @SpringBootTest class DemoApplicationTests { private UserWriteRepository writeRepository = new UserWriteRepository(); private UserReadRepository readRepository = new UserReadRepository(); private UserProjector projector = new UserProjector(readRepository); private UserAggregate userAggregate = new UserAggregate(writeRepository); private UserProjection userProjection = new UserProjection(readRepository); @Test public void givenCQRSApplication_whenCommandRun_thenQueryShouldReturnResult() throws Exception { // 1. 初始化userId String userId = UUID.randomUUID().toString(); // 2. 模拟创建用户对应的命令 CreateUserCommand createUserCommand = new CreateUserCommand(userId, "Tom", "Sawyer"); // 3. 聚合器处理创建用户命令,往“写数据库”写入数据。对应的实体类是:User.java User user = userAggregate.handleCreateUserCommand(createUserCommand); // 4. 将实体类User.java,以另一种方式存入“读数据库”中。对应的实体类是:UserContact.java & UserAddress.java projector.project(user); // 5. 模拟修改用户对应的命令:添加用户的地址 & 联系方式 UpdateUserCommand updateUserCommand = new UpdateUserCommand(user.getUserId(), Stream.of(new Address("New York", "NY", "10001"), new Address("Los Angeles", "CA", "90001")).collect(Collectors.toSet()), Stream.of(new Contact("EMAIL", "tom.sawyer@gmail.com"), new Contact("EMAIL", "tom.sawyer@rediff.com")).collect(Collectors.toSet())); // 6. 聚合器处理修改用户命令,往“写数据库”修改 & 写数据。对应的实体类是:User.java user = userAggregate.handleUpdateUserCommand(updateUserCommand); // 7. 将实体类User.java,以另一种方式存入“读数据库”中。对应的实体类是:UserContact.java & UserAddress.java projector.project(user); // 8. 模拟修改用户对应的命令:添加用户的地址 & 联系方式 updateUserCommand = new UpdateUserCommand(userId, Stream.of(new Address("New York", "NY", "10001"), new Address("Housten", "TX", "77001")).collect(Collectors.toSet()), Stream.of(new Contact("EMAIL", "tom.sawyer@gmail.com"), new Contact("PHONE", "700-000-0001")).collect(Collectors.toSet())); // 9. 聚合器处理修改用户命令,往“写数据库”修改 & 写数据。对应的实体类是:User.java user = userAggregate.handleUpdateUserCommand(updateUserCommand); // 10. 将实体类User.java,以另一种方式存入“读数据库”中。对应的实体类是:UserContact.java & UserAddress.java projector.project(user); // 11. 发起查询命令 ContactByTypeQuery contactByTypeQuery = new ContactByTypeQuery(userId, "EMAIL"); assertEquals(Stream.of(new Contact("EMAIL", "tom.sawyer@gmail.com")).collect(Collectors.toSet()), userProjection.handle(contactByTypeQuery)); AddressByRegionQuery addressByRegionQuery = new AddressByRegionQuery(userId, "NY"); assertEquals(Stream.of(new Address("New York", "NY", "10001")).collect(Collectors.toSet()), userProjection.handle(addressByRegionQuery)); } } ``` ### 代码来源 [eugenp/tutorials](https://github.com/eugenp/tutorials/tree/master/patterns/cqrs-es) # 参考资料 [程序员除了会 CRUD 之外,还应该知道什么叫 CQRS!](https://baijiahao.baidu.com/s?id=1640387045132096546&wfr=spider&for=pc) [面向企业级 Web 开发的 CQRS:它能为业务带来什么价值?](https://www.infoq.cn/article/cqrs-business-kaminski/?utm_source=tuicool&utm_medium=referral) [CQRS 架构](https://blog.csdn.net/hustspy1990/article/details/97526890) [Java的CQRS和事件溯源ES入门:如何从CRUD切换到CQRS/ES - Baeldung](https://www.jdon.com/54180) [DDD 中的那些模式 — CQRS](https://zhuanlan.zhihu.com/p/115685384)