# t-rpc-framework
**Repository Path**: tmq777/t-rpc-framework
## Basic Information
- **Project Name**: t-rpc-framework
- **Description**: 基于netty的rpc框架
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 10
- **Forks**: 4
- **Created**: 2022-01-06
- **Last Updated**: 2024-12-10
## Categories & Tags
**Categories**: rpc
**Tags**: Netty, SpringBoot
## README
# `t-rpc-framework`
## 开发环境
1. `jdk 11.0.6`
2. `springboot 2.6.2`
3. `netty-all 4.1.72.Final`
4. `zookeeper 3.7.0`
---
## 快速开始
### 框架可设置属性参考(`application.yml`)
```yaml
rpc:
service:
serverId: app # 分布式服务ID(多实例时应该指定同一个ID)
serviceType: NONE # 服务类型枚举: NONE|CLIENT|SERVER|BOTH
port: 18081 # 对外提供服务的端口
connectionTimeout: 5000 # 客户端连接的超时时间(毫秒)
requestTimeout: 2000 # 服务器响应生成到客户端接收到的时间差值阈值,每次请求的超时时间(毫秒)
retryCnt: 5 # 连接池取不到连接时,标记一次失败,失败次数达到指定值时,连接池将会被标记为不可用
idleCnt: 5 # 心跳检测次数(达到该次数将会断开通道(重新请求时会重新建立通道))
idleInterval: 60 # 心跳检测时间间隔(秒)
maxPoolConnection: 5 # 客户端单个连接池最大通道数
reconnectionTime: 30 # 连接池不可用后进行重连的最大时间(分)。不设置或者为-1时默认无限重试(至少会重试一次,所以设置30,实际上会重连31分钟), 尽量不要设置无限重试
zkCenter: 127.0.0.1:2181 # 如果需要使用zookeeper,在此指定ip并且将框架自带的zk发现策略注册为bean
```
### 引入工程并启动
1. 代码`clone`下来之后执行`maven`的`clean install`打包,然后引入自己的工程。[配置框架](#框架可设置属性参考(`application.yml`))后即可使用。
或者直接引入发行版然后[配置框架](#框架可设置属性参考(`application.yml`))
> 发行版地址: https://gitee.com/tmq777/t-rpc-framework/releases/
```xml
cn.t.rpc
t-rpc-spring-boot-starter
1.0.0
```
2. ##### 在`SpringBoot`启动类或者任意`@Configuration`标识的配置类上添加`@TRpcServiceScanner`注解,同时指定远程服务的接口包地址,如下:
```java
// 例如
@TRpcServiceScanner(basePackages = {"cn.t.rpc"})
public class XXXConfiguration {
// ...
}
```
3. ##### 在上述指定包路径下(支持多层扫描)创建需要调用的远程服务接口,同时指定`@TRpcRemoteService`注解,如下:
```JAVA
@TRpcRemoteService(serverId = "app", className = "ServerTimeService")
public interface ServerTimeService {
// 接口中的方法需要和远程服务的方法名一致
String getServerTime(String user);
}
```
> 说明: 注解中的`serverId`为必须值,该值为远程服务的服务ID,即基础值配置中的`rpc.service.serverId`的值;`className`非必须,该值未设置时使用当前接口名作为值传递。
>
> 注意:如果未设置`className`,那么远程服务(下文说明)的`className`属性也必须为当前接口名。
4. ##### 如果本机需要发布服务,那么`rpc.service.serviceType`值需要设置为`SERVER`或者`BOTH`。同时在需要发布为服务的类上标识`@TRpcLocalService`注解,如下:
```java
@TRpcLocalService(className = "ServerTimeService")
public class ServerTimeService {
private static final String FORMAT = "yyyy年MM月dd日 hh时mm分ss秒";
/**
* 获取当前服务器的时间
* @param user
* @return 服务时间
*/
public String getServerTime(String user) {
return "Hello!" + user + "现在是:" + new SimpleDateFormat(FORMAT).format(new Date());
}
}
```
> 说明:`@TRpcLocalService`标识的类需要为具体的类而不是接口。注解中的`className`属性为非必须,默认为空时,使用当前类名作为实际值,客户端调用时指定对应的值即可
5. ##### 使用zookeeper作为注册中心时,设置`zkCenter`属性即可,否则会默认使用`properties`文件作为服务信息来源,文件格式参照[客户端服务发现策略](#客户端服务发现策略)。
6. ##### 客户端在需要调用的地方注入接口类型,然后正常使用即可。
```java
@RestController
@RequestMapping("/t-rpc")
public class WebController {
// 和普通Bean一样注入
@Autowired
private ServerTimeService test;
// 和普通Bean一样使用
@RequestMapping("/test")
public String index() {
return test.getServerTime("TMQ");
}
}
```
---
## 核心点介绍
基于`netty`的`rpc`框架,基于注解生成接口的代理类,进行远程RPC调用。
1. 搭配`SpringBoot`启用,启动时会自动将标识了`@TRpcRemoteService(serverId= "serverId")`的**接口**生成代理类,在**需要的位置依赖注入**即可。
2. 客户端使用`ChannelPoolMap`关系型连接池,一个连接池对应一个服务器。单个连接池的连接数**可配置**。相同服务ID的连接池会被聚合,调用时会进行**随机轮询**。
3. 每个连接池获取通道时提供了循环重试的功能,**配置的重试次数**超过指定值后会开启连接池重连机制。
4. 连接池开始重连后会触发至少一次的重连服务器的操作,可配置重连的**最大上限时间**。默认30分钟(间隔一分钟)。重连超过最大上限时间后连接池将会进入`Down`状态。进入`重连`和`Down`的连接池将不会被使用。
5. 连接池`DOWN`掉之后会关闭所占用的资源,但是上下文中还是会存在这个对象,连接池辅助类中在初始化时会开启定时任务,30分钟后开始,每30分钟清除一次`DOWN`掉的连接池,同时清除操作也会在**刷新池子**的操作中完成。
6. 客户端的`socket`连接为**长连接**,连接可配置**心跳检测时间**,心跳超时则会关闭通道。
7. 使用策略模式指定**序列化方式**(目前默认使用`jackson`的`Json`序列化方式)。
8. 使用策略模式指定**服务发现方式**(已提供`zookeeper`的服务发现策略,需要手动注册`bean`。基于`zookeeper`的策略已经实现了服务动态刷新功能)。
9. 根据约定的规则进行数据传输,解决沾包拆包问题。
10. 默认使用`GZIP`进行数据流压缩,提升传输效率。
11. 客户端使用`ConcurrentHashMap`缓存响应结果,根据本次通信的`key`进行隔离,为防止超时以及其他情况导致缓存的内存膨胀溢出,使用了**可配置的超时机制**,代理类获取响应时会借助线程池的`Future`返回对象进行**默认2秒**的超时等待获取,同时通信处理器中会做如下处理进行安全保证:
1. 通信数据中带有毫秒值时间戳,客户端响应事件中会进行判断,若响应事件超过**配置时间**则丢弃响应。否则按**通信key存入缓存**。
2. 代理类通过`Future`等待取值,给定时间(配置的超时时间加上50毫秒的传输时间)内获取成功后,移除缓存并返回响应。如果给定时间内没有响应成功,将本次`通信key`从缓存中删除并根据特殊情况存入**延时队列**,默认5秒后进行消费再次清除。
> 通过上述处理,理论上缓存中不会存在无效数据
12. 在进行`RPC`调用时,如果调用方的参数包含自定义`POJO`类,那么被远程调用方经过解码后该类型会变成`LinkedHashMap`,简言之就是如果对应的类型找不到,会被自动转换成`LinkedHashMap`,该功能依赖于`jackson`的包。在**自定义**序列化策略时需要**着重注意**这一点。
举例:
- A需要调用B服务,此时A传递的参数中有一个自定义类但是B部署的环境中没有这个类。
```java
// A部署的环境中调用B服务
bService.index(Any param);
```
此时,B能够正确接收到参数,但是参数`param`会被转换为`LinkedHashMap`。`key-value = 属性名-值`
- B服务给A返回响应时,返回值中带了A环境中不存在的类型,那么A接收响应有两种办法,1是使用Map接收,2是使用对象中属性值和返回响应体一致的对象接收。
```java
// 例如 B 返回了一个对象包含 {name: "a", age: 16}的响应
// 那么回到A中时,A可以使用 任意 属性包含name 和 age 的自定义类型来接收,或者直接使用Map
CustomType res = bService.index(Any param);
// 自定义类型
class CustomType {
private String name;
private int age;
}
```
> 键值对一致时可以自由选择如何接收
>
> 使用自定义策略化的序列化方式时需要注意这块功能的实现
---
## 核心依赖
```XML
io.netty
netty-all
com.fasterxml.jackson.core
jackson-core
com.fasterxml.jackson.core
jackson-databind
com.fasterxml.jackson.core
jackson-annotations
org.apache.zookeeper
zookeeper
3.7.0
```
---
## 核心代码说明
### 自动配置类
`cn.t.rpc.config.RpcAutoConfiguration`
```java
@Configuration
// @TRpcServiceScanner(basePackages = {"cn.t.rpc"}) // ※注意点1
public class RpcAutoConfiguration {
/**
* 客户端配置类
* @return 客户端配置类
*/
@Bean
@ConfigurationProperties(prefix = "rpc.service")
public RpcConfigProperties rpcConfigProperties() {
RpcConfigProperties properties = new RpcConfigProperties();
properties.setSerializationStrategy(serializationStrategy());
return properties;
}
/**
* 容器后处理
* @return
*/
@Bean
public TRpcFrameworkAware tRpcFrameworkAware() {
return new TRpcFrameworkAware();
}
/**
* 序列化方式工具类
* 默认使用Jackson的Json工具类
* @return 序列化方式工具类
*/
@Bean
@ConditionalOnMissingBean(SerializationStrategy.class)
public SerializationStrategy serializationStrategy() {
return new SerializationByJson();
}
/**
* 服务发现
* @return 服务发现Bean
*/
@Bean
@ConditionalOnMissingBean(TRpcDiscoveryStrategy.class)
public TRpcDiscoveryStrategy rpcDiscovery() {
return new TRpcDiscoveryByProperties();
}
}
```
自动配置类中默认装配了整个框架运行所必须的**配置信息对象**`TRpcConfigProperties`,同时默认配置了**Json方式序列化**以及从**内存中读取服务列表**的策略。
**需要获取框架定义的配置项时注入**`TRpcConfigProperties`即可。
需要自定义序列化方式时,实现`SerializationStrategy`接口并生成`Bean`即可。
需要自定义服务发现方式时,实现`TRpcDiscoveryStrategy`接口并生成`Bean`即可。
**注意点1:代码中注释掉的**`@TRpcServiceScanner(basePackages = {"cn.t.rpc"})`**需要在自己的配置类中添加并指定包路径,否则无法扫描到需要代理的接口!!!**
> `resources\META-INF\spring.factories`路径下已经配置了自动装配入口
>
> ```
> org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
> cn.t.rpc.config.RpcAutoConfiguration
> ```
### 约定的数据类型以及格式(编码器解码器)
传输约定的默认数据类型为`Json`,序列化的基类为`cn.t.rpc.core.TrafficData`,默认派生三个子类
1. `TRequest` 请求的数据类
2. `TResponse` 响应的数据类
3. `TIdle` 心跳检测用的数据类
> 用于数据传输的数据类中的字段名已经在保证语义的情况下尽量缩短了,以减少序列化传输时的数据量,提升通信性能。
传输约定的数据格式如下:
1. 序列化的数据类型(用于区分核心功能以及后续扩展功能)
2. 有效业务数据的字节长度
3. 当前传输的数据类型(请求还是响应,该字段目前没有实际业务意义)
4. 业务数据
5. 终止符
举例(以竖线为分隔符,0代表各个字节):
```java
0000000|0000|0000|00000...000000|000
```
编码器如下(只贴出核心部分):
```java
@Override
protected void encode(ChannelHandlerContext ctx, TrafficData msg, ByteBuf out) throws Exception {
try {
// do something
byte[] data = this.strategy.serialize(msg);
// do something
int len = data.length;
// 写入序列化类型-占类型字符串对应的byte数组长度
out.writeBytes(TRpcConstants.RPC_TRAFFIC_TYPE.getBytes(CharsetUtil.UTF_8));
// 写入长度-占4位
out.writeInt(len);
// 写入类型-占4位
out.writeInt(msg.getType().value());
// 写入数据
out.writeBytes(data);
// 写入终止符
out.writeBytes(TRpcConstants.RPC_TRAFFIC_EOF.getBytes(CharsetUtil.UTF_8));
} catch(Exception e) {
logger.error("TrafficEncoder: 编码异常...本次连接关闭");
logger.error(e.getMessage());
ctx.close();
}
}
```
解码器如下(只贴出核心部分):
```java
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List