# netty-client-connect-pool **Repository Path**: voidwx/netty-client-connect-pool ## Basic Information - **Project Name**: netty-client-connect-pool - **Description**: netty客户端连接池 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 8 - **Forks**: 4 - **Created**: 2022-01-13 - **Last Updated**: 2024-08-12 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 一、引言 ## 1.1 场景 最近存在一个如下的场景需求。本人需要实现一个雾服务器,其主要功能是接收安卓端的Https请求,然后将该请求中的数据转发给对应的后台认证服务。 其中,在安卓和雾服务器间使用https是因为需要传输`userID`、`asessionKey`和`token`等可能的敏感信息,https使用nginx代理实现。雾服务器和后台认证服务在一台服务器上,所以这两者之间直接使用socket进行通信。 场景需求 ## 1.2 技术选型 安卓->雾服务器:springboot 雾服务器->后台认证服务(连接池):netty(指定) 此处通信使用载体的都为**JSON** ## 1.3 声明 本文在[Netty Client实战——高并发连接池方案_itboyer的博客-CSDN博客_netty客户端连接池](https://blog.csdn.net/zd161580/article/details/89678383)的基础上更改了部分内容以及添加了注释。由于本人的水平有限,代码可能不够简洁高效,也存在部分问题未解决,请见谅。 # 二、线程模型 ![](https://voidewx-blog.oss-cn-beijing.aliyuncs.com/img/pasted-46.png) 描述:当有任务需要连接服务器时,会新建一个线程从连接池中获取一个连接。 ## 2.1 连接池中连接的获取与使用 因为在这里我们需要服务端返回的结果,所以线程池中的线程是以实现`Callable`接口实现的。 这里引用原博客中的问题: > Netty提供了异步IO和同步IO的统一实现,但是我们的需求其实和IO的同步异步并无关系。我们的关键是要实现请求-响应这种典型的一问一答交互方式。用于实现微服务之间的调用和返回结果获取,要实现这个需求,需要解决两个问题: > > a. 请求和响应的正确匹配。 > > 当服务端返回响应结果的时候,怎么和客户端的请求正确匹配起来呢?解决方式:通过客户端唯一的RequestId,服务端返回的响应中需要包含该RequestId,这样客户端就可以通过RequestId来正确匹配请求响应。(在本文中,使用randomId来完成请求和响应的正确匹配) > > b. 请求线程和响应线程的通信。 > > 因为请求线程会在发出请求后,同步等待服务端的返回。因此,就需要解决,Netty在接受到响应之后,怎么通知请求线程结果。(此部分在`NettyClientHandler`中由RESULT_MAP来控制实现) ### 2.1.1 `ChannelTaskThread.java` 子线程,通过目的服务器地址和全局随机数从连接池中获取对应服务器的channel(`nettyClientPool.getChannel(random,socketAddress);`),这里的随机数是要做为channel的属性来实现标识。 ```java /** * 多线程获取连接池中的连接 * Callable在任务完成之后会有返回值 * Callable:一个具有类型参数的泛型 */ public class ChannelTaskThread implements Callable { // 获取netty连接池,NettyClientPool为单例模式,可以获取全局唯一的连接池 private final Logger logger = LoggerFactory.getLogger(getClass()); final NettyClientPool nettyClientPool = NettyClientPool.getInstance(); private String message; private InetSocketAddress socketAddress; public ChannelTaskThread(String message, InetSocketAddress socketAddress) { this.message = message; this.socketAddress = socketAddress; } @Override public String call() throws Exception { SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS"); // 同一个线程使用同一个全局唯一的随机数 long random = Long.parseLong(sdf.format(new Date())) * 1000000 + Math.round(Math.random()*1000000); Channel channel = nettyClientPool.getChannel(random,socketAddress); logger.info("在链接池中取到的Channel:{}",channel.id()); // UnpooledByteBufAllocator: 非池化的内存分配器,用于从堆上或直接内存上进行内存的分配和释放;false:在堆上开启buffer UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false); ByteBuf buf = allocator.buffer(20); String msg = message; // 先写长度,之后再读取长度 byte[] bytes = msg.getBytes(); buf.writeInt(bytes.length); buf.writeBytes(msg.getBytes()); // 根据特定的handler类型返回对应的handler NettyClientHandler tcpHandler = channel.pipeline().get(NettyClientHandler.class); ChannelId id = channel.id(); logger.info("SEND SEQNO[{}] MESSAGE AND CHANNEL id [{}]",random,id); // 这里的serverMsg就是返回的结果 String serverMsg = tcpHandler.sendMessage(buf,channel); // 释放连接 NettyClientPool.release(channel,socketAddress); logger.info("接受到返回值:{}",serverMsg); return serverMsg; } } ``` ### 2.1.2 `NettyClientHandler.java` **1、请求和响应的匹配** channel会传输服务器的返回值,因为我们可以通过channel的全局随机数属性randomId来完成请求与响应的正确匹配。具体的,在获取到channel执行任务时`sendMessage`,为每个channel建立一个阻塞队列`linked`,并将相关信息添加到`RESULT_MAP`,`sendMessage`会不断尝试从`linked`中获取值,当channel还没有返回值,`sendMessage`会一直阻塞。当存在返回值,会被`channelRead()`读取,此时`channelRead`会根据channel的randomId将返回值放入对应的阻塞队列`linked`,此时由于`linked`存在值了,`sendMessage`可以取到值并返回对应请求的响应。这里的思想可以简化成一个消费者-生产者问题,消费者`sendMessage`只有当`channelRead`往阻塞队列中添加值才能继续运行,否则将一直阻塞。 总的来说,这里利用**全局随机值**实现了请求和响应的匹配。 **2、对于通道的回收** 对于Netty连接池来说,不需要维护过多的连接,因此,**当通道A空闲**时,可以判断当前连接池的活跃连接数是否大于预设值,如果大于预设值,则将通道A回收。那么如何统计不同连接池的活跃连接数呢? 在`NettyClientHandler`中建立一个Map来存储对应连接池活跃的连接数`volatile static Map> coreChannel = new HashMap<>();`。然后在检查通道空闲状态时,将不同连接池的通道分别添加进该map,这样实现了对不同连接池活跃连接数的统计。 但是这样存在一个问题:提前完成任务的channel会一直被保留。 **3、心跳消息的处理** 连接池会保持着多个与服务器连接的channel,如果服务器对每个心跳消息都做回应,会造成通信资源的浪费,因此在这里的设计是服务器会对接受到的消息字段进行判断,如果发现是心跳机制将不会回复消息。 ```java public class NettyClientHandler extends ChannelInboundHandlerAdapter { /** * 使用阻塞式LinkedBlockingQueue,对应响应结果保存,并发安全,响应结果为String * 用于记录通道响应的结果集合 */ private static final Map> RESULT_MAP = new ConcurrentHashMap<>(); volatile static Map> coreChannel = new HashMap<>(); private final Logger logger = LoggerFactory.getLogger(getClass()); public String sendMessage(ByteBuf message, Channel ch){ // 容量为1的阻塞队列 LinkedBlockingDeque linked = new LinkedBlockingDeque<>(1); // 获取channel中存储的全局唯一随机值 Long randomId = ch.attr(AttributeKey.valueOf(DataBusConstant.RANDOM_KEY)).get(); RESULT_MAP.put(randomId,linked); // 发送message ch.writeAndFlush(message); String res = null; try { // 设置3分钟的获取超时时间或使用take()---获取不到返回结果则一直阻塞 res = RESULT_MAP.get(randomId).poll(3, TimeUnit.MINUTES); RESULT_MAP.remove(randomId); }catch (Exception e){ e.printStackTrace(); } return res; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logger.debug("into channelRead"); String message = null; if (msg instanceof String){ message = msg.toString(); }else if (msg instanceof ByteBuf){ message = ((ByteBuf)msg).toString(Charset.defaultCharset()); } // 获取channel中存储的全局唯一随机值 Long randomId = ctx.channel().attr(AttributeKey.valueOf(DataBusConstant.RANDOM_KEY)).get(); // 替换为log logger.info("READ INFO 服务端返回结果:{}", message); // 将服务端返回结果返回对应的channel中 LinkedBlockingDeque linked = RESULT_MAP.get(randomId); if (message != null){ linked.add(message); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { boolean active = ctx.channel().isActive(); logger.debug("[此时通道状态]{}",active); } /** * 心跳机制实现连接的动态回收 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { logger.info("[客户端心跳监测发送] 通道编号:{}",ctx.channel().id()); Channel channel = ctx.channel(); if (evt instanceof IdleStateEvent){ // 当客户端开始发送心跳检测时,说明没有业务请求,释放通道数设定的CORE_CONNECTIONS if (channel.isActive()){ // 使用pool的hash作为key,维护CORE_CONNECTIONS个通道数,多余关闭 int poolHash = NettyClientPool.getPoolHash(channel); // 获取poolHash对应的连接集合 Set channels = coreChannel.get(poolHash); channels = channels == null ? new HashSet<>(DataBusConstant.CORE_CONNECTIONS) : channels; channels.add(channel); if (channels.stream().filter(Channel::isActive).count() > DataBusConstant.CORE_CONNECTIONS){ logger.info("关闭 CORE_CONNECTIONS 范围之外的通道:{}",channel.id()); channels.remove(channel); channel.close(); } // 将更新后的连接集合到coreChannel中 coreChannel.put(poolHash,channels); } String heartBeat = DataBusConstant.HEART_BEAT; byte[] bytes = heartBeat.getBytes(); UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false); ByteBuf buf = allocator.buffer(20); buf.writeInt(bytes.length); buf.writeBytes(bytes); channel.writeAndFlush(buf); } else { super.userEventTriggered(ctx,evt); } } } ``` ## 2.2 连接池的创建 官方提供的`FixedChannelPool`支持固定连接的连接池,但是不支持连接池的动态回收,通道的动态回收结合心跳机制实现(见上): ### 2.2.1 `NettyClientPool.java` 这里需要对Line50处的代码进行说明: ```java getInetAddresses(address); // poolMap.get(key)方法会对不存在的key值创建一个新的channelPool // 为对应的IP+PORT创建channelPool for (InetSocketAddress address : addressList){ pools.put(address,poolMap.get(address)); } } ``` 在此时,代码中的`poolMap`并没有进行初始化,其实现的是`ChannelPoolMap`接口,其`get(key)`方法,如果key不存在,则会新建一个`FixedChannelPool`,在这里即执行**line46**的语句。 ```java public class NettyClientPool { // volatile用来确保将变量的更新操作通知到其他线程。 volatile private static NettyClientPool nettyClientPool; private final Logger logger = LoggerFactory.getLogger(getClass()); /** * key为目标主机的InetSocketAddress对象,value为目标主机对应的连接池 * InetSocketAddress可以为ip+port,也可以为hostname+port * FixedChannelPool:ChannelPool,可以强制保持一个最大的连接并发 */ public ChannelPoolMap poolMap; final EventLoopGroup group = new NioEventLoopGroup(); final Bootstrap bootstrap = new Bootstrap(); private static final String address = "127.0.0.1:8000,127.0.0.1:7000,127.0.0.1:9000"; // 注意这里的pools是private,poolMap应该是用来跟pools进行信息更新 volatile private static Map pools = new HashMap<>(3); // 注意此处使用了volatile 进行了隔离 volatile private static List addressList; private NettyClientPool(){ build(); } public static NettyClientPool getInstance(){ if (nettyClientPool == null){ // 同步操作,即加锁 synchronized (NettyClientPool.class){ // 为了避免多次初始化,此处又重新做了一次null值判断 if (nettyClientPool == null){ nettyClientPool = new NettyClientPool(); } } } return nettyClientPool; } public void build(){ logger.info("NettyClientPool build..."); bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true) .option(ChannelOption.SO_KEEPALIVE,true); poolMap = new AbstractChannelPoolMap() { @Override protected FixedChannelPool newPool(InetSocketAddress key) { // DataBusConstant.MAX_CONNECTIONS 最大连接数 // bootstrap.remoteAddress(key):bootstrap对remoteKey进行连接 return new FixedChannelPool(bootstrap.remoteAddress(key),new NettyChannelPoolHandler(), DataBusConstant.MAX_CONNECTIONS); } }; // 获取server段的addressList,此处的address应该是多个服务器信息的连写,如"127.0.0.1:80,127.0.0.1:90" getInetAddresses(address); // poolMap.get(key)方法会对不存在的key值创建一个新的channelPool // 为对应的IP+PORT创建channelPool for (InetSocketAddress address : addressList){ pools.put(address,poolMap.get(address)); } } /** * 功能描述: * 根据随机数取出的server对应pool,从pool中取出channel * 连接池的动态扩容: 指定最大连接数为Integer.MAX_VALUE,如果连接池队列中取不到channel,会自动创建channel,默认使用FIFO的获取方式,回收的channel优先被再次get到 * SERVER的宕机自动切换: 指定重试次数,get()发生连接异常,会重新获取 */ public Channel getChannel(long random, InetSocketAddress address){ int retry = 0; Channel channel = null; try { // random是一个关于时间戳的随机数 // 根据address获取对应的连接池 FixedChannelPool pool = pools.get(address); // 从连接池获取连接 Future future = pool.acquire(); channel = future.get(); // 为channel设置key:random随机数 AttributeKey randomID = AttributeKey.valueOf(DataBusConstant.RANDOM_KEY); channel.attr(randomID).set(random); } catch (ExecutionException e) { //如果是因为服务端挂掉,连接失败而获取不到channel,则随机数执行+1操作,从下一个池获取 logger.info(e.getMessage()); // 每个池子尝试获取2次 int count = 2; if(retry < addressList.size() * count){ retry++; return getChannel( random,address); } else { logger.info("没有可以获取到channel连接的server,server list [{}]",addressList); throw new RuntimeException("没有可以获取到channel连接的server"); } } catch (Exception e) { e.printStackTrace(); } return channel; } /** * 回收channel进池,需要保证随机值和getChannel获取到的随机值是同一个,才能从同一个pool中释放资源 * @param ch */ public static void release(Channel ch,InetSocketAddress socketAddress){ long random = ch.attr(AttributeKey.valueOf(DataBusConstant.RANDOM_KEY)).get(); ch.flush(); pools.get(socketAddress).release(ch); } /** * 获取线程池的hash值 */ public static int getPoolHash(Channel ch){ // 获取random随机值 long random = ch.attr(AttributeKey.valueOf(DataBusConstant.RANDOM_KEY)).get(); InetSocketAddress address = (InetSocketAddress) ch.remoteAddress(); return System.identityHashCode(pools.get(address)); } /** * 获取服务端server列表,每个server对应一个pool */ public void getInetAddresses(String addresses){ addressList = new ArrayList<>(3); // 此处需要注意看是否会出错 if (StringUtil.isNullOrEmpty(addresses)){ throw new RuntimeException("address列表为空"); } String[] splits = addresses.split(","); for (String address : splits){ String[] split = address.split(":"); if (split.length==0){ throw new RuntimeException("["+address+"]不符合IP:PORT格式"); } addressList.add(new InetSocketAddress(split[0],Integer.parseInt(split[1]))); } } } ``` ### 2.2.2 `NettyChannelPoolHandler.java` 注意这里为了解决TCP粘包的问题,使用的是**自定义长度帧解码器**`LengthFieldBasedFrameDecoder`,其用法参考[LengthFieldBasedFrameDecoder 秒懂 - 疯狂创客圈 - 博客园 (cnblogs.com)](https://www.cnblogs.com/crazymakercircle/p/10294745.html),该类解码器需要在主要消息之前添加消息长度,因此可以看见`NettyClientHandler`中的`sendMessage`会先添加消息长度。 关于此处为何要选中使用自定义长度解码器而不是分隔符解码器(`DelimiterBasedFrameDecoder`),因为在使用python构建服务器时(测试用),不知道以何种方式来完成分隔符解码器,故采用自定义长度解码器。 ```java public class NettyChannelPoolHandler implements ChannelPoolHandler { // 分隔符 static final ByteBuf byteBuf = Unpooled.copiedBuffer(DataBusConstant.DELIMITER.getBytes()); private final Logger logger = LoggerFactory.getLogger(getClass()); @Override public void channelReleased(Channel ch) throws Exception { ch.writeAndFlush(Unpooled.EMPTY_BUFFER); logger.info("|-->回收channel.Channel ID:"+ch.id()); } @Override public void channelAcquired(Channel ch) throws Exception { logger.info("|-->获取Channel. Channel ID: " + ch.id()); } @Override public void channelCreated(Channel ch) throws Exception { logger.info("|-->创建Channel. Channel ID: " + ch.id() +"\r\n|-->创建Channel. Channel REAL HASH: " + System.identityHashCode(ch)); SocketChannel channel = (SocketChannel) ch; channel.config().setKeepAlive(true); channel.config().setTcpNoDelay(true); channel.pipeline() // 开启netty自带的心跳处理器,每10秒发一次心跳 .addLast(new IdleStateHandler(0,0,10, TimeUnit.SECONDS)) .addLast(new LengthFieldBasedFrameDecoder(1024,0,4,0,4)) .addLast(new StringDecoder()) .addLast(new NettyClientHandler()); } } ``` ## 2.3 辅助类 ### 2.3.1 线程池 ```java public class NettyTaskPool { /** * 线程池线程数量,对应cachedThreadPoolExecutor */ private static final int CORE_POLL_SIZE = 3; private static final int MAX_POLL_SIZE = Integer.MAX_VALUE; private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( CORE_POLL_SIZE, MAX_POLL_SIZE, 3, TimeUnit.MINUTES, new LinkedBlockingDeque<>(), new ThreadPoolExecutor.DiscardOldestPolicy() ); public static String submitTask(String message, InetSocketAddress socketAddress) throws Exception{ // 单个任务在线程池内分配单个线程,用于同步等待封装的返回结果 Future submit = threadPool.submit(new ChannelTaskThread(message,socketAddress)); // Future.get() 获取任务的结果,若存在异常,则抛出ExecutionException String response = submit.get(); return response; } } ``` ### 2.3.2 常量类 ```java public class DataBusConstant { public static final String DELIMITER = "%#_#%"; public static final String HEART_BEAT = "{\"HeatBeat\":\"ping-pong-ping-pong\"}"; /** * 最大连接数 */ public static final int MAX_CONNECTIONS = Integer.MAX_VALUE; /** * 核心链接数,该数目内的通道 在没有业务请求时发送心跳防止失活,超过部分的通道close掉 */ public static final int CORE_CONNECTIONS = 0; /** * 同一个线程使用同一个全局唯一的随机数,保证从同一个池中获取和释放资源,同时使用改随机数作为Key获取返回值 */ public static final String RANDOM_KEY = "randomID"; /** * 服务端丢失心跳次数,达到该次数,则关闭通道,默认3次 */ public static final int LOOS_HEART_BEAT_COUNT = 3; public static final String HOST_NOT_REACHABLE = "{\"msg\":\"服务未开启\"}"; } ``` ### 2.3.3 连接常量类 > 可与常量类合并 ```java public class ConnectionUtil { public static final InetSocketAddress USER_AUTH = new InetSocketAddress("127.0.0.1",7000); public static final InetSocketAddress BIO_ATH = new InetSocketAddress("127.0.0.1",8000); public static final InetSocketAddress LIVE_CHECK = new InetSocketAddress("127.0.0.1",9000); } ``` ### 2.3.4 结果类 ```java public class Result { // 前端成功是20000,这里先做一点更改 /** * code:200-请求成功处理;5000-出错 */ private int code; private String message; private Object data; public Result(int code, String message, Object data) { this.code = code; this.message = message; this.data = data; } public Result(){}; public static Result OK(){ return OK(null); } public static Result OK(Object data){ return new Result(200,"操作成功",data); } public static Result ERROR(){ return ERROR("操作失败"); } public static Result ERROR(String message){ return new Result(5000,"操作失败",message); } public int getCode() { return code; } public void setCode(int code) { this.code = code; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } public Object getData() { return data; } public void setData(Object data) { this.data = data; } } ``` ### 2.3.5 结果工具类 转换对应消息格式 ```java public class ResultUtil { public static Result getResult(String json){ JsonObject jsonObject = (JsonObject) new JsonParser().parse(json); int code = jsonObject.remove("code").getAsInt(); if(code == 200){ return Result.OK(json); } return Result.ERROR(json); } } ``` ## 2.4 测试 > 接受http请求后,调用上述服务 ```java @RestController public class TestController { // 接受json数据 @PostMapping("/testUser") public Result testUser(@RequestBody Map params) throws Exception { String jsonString = new Gson().toJson(params); String res = NettyTaskPool.submitTask(jsonString, ConnectionUtil.USER_AUTH); return ResultUtil.getResult(res); } @PostMapping("/testBio") public Result testBio(@RequestBody Map params) throws Exception { String jsonString = new Gson().toJson(params); String res = NettyTaskPool.submitTask(jsonString, ConnectionUtil.BIO_ATH); return ResultUtil.getResult(res); } @PostMapping("/testLive") public Result testLive(@RequestBody Map params) throws Exception { String jsonString = new Gson().toJson(params); String res = NettyTaskPool.submitTask(jsonString, ConnectionUtil.LIVE_CHECK); return ResultUtil.getResult(res); } } ``` post请求测试例子: ```json {"username":"admin","password":"admin123","userId":"xiawei","sessionKey":"sessionKey","token":"token","code":200} ``` ## 2.5 服务器 > 服务器使用python编写 这里在将消息长度写入的时候,需要注意要将其转换为大端序,即` size = struct.pack('>i',len(sendData))`,在接收的时候,需要将其转换为小端序,即`lenOfData = struct.unpack('i',len(sendData)) sendData = size + sendData self.request.sendall(sendData) except Exception as e: print(e) break if __name__ == "__main__": s = socketserver.ThreadingTCPServer(ip_port,UserAuthServer) s.serve_forever() ``` # 三、问题 1、channel连接失败无法返回异常 在“雾服务器”运行期间,如果认证服务未开启,此时channel是无法连接成功,但是此时会抛出异常,会重复尝试连接,这样请求的发起者(安卓)会一直处于等待状态,无法返回异常(即服务器未开启)。 # 四、参考连接 [Netty Client实战——高并发连接池方案_itboyer的博客-CSDN博客_netty客户端连接池](https://blog.csdn.net/zd161580/article/details/89678383) [python)解决TCP下的粘包问题_Monicx的博客-CSDN博客_python tcp 粘包](https://blog.csdn.net/miaoqinian/article/details/80020291) [LengthFieldBasedFrameDecoder 秒懂 - 疯狂创客圈 - 博客园 (cnblogs.com)](https://www.cnblogs.com/crazymakercircle/p/10294745.html) 源码地址:https://gitee.com/voidwx/netty-client-connect-pool.git