From 60b3f2cce85daa158b50e20ea02e2761cdfa1644 Mon Sep 17 00:00:00 2001 From: yanming Date: Wed, 25 May 2022 16:52:15 +0800 Subject: [PATCH 1/5] =?UTF-8?q?update:=20COK-6=20=E6=96=B0=E5=A2=9Esmart-s?= =?UTF-8?q?ocket=20excute?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../needcoke/b/component/TestComponent.java | 8 ++--- .../org/needcoke/a/configuration/Config.java | 8 ++--- .../needcoke/a/controller/AController.class | Bin 2031 -> 2040 bytes .../org/needcoke/rpc/codec/CokeRequest.java | 7 ++++ .../org/needcoke/rpc/excutor/Excutor.java | 8 ----- .../rpc/invoker/SmartSocketInvoker.java | 32 ++++++++++++------ .../processor/SmartSocketClientProcessor.java | 14 +++----- .../rpc/server/SmartSocketServer.java | 7 ++++ .../org/needcoke/rpc/utils/ConnectUtil.java | 19 ++++++++++- 9 files changed, 67 insertions(+), 36 deletions(-) delete mode 100644 connect-core/src/main/java/org/needcoke/rpc/excutor/Excutor.java diff --git a/business-b/src/main/java/org/needcoke/b/component/TestComponent.java b/business-b/src/main/java/org/needcoke/b/component/TestComponent.java index f35eac9..f0e3cb4 100644 --- a/business-b/src/main/java/org/needcoke/b/component/TestComponent.java +++ b/business-b/src/main/java/org/needcoke/b/component/TestComponent.java @@ -21,9 +21,9 @@ public class TestComponent { return new WeightedResponseTimeBalance(); } -// @Bean -// public SmartSocketInvoker smartSocketInvoker(){ -// return new SmartSocketInvoker(); -// } + @Bean + public SmartSocketInvoker smartSocketInvoker(){ + return new SmartSocketInvoker(); + } } diff --git a/bussiness-a/src/main/java/org/needcoke/a/configuration/Config.java b/bussiness-a/src/main/java/org/needcoke/a/configuration/Config.java index 5a13126..567cce6 100644 --- a/bussiness-a/src/main/java/org/needcoke/a/configuration/Config.java +++ b/bussiness-a/src/main/java/org/needcoke/a/configuration/Config.java @@ -30,8 +30,8 @@ public class Config { return "say : "+word; } -// @Bean -// public SmartSocketInvoker smartSocketInvoker(){ -// return new SmartSocketInvoker(); -// } + @Bean + public SmartSocketInvoker smartSocketInvoker(){ + return new SmartSocketInvoker(); + } } diff --git a/bussiness-a/target/classes/org/needcoke/a/controller/AController.class b/bussiness-a/target/classes/org/needcoke/a/controller/AController.class index bea2e23cd2949ca7e595357e743e10143687ddc5..5ba52db860fb7864391c1e0255e3cf3275d4c7a8 100644 GIT binary patch delta 164 zcmWlRISRr+001X3u$Zi9ZJ#=Y_Z5$L-$!ecDkzdd#D9o>!$Q(o_y8Z_55(3=OqyX} zU_Qxj(s};w4q>fOTT7Sv0!K1n7$(Vxv1GzjA+>dnuC;T0&srV!8{d=6n1>6=f~A6adYi#j GYsLP8Y#$r| delta 149 zcmeyt|DNCZ)W2Q(7#J8#7y>vMco>Yifh<#Y1~V=OW(IR^1`P%aZU!v|OAukj&R`9a zw&7;bWU%F8;AJppXRu>uu;1uBg^AsPoxzcv!D;h7rWi&RXC4NZ$#yJKx~@D7ZVc`` v3?2-gJPck8-i!H5JXMVWc&JPbYzzB~+m4E~$@ShAP_#&R3v diff --git a/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequest.java b/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequest.java index 779ccf9..d74b12a 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequest.java +++ b/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequest.java @@ -28,6 +28,8 @@ public class CokeRequest { @Getter private InvokeResult result; + private Integer requestId; + public CokeRequest setRequestType(ConnectRequestEnum requestType) { this.requestType = requestType; @@ -79,4 +81,9 @@ public class CokeRequest { public byte[] toBytes(){ return JSONObject.toJSONString(this).getBytes(StandardCharsets.UTF_8); } + + public CokeRequest setRequestId(Integer requestId){ + this.requestId = requestId; + return this; + } } diff --git a/connect-core/src/main/java/org/needcoke/rpc/excutor/Excutor.java b/connect-core/src/main/java/org/needcoke/rpc/excutor/Excutor.java deleted file mode 100644 index 5f53d1e..0000000 --- a/connect-core/src/main/java/org/needcoke/rpc/excutor/Excutor.java +++ /dev/null @@ -1,8 +0,0 @@ -package org.needcoke.rpc.excutor; - -/** - * @author Gilgamesh - * @date 2022/4/2 - */ -public interface Excutor { -} diff --git a/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java b/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java index d96d665..5dfe5a9 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java +++ b/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java @@ -5,37 +5,47 @@ import org.needcoke.rpc.codec.CokeRequest; import org.needcoke.rpc.codec.CokeRequestProtocol; import org.needcoke.rpc.common.constant.ConnectConstant; import org.needcoke.rpc.processor.SmartSocketServerProcessor; +import org.needcoke.rpc.utils.ConnectUtil; import org.smartboot.socket.transport.AioQuickClient; import org.smartboot.socket.transport.AioSession; import org.springframework.cloud.client.ServiceInstance; +import org.springframework.web.context.request.async.DeferredResult; import java.io.IOException; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + @Slf4j -public class SmartSocketInvoker extends ConnectInvoker{ +public class SmartSocketInvoker extends ConnectInvoker { + + /** + * 给AioQuickClient加个引用,防止垃圾回收。 + * //TODO AioSession失效时重建连接 + */ + private final Map clientMap = new ConcurrentHashMap<>(); - private final Map clientMap = new HashMap<>(); + private final Map sessionMap = new ConcurrentHashMap<>(); - private final Map sessionMap = new HashMap<>(); @Override public InvokeResult execute(ServiceInstance instance, String beanName, String methodName, Map params) { - String uri = instance.getHost()+ ConnectConstant.COLON+ instance.getPort(); + String uri = instance.getHost() + ConnectConstant.COLON + instance.getPort(); if (!sessionMap.containsKey(uri)) { - AioQuickClient aioQuickClient = new AioQuickClient(instance.getHost(),instance.getPort(), new CokeRequestProtocol(),new SmartSocketServerProcessor()); - clientMap.put(uri,aioQuickClient); + AioQuickClient aioQuickClient = new AioQuickClient(instance.getHost(), instance.getPort(), new CokeRequestProtocol(), new SmartSocketServerProcessor()); + clientMap.put(uri, aioQuickClient); try { AioSession session = aioQuickClient.start(); - sessionMap.put(uri,session); + sessionMap.put(uri, session); } catch (IOException e) { throw new RuntimeException(e); } } AioSession session = sessionMap.get(uri); + int requestId = ConnectUtil.requestIdMaker.addAndGet(1); CokeRequest request = new CokeRequest().setBeanName(beanName) .setMethodName(methodName) - .setParams(params); + .setParams(params) + .setRequestId(requestId); byte[] bytes = request.toBytes(); try { session.writeBuffer().writeInt(bytes.length); @@ -44,6 +54,8 @@ public class SmartSocketInvoker extends ConnectInvoker{ } catch (IOException e) { throw new RuntimeException(e.getMessage()); } - return InvokeResult.nullResult(); + DeferredResult deferredResult = new DeferredResult(3000L); + ConnectUtil.putRequestMap(deferredResult); + return InvokeResult.nullResult().setBody(deferredResult); } } diff --git a/connect-core/src/main/java/org/needcoke/rpc/processor/SmartSocketClientProcessor.java b/connect-core/src/main/java/org/needcoke/rpc/processor/SmartSocketClientProcessor.java index 75f20a4..e4331f4 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/processor/SmartSocketClientProcessor.java +++ b/connect-core/src/main/java/org/needcoke/rpc/processor/SmartSocketClientProcessor.java @@ -2,22 +2,18 @@ package org.needcoke.rpc.processor; import org.needcoke.rpc.codec.CokeRequest; import org.needcoke.rpc.common.enums.ConnectRequestEnum; +import org.needcoke.rpc.utils.ConnectUtil; import org.smartboot.socket.transport.AioSession; -import org.springframework.web.context.request.RequestAttributes; -import org.springframework.web.context.request.RequestContextHolder; -import org.springframework.web.context.request.ServletRequestAttributes; -import javax.servlet.http.HttpServletResponse; +import org.springframework.web.context.request.async.DeferredResult; public class SmartSocketClientProcessor extends SmartSocketMessageProcessor { @Override public void process(AioSession aioSession, CokeRequest request) { if (ConnectRequestEnum.INTERNAL_REQUEST == request.getRequestType()) { - RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes(); - HttpServletResponse response = ((ServletRequestAttributes) requestAttributes).getResponse(); - if (null != response){ - this.responseHttp(response,request.getResult()); - } + Integer requestId = request.getRequestId(); + DeferredResult deferredResult = ConnectUtil.getFromRequestMap(requestId); + deferredResult.setResult(request.getResult()); //TODO 抛出异常 } diff --git a/connect-core/src/main/java/org/needcoke/rpc/server/SmartSocketServer.java b/connect-core/src/main/java/org/needcoke/rpc/server/SmartSocketServer.java index a0eabb2..9b94e85 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/server/SmartSocketServer.java +++ b/connect-core/src/main/java/org/needcoke/rpc/server/SmartSocketServer.java @@ -6,6 +6,7 @@ import org.needcoke.rpc.config.ServerConfig; import org.needcoke.rpc.processor.SmartSocketServerProcessor; import org.smartboot.socket.transport.AioQuickServer; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.io.IOException; @@ -22,7 +23,13 @@ public class SmartSocketServer implements ConnectionServer{ @Override public void start() throws IOException { server = new AioQuickServer(serverConfig.getCokeServerPort(), new CokeRequestProtocol(),new SmartSocketServerProcessor()); + server.setBannerEnabled(false); server.start(); log.info("smart socket server start on port {}",serverConfig.getCokeServerPort()); } + + @PreDestroy + public void destroy(){ + server.shutdown(); + } } diff --git a/connect-core/src/main/java/org/needcoke/rpc/utils/ConnectUtil.java b/connect-core/src/main/java/org/needcoke/rpc/utils/ConnectUtil.java index aa4d97b..758c23c 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/utils/ConnectUtil.java +++ b/connect-core/src/main/java/org/needcoke/rpc/utils/ConnectUtil.java @@ -9,10 +9,13 @@ import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.stereotype.Component; +import org.springframework.web.context.request.async.DeferredResult; import javax.annotation.PostConstruct; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; /** * @author Gilgamesh @@ -55,7 +58,21 @@ public class ConnectUtil { this.ci = ci; } - @Autowired + public static final AtomicInteger requestIdMaker = new AtomicInteger(); + + public static final Map requestMap = new ConcurrentHashMap(); + + public static void putRequestMap(DeferredResult value){ + requestMap.put(requestIdMaker.addAndGet(1),value); + } + + public static void putRequestMap(Integer requestId,DeferredResult value){ + requestMap.put(requestId,value); + } + + public static DeferredResult getFromRequestMap(Integer key){ + return requestMap.get(key); + } /** * 执行远程方法 -- Gitee From f200aff2658deaba61c20247af2abfcbd8b0f573 Mon Sep 17 00:00:00 2001 From: yanming Date: Wed, 25 May 2022 18:21:11 +0800 Subject: [PATCH 2/5] =?UTF-8?q?update:=20COK-6=20=E6=96=B0=E5=A2=9Esmart-s?= =?UTF-8?q?ocket=20excute?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- connect-core/pom.xml | 2 +- .../src/main/java/org/needcoke/rpc/codec/CokeRequest.java | 3 ++- .../java/org/needcoke/rpc/invoker/SmartSocketInvoker.java | 6 +++++- .../{ => smart_socket}/SmartSocketClientProcessor.java | 2 +- .../{ => smart_socket}/SmartSocketMessageProcessor.java | 4 ++-- .../{ => smart_socket}/SmartSocketServerProcessor.java | 2 +- .../java/org/needcoke/rpc/server/SmartSocketServer.java | 2 +- 7 files changed, 13 insertions(+), 8 deletions(-) rename connect-core/src/main/java/org/needcoke/rpc/processor/{ => smart_socket}/SmartSocketClientProcessor.java (94%) rename connect-core/src/main/java/org/needcoke/rpc/processor/{ => smart_socket}/SmartSocketMessageProcessor.java (94%) rename connect-core/src/main/java/org/needcoke/rpc/processor/{ => smart_socket}/SmartSocketServerProcessor.java (98%) diff --git a/connect-core/pom.xml b/connect-core/pom.xml index 47f3956..e080cee 100644 --- a/connect-core/pom.xml +++ b/connect-core/pom.xml @@ -72,7 +72,7 @@ org.smartboot.socket aio-core - 1.5.15 + 1.5.17 diff --git a/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequest.java b/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequest.java index d74b12a..0232e07 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequest.java +++ b/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequest.java @@ -79,7 +79,8 @@ public class CokeRequest { } public byte[] toBytes(){ - return JSONObject.toJSONString(this).getBytes(StandardCharsets.UTF_8); + String jsonString = JSONObject.toJSONString(this); + return jsonString.getBytes(); } public CokeRequest setRequestId(Integer requestId){ diff --git a/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java b/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java index 5dfe5a9..3d2b335 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java +++ b/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java @@ -4,14 +4,16 @@ import lombok.extern.slf4j.Slf4j; import org.needcoke.rpc.codec.CokeRequest; import org.needcoke.rpc.codec.CokeRequestProtocol; import org.needcoke.rpc.common.constant.ConnectConstant; -import org.needcoke.rpc.processor.SmartSocketServerProcessor; +import org.needcoke.rpc.processor.smart_socket.SmartSocketServerProcessor; import org.needcoke.rpc.utils.ConnectUtil; import org.smartboot.socket.transport.AioQuickClient; import org.smartboot.socket.transport.AioSession; +import org.smartboot.socket.transport.WriteBuffer; import org.springframework.cloud.client.ServiceInstance; import org.springframework.web.context.request.async.DeferredResult; import java.io.IOException; +import java.lang.reflect.Method; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -53,6 +55,8 @@ public class SmartSocketInvoker extends ConnectInvoker { session.writeBuffer().flush(); } catch (IOException e) { throw new RuntimeException(e.getMessage()); + }finally { + session.close(); } DeferredResult deferredResult = new DeferredResult(3000L); ConnectUtil.putRequestMap(deferredResult); diff --git a/connect-core/src/main/java/org/needcoke/rpc/processor/SmartSocketClientProcessor.java b/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketClientProcessor.java similarity index 94% rename from connect-core/src/main/java/org/needcoke/rpc/processor/SmartSocketClientProcessor.java rename to connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketClientProcessor.java index e4331f4..f6aabd3 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/processor/SmartSocketClientProcessor.java +++ b/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketClientProcessor.java @@ -1,4 +1,4 @@ -package org.needcoke.rpc.processor; +package org.needcoke.rpc.processor.smart_socket; import org.needcoke.rpc.codec.CokeRequest; import org.needcoke.rpc.common.enums.ConnectRequestEnum; diff --git a/connect-core/src/main/java/org/needcoke/rpc/processor/SmartSocketMessageProcessor.java b/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketMessageProcessor.java similarity index 94% rename from connect-core/src/main/java/org/needcoke/rpc/processor/SmartSocketMessageProcessor.java rename to connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketMessageProcessor.java index e541575..719400c 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/processor/SmartSocketMessageProcessor.java +++ b/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketMessageProcessor.java @@ -1,4 +1,4 @@ -package org.needcoke.rpc.processor; +package org.needcoke.rpc.processor.smart_socket; import org.needcoke.rpc.codec.CokeRequest; import org.needcoke.rpc.invoker.InvokeResult; @@ -9,7 +9,7 @@ import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.PrintWriter; -public abstract class SmartSocketMessageProcessor implements MessageProcessor { +public abstract class SmartSocketMessageProcessor implements MessageProcessor { public void responseHttp(HttpServletResponse response , InvokeResult result){ try { diff --git a/connect-core/src/main/java/org/needcoke/rpc/processor/SmartSocketServerProcessor.java b/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketServerProcessor.java similarity index 98% rename from connect-core/src/main/java/org/needcoke/rpc/processor/SmartSocketServerProcessor.java rename to connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketServerProcessor.java index 7f364fe..47e7e20 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/processor/SmartSocketServerProcessor.java +++ b/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketServerProcessor.java @@ -1,4 +1,4 @@ -package org.needcoke.rpc.processor; +package org.needcoke.rpc.processor.smart_socket; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; diff --git a/connect-core/src/main/java/org/needcoke/rpc/server/SmartSocketServer.java b/connect-core/src/main/java/org/needcoke/rpc/server/SmartSocketServer.java index 9b94e85..3cceb74 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/server/SmartSocketServer.java +++ b/connect-core/src/main/java/org/needcoke/rpc/server/SmartSocketServer.java @@ -3,7 +3,7 @@ package org.needcoke.rpc.server; import lombok.extern.slf4j.Slf4j; import org.needcoke.rpc.codec.CokeRequestProtocol; import org.needcoke.rpc.config.ServerConfig; -import org.needcoke.rpc.processor.SmartSocketServerProcessor; +import org.needcoke.rpc.processor.smart_socket.SmartSocketServerProcessor; import org.smartboot.socket.transport.AioQuickServer; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -- Gitee From 151ed89f9c4ca81861b7127fc0776bf613c49401 Mon Sep 17 00:00:00 2001 From: warren <2410818122@qq.com> Date: Wed, 25 May 2022 22:43:29 +0800 Subject: [PATCH 3/5] =?UTF-8?q?update:=20COK6=20smart-socket=E9=80=82?= =?UTF-8?q?=E9=85=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../needcoke/b/controller/TestController.java | 18 +++++++++- .../needcoke/a/controller/AController.class | Bin 2040 -> 2031 bytes .../rpc/common/constant/ConnectConstant.java | 2 ++ .../org/needcoke/rpc/config/ServerConfig.java | 12 +++++++ .../rpc/controller/RpcController.java | 32 +++++++++++++++++- .../rpc/invoker/SmartSocketInvoker.java | 9 +++-- .../rpc/server/SmartSocketServer.java | 4 +++ .../org/needcoke/rpc/utils/ConnectUtil.java | 29 ++++++++++++++++ 8 files changed, 102 insertions(+), 4 deletions(-) diff --git a/business-b/src/main/java/org/needcoke/b/controller/TestController.java b/business-b/src/main/java/org/needcoke/b/controller/TestController.java index 59d130e..61d63db 100644 --- a/business-b/src/main/java/org/needcoke/b/controller/TestController.java +++ b/business-b/src/main/java/org/needcoke/b/controller/TestController.java @@ -1,14 +1,19 @@ package org.needcoke.b.controller; +import cn.hutool.core.collection.CollUtil; import lombok.RequiredArgsConstructor; import org.needcoke.rpc.invoker.InvokeResult; +import org.needcoke.rpc.loadBalance.LoadBalance; import org.needcoke.rpc.utils.ConnectUtil; +import org.springframework.cloud.client.ServiceInstance; +import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import javax.servlet.http.HttpServletRequest; +import javax.annotation.Resource; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -27,4 +32,15 @@ public class TestController { InvokeResult execute = ConnectUtil.execute("bussiness-a", "config", "hahha2", map); return execute; } + + @Resource + private DiscoveryClient discoveryClient; + + @Resource + private LoadBalance loadBalance; + @GetMapping("testPort") + public Integer testPort(){ + List instances = discoveryClient.getInstances("bussiness-a"); + return ConnectUtil.getCokeServerPort(loadBalance.choose("bussiness-a",instances)); + } } diff --git a/bussiness-a/target/classes/org/needcoke/a/controller/AController.class b/bussiness-a/target/classes/org/needcoke/a/controller/AController.class index 5ba52db860fb7864391c1e0255e3cf3275d4c7a8..bea2e23cd2949ca7e595357e743e10143687ddc5 100644 GIT binary patch delta 149 zcmeyt|DNCZ)W2Q(7#J8#7y>vMco>Yifh<#Y1~V=OW(IR^1`P%aZU!v|OAukj&R`9a zw&7;bWU%F8;AJppXRu>uu;1uBg^AsPoxzcv!D;h7rWi&RXC4NZ$#yJKx~@D7ZVc`` v3?2-gJPck8-i!H5JXMVWc&JPbYzzB~+m4E~$@ShAP_#&R3v delta 164 zcmWlRISRr+001X3u$Zi9ZJ#=Y_Z5$L-$!ecDkzdd#D9o>!$Q(o_y8Z_55(3=OqyX} zU_Qxj(s};w4q>fOTT7Sv0!K1n7$(Vxv1GzjA+>dnuC;T0&srV!8{d=6n1>6=f~A6adYi#j GYsLP8Y#$r| diff --git a/connect-core/src/main/java/org/needcoke/rpc/common/constant/ConnectConstant.java b/connect-core/src/main/java/org/needcoke/rpc/common/constant/ConnectConstant.java index 6433d50..2af99f4 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/common/constant/ConnectConstant.java +++ b/connect-core/src/main/java/org/needcoke/rpc/common/constant/ConnectConstant.java @@ -13,6 +13,8 @@ public interface ConnectConstant { */ String EXECUTE_RELATIVE_PATH = "/coke/connect/execute"; + String COKE_PORT_RELATIVE_PATH = "/coke/connect/port"; + /** * 实例名称 */ diff --git a/connect-core/src/main/java/org/needcoke/rpc/config/ServerConfig.java b/connect-core/src/main/java/org/needcoke/rpc/config/ServerConfig.java index 40d4fed..4e59bb9 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/config/ServerConfig.java +++ b/connect-core/src/main/java/org/needcoke/rpc/config/ServerConfig.java @@ -13,6 +13,9 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + @Getter @Configuration public class ServerConfig { @@ -49,4 +52,13 @@ public class ServerConfig { public SmartSocketServer smartSocketServer(){ return new SmartSocketServer(); } + + /** + * server uri -> 端口号 + */ + @ConditionalOnMissingBean(OkHttpsInvoker.class) + @Bean + public Map cokeServerPortMap(){ + return new ConcurrentHashMap<>(); + } } diff --git a/connect-core/src/main/java/org/needcoke/rpc/controller/RpcController.java b/connect-core/src/main/java/org/needcoke/rpc/controller/RpcController.java index cd93938..65fda9d 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/controller/RpcController.java +++ b/connect-core/src/main/java/org/needcoke/rpc/controller/RpcController.java @@ -6,9 +6,14 @@ import lombok.extern.slf4j.Slf4j; import org.needcoke.rpc.common.constant.ConnectConstant; import org.needcoke.rpc.common.enums.ConnectionExceptionEnum; import org.needcoke.rpc.common.exception.CokeConnectException; +import org.needcoke.rpc.config.ServerConfig; +import org.needcoke.rpc.invoker.OkHttpsInvoker; import org.needcoke.rpc.utils.SpringContextUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.web.bind.annotation.*; +import javax.annotation.Resource; import java.lang.reflect.Method; import java.util.Collection; import java.util.Map; @@ -23,11 +28,22 @@ import java.util.Map; @RequiredArgsConstructor public class RpcController { + + private ServerConfig serverConfig; + + @Resource + private ApplicationContext applicationContext; + + @Autowired + public void setServerConfig(ServerConfig serverConfig) { + this.serverConfig = serverConfig; + } + @PostMapping("execute") public Object execute(@RequestParam String beanName, @RequestParam String methodName, @RequestBody Map params) { - log.info("execute http -- beanName : {} , methodName : {} , param : {}",beanName,methodName, JSONObject.toJSONString(params)); + log.info("execute http -- beanName : {} , methodName : {} , param : {}", beanName, methodName, JSONObject.toJSONString(params)); Method method = SpringContextUtils.getMethod(beanName, methodName); if (null == method) { log.error(ConnectionExceptionEnum.BEAN_WITHOUT_METHOD.logStatement(ConnectConstant.EXECUTE_RELATIVE_PATH)); @@ -42,4 +58,18 @@ public class RpcController { throw new CokeConnectException(ConnectionExceptionEnum.INVOKE_METHOD_ERROR); } } + + @GetMapping("port") + public Integer cokeServerPort() { + if (null == serverConfig) { + return 0; + } + try{ + applicationContext.getBean(OkHttpsInvoker.class); + }catch (Exception e){ + return serverConfig.getCokeServerPort(); + + } + return 0; + } } diff --git a/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java b/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java index 3d2b335..eece6c6 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java +++ b/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java @@ -31,8 +31,13 @@ public class SmartSocketInvoker extends ConnectInvoker { @Override public InvokeResult execute(ServiceInstance instance, String beanName, String methodName, Map params) { String uri = instance.getHost() + ConnectConstant.COLON + instance.getPort(); + Integer serverPort = ConnectUtil.getCokeServerPort(instance); + if(0 == serverPort){ + throw new RuntimeException("对方服务未开起server!"); + //TODO 异常统一 + } if (!sessionMap.containsKey(uri)) { - AioQuickClient aioQuickClient = new AioQuickClient(instance.getHost(), instance.getPort(), new CokeRequestProtocol(), new SmartSocketServerProcessor()); + AioQuickClient aioQuickClient = new AioQuickClient(instance.getHost(), serverPort, new CokeRequestProtocol(), new SmartSocketServerProcessor()); clientMap.put(uri, aioQuickClient); try { AioSession session = aioQuickClient.start(); @@ -42,7 +47,7 @@ public class SmartSocketInvoker extends ConnectInvoker { } } - AioSession session = sessionMap.get(uri); + AioSession session = clientMap.get(uri).getSession(); int requestId = ConnectUtil.requestIdMaker.addAndGet(1); CokeRequest request = new CokeRequest().setBeanName(beanName) .setMethodName(methodName) diff --git a/connect-core/src/main/java/org/needcoke/rpc/server/SmartSocketServer.java b/connect-core/src/main/java/org/needcoke/rpc/server/SmartSocketServer.java index 3cceb74..319b4ca 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/server/SmartSocketServer.java +++ b/connect-core/src/main/java/org/needcoke/rpc/server/SmartSocketServer.java @@ -5,10 +5,14 @@ import org.needcoke.rpc.codec.CokeRequestProtocol; import org.needcoke.rpc.config.ServerConfig; import org.needcoke.rpc.processor.smart_socket.SmartSocketServerProcessor; import org.smartboot.socket.transport.AioQuickServer; +import org.springframework.cloud.client.ServiceInstance; +import org.springframework.cloud.client.discovery.DiscoveryClient; + import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.io.IOException; +import java.util.List; @Slf4j public class SmartSocketServer implements ConnectionServer{ diff --git a/connect-core/src/main/java/org/needcoke/rpc/utils/ConnectUtil.java b/connect-core/src/main/java/org/needcoke/rpc/utils/ConnectUtil.java index 758c23c..4ebf0a2 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/utils/ConnectUtil.java +++ b/connect-core/src/main/java/org/needcoke/rpc/utils/ConnectUtil.java @@ -1,6 +1,12 @@ package org.needcoke.rpc.utils; +import com.ejlchina.okhttps.HTTP; +import com.ejlchina.okhttps.HttpResult; +import com.ejlchina.okhttps.SHttpTask; +import com.ejlchina.okhttps.jackson.JacksonMsgConvertor; import lombok.extern.slf4j.Slf4j; +import org.needcoke.rpc.common.constant.ConnectConstant; +import org.needcoke.rpc.common.enums.HttpContentTypeEnum; import org.needcoke.rpc.invoker.ConnectInvoker; import org.needcoke.rpc.invoker.InvokeResult; import org.needcoke.rpc.loadBalance.LoadBalance; @@ -9,9 +15,14 @@ import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.stereotype.Component; +import org.springframework.web.context.request.RequestAttributes; +import org.springframework.web.context.request.RequestContextHolder; +import org.springframework.web.context.request.ServletRequestAttributes; import org.springframework.web.context.request.async.DeferredResult; import javax.annotation.PostConstruct; +import javax.servlet.http.HttpServletRequest; +import java.util.Enumeration; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -92,6 +103,24 @@ public class ConnectUtil { } + public static Integer getCokeServerPort(ServiceInstance instance){ + RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes(); + HttpServletRequest request = ((ServletRequestAttributes) requestAttributes).getRequest(); + SHttpTask sHttpTask = HTTP.builder().addMsgConvertor(new JacksonMsgConvertor()).build() + .sync(instance.getUri() + ConnectConstant.COKE_PORT_RELATIVE_PATH) + .bodyType(HttpContentTypeEnum.JSON.getValue()); + Enumeration headerNames = request.getHeaderNames(); + while (headerNames.hasMoreElements()) { + String nextElement = headerNames.nextElement(); + String header = request.getHeader(nextElement); + sHttpTask.addHeader(nextElement,header); + } + HttpResult result = sHttpTask + .get(); + return result.getBody().toBean(Integer.class); + } + + -- Gitee From 2454d2b24c6f7c3e09bed1f2a005cf9a8162ecd2 Mon Sep 17 00:00:00 2001 From: warren <2410818122@qq.com> Date: Wed, 25 May 2022 23:26:02 +0800 Subject: [PATCH 4/5] =?UTF-8?q?update:=20COK6=20smart-socket=E9=80=82?= =?UTF-8?q?=E9=85=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rpc/invoker/SmartSocketInvoker.java | 20 +++++++++++-------- .../loadBalance/RoundRobinLoadBalance.java | 4 ---- .../WeightedResponseTimeBalance.java | 6 ------ .../org/needcoke/rpc/utils/ConnectUtil.java | 6 +----- 4 files changed, 13 insertions(+), 23 deletions(-) diff --git a/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java b/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java index eece6c6..071c022 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java +++ b/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java @@ -39,15 +39,19 @@ public class SmartSocketInvoker extends ConnectInvoker { if (!sessionMap.containsKey(uri)) { AioQuickClient aioQuickClient = new AioQuickClient(instance.getHost(), serverPort, new CokeRequestProtocol(), new SmartSocketServerProcessor()); clientMap.put(uri, aioQuickClient); - try { - AioSession session = aioQuickClient.start(); - sessionMap.put(uri, session); - } catch (IOException e) { - throw new RuntimeException(e); - } - +// try { +// AioSession session = aioQuickClient.start(); +// sessionMap.put(uri, session); +// } catch (IOException e) { +// throw new RuntimeException(e); +// } + } + AioSession session = null; + try { + session = clientMap.get(uri).start(); + } catch (IOException e) { + throw new RuntimeException(e); } - AioSession session = clientMap.get(uri).getSession(); int requestId = ConnectUtil.requestIdMaker.addAndGet(1); CokeRequest request = new CokeRequest().setBeanName(beanName) .setMethodName(methodName) diff --git a/connect-core/src/main/java/org/needcoke/rpc/loadBalance/RoundRobinLoadBalance.java b/connect-core/src/main/java/org/needcoke/rpc/loadBalance/RoundRobinLoadBalance.java index eaa9948..e190431 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/loadBalance/RoundRobinLoadBalance.java +++ b/connect-core/src/main/java/org/needcoke/rpc/loadBalance/RoundRobinLoadBalance.java @@ -1,10 +1,6 @@ package org.needcoke.rpc.loadBalance; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.cloud.client.ServiceInstance; -import org.springframework.stereotype.Component; - import java.util.List; import java.util.concurrent.ConcurrentHashMap; public class RoundRobinLoadBalance extends LoadBalance { diff --git a/connect-core/src/main/java/org/needcoke/rpc/loadBalance/WeightedResponseTimeBalance.java b/connect-core/src/main/java/org/needcoke/rpc/loadBalance/WeightedResponseTimeBalance.java index 8af4660..108478d 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/loadBalance/WeightedResponseTimeBalance.java +++ b/connect-core/src/main/java/org/needcoke/rpc/loadBalance/WeightedResponseTimeBalance.java @@ -2,14 +2,8 @@ package org.needcoke.rpc.loadBalance; import cn.hutool.core.bean.BeanUtil; import org.needcoke.rpc.CokeServiceInstance; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.cloud.client.ServiceInstance; -import org.springframework.stereotype.Component; - -import java.net.URI; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; public class WeightedResponseTimeBalance extends LoadBalance { diff --git a/connect-core/src/main/java/org/needcoke/rpc/utils/ConnectUtil.java b/connect-core/src/main/java/org/needcoke/rpc/utils/ConnectUtil.java index 4ebf0a2..124abe4 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/utils/ConnectUtil.java +++ b/connect-core/src/main/java/org/needcoke/rpc/utils/ConnectUtil.java @@ -56,6 +56,7 @@ public class ConnectUtil { discoveryClient = dc; loadBalance = lb; connectInvoker = ci; + log.info("ConnectUtil 加载的 loadBalance是 {} , 加载的ConnectInvoker是 {}。",loadBalance.getClass().getSimpleName(),connectInvoker.getClass().getSimpleName()); } private LoadBalance lb; @@ -119,9 +120,4 @@ public class ConnectUtil { .get(); return result.getBody().toBean(Integer.class); } - - - - - } -- Gitee From 446af5e7b2e2dea49c1fa429a63bb0c775aa29c8 Mon Sep 17 00:00:00 2001 From: yanming Date: Thu, 26 May 2022 13:59:07 +0800 Subject: [PATCH 5/5] =?UTF-8?q?update:=20COK-6=20=E6=96=B0=E5=A2=9Esmart-s?= =?UTF-8?q?ocket=20invoker=E9=80=82=E9=85=8D80%,=E5=B7=B2=E7=BB=8F?= =?UTF-8?q?=E5=8F=AF=E7=94=A8,=E4=BD=86=E6=9C=89=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E9=A1=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application.yml | 6 ++- .../rpc/codec/CokeRequestProtocol.java | 10 ++++- .../needcoke/rpc/invoker/InvokeResult.java | 2 + .../rpc/invoker/SmartSocketInvoker.java | 45 +++++++++---------- .../SmartSocketClientProcessor.java | 14 ++++-- .../SmartSocketMessageProcessor.java | 2 +- .../SmartSocketServerProcessor.java | 6 +-- .../org/needcoke/rpc/utils/ConnectUtil.java | 14 +++--- 8 files changed, 60 insertions(+), 39 deletions(-) diff --git a/bussiness-a/src/main/resources/application.yml b/bussiness-a/src/main/resources/application.yml index 5fe70a0..43c6995 100644 --- a/bussiness-a/src/main/resources/application.yml +++ b/bussiness-a/src/main/resources/application.yml @@ -13,4 +13,8 @@ spring: group: 相亲相爱一家人 access-key: server: - port: 8000 \ No newline at end of file + port: 8000 + +coke: + server: + port: 13005 \ No newline at end of file diff --git a/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequestProtocol.java b/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequestProtocol.java index e9715db..a8d0935 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequestProtocol.java +++ b/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequestProtocol.java @@ -23,6 +23,14 @@ public class CokeRequestProtocol implements Protocol { byte[] b = new byte[length]; readBuffer.get(b); readBuffer.mark(); - return JSONObject.parseObject(new String(b),CokeRequest.class); + String json = new String(b); + CokeRequest request = null; + try { + request = JSONObject.parseObject(json, CokeRequest.class); + }catch (Exception e){ + System.out.println("error { "+json+" }"); + throw new RuntimeException(e); + } + return request; } } \ No newline at end of file diff --git a/connect-core/src/main/java/org/needcoke/rpc/invoker/InvokeResult.java b/connect-core/src/main/java/org/needcoke/rpc/invoker/InvokeResult.java index a166016..3c81c58 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/invoker/InvokeResult.java +++ b/connect-core/src/main/java/org/needcoke/rpc/invoker/InvokeResult.java @@ -3,6 +3,7 @@ package org.needcoke.rpc.invoker; import com.alibaba.fastjson.JSONObject; import com.ejlchina.okhttps.HttpResult; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; import java.io.Serializable; @@ -14,6 +15,7 @@ import java.nio.charset.StandardCharsets; @Data @Accessors(chain = true) +@EqualsAndHashCode public class InvokeResult implements Serializable { /** diff --git a/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java b/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java index 071c022..f4a4adf 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java +++ b/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java @@ -1,21 +1,20 @@ package org.needcoke.rpc.invoker; +import cn.hutool.core.date.DateUtil; import lombok.extern.slf4j.Slf4j; import org.needcoke.rpc.codec.CokeRequest; import org.needcoke.rpc.codec.CokeRequestProtocol; import org.needcoke.rpc.common.constant.ConnectConstant; -import org.needcoke.rpc.processor.smart_socket.SmartSocketServerProcessor; +import org.needcoke.rpc.processor.smart_socket.SmartSocketClientProcessor; import org.needcoke.rpc.utils.ConnectUtil; import org.smartboot.socket.transport.AioQuickClient; import org.smartboot.socket.transport.AioSession; -import org.smartboot.socket.transport.WriteBuffer; import org.springframework.cloud.client.ServiceInstance; -import org.springframework.web.context.request.async.DeferredResult; import java.io.IOException; -import java.lang.reflect.Method; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.LockSupport; @Slf4j public class SmartSocketInvoker extends ConnectInvoker { @@ -32,26 +31,21 @@ public class SmartSocketInvoker extends ConnectInvoker { public InvokeResult execute(ServiceInstance instance, String beanName, String methodName, Map params) { String uri = instance.getHost() + ConnectConstant.COLON + instance.getPort(); Integer serverPort = ConnectUtil.getCokeServerPort(instance); - if(0 == serverPort){ + if (0 == serverPort) { throw new RuntimeException("对方服务未开起server!"); //TODO 异常统一 } if (!sessionMap.containsKey(uri)) { - AioQuickClient aioQuickClient = new AioQuickClient(instance.getHost(), serverPort, new CokeRequestProtocol(), new SmartSocketServerProcessor()); + AioQuickClient aioQuickClient = new AioQuickClient(instance.getHost(), serverPort, new CokeRequestProtocol(), new SmartSocketClientProcessor()); clientMap.put(uri, aioQuickClient); -// try { -// AioSession session = aioQuickClient.start(); -// sessionMap.put(uri, session); -// } catch (IOException e) { -// throw new RuntimeException(e); -// } - } - AioSession session = null; - try { - session = clientMap.get(uri).start(); - } catch (IOException e) { - throw new RuntimeException(e); + try { + AioSession session = aioQuickClient.start(); + sessionMap.put(uri, session); + } catch (IOException e) { + throw new RuntimeException(e); + } } + AioSession session = sessionMap.get(uri); int requestId = ConnectUtil.requestIdMaker.addAndGet(1); CokeRequest request = new CokeRequest().setBeanName(beanName) .setMethodName(methodName) @@ -64,11 +58,16 @@ public class SmartSocketInvoker extends ConnectInvoker { session.writeBuffer().flush(); } catch (IOException e) { throw new RuntimeException(e.getMessage()); - }finally { - session.close(); } - DeferredResult deferredResult = new DeferredResult(3000L); - ConnectUtil.putRequestMap(deferredResult); - return InvokeResult.nullResult().setBody(deferredResult); + InvokeResult tmp = new InvokeResult(); + long start = DateUtil.current(); + ConnectUtil.putRequestMap(requestId, tmp); + ConnectUtil.threadMap.put(requestId, Thread.currentThread()); + LockSupport.park(); + InvokeResult result = ConnectUtil.getFromRequestMap(requestId); + long end = DateUtil.current(); + log.info("requestId = {} , start = {} , end = {} ,cost = {}", requestId, start, end, end - start); + result.setTime(end - start); + return result; } } diff --git a/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketClientProcessor.java b/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketClientProcessor.java index f6aabd3..e166d20 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketClientProcessor.java +++ b/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketClientProcessor.java @@ -1,19 +1,25 @@ package org.needcoke.rpc.processor.smart_socket; +import lombok.extern.slf4j.Slf4j; import org.needcoke.rpc.codec.CokeRequest; import org.needcoke.rpc.common.enums.ConnectRequestEnum; import org.needcoke.rpc.utils.ConnectUtil; import org.smartboot.socket.transport.AioSession; -import org.springframework.web.context.request.async.DeferredResult; +import java.util.concurrent.locks.LockSupport; + +@Slf4j public class SmartSocketClientProcessor extends SmartSocketMessageProcessor { @Override public void process(AioSession aioSession, CokeRequest request) { - if (ConnectRequestEnum.INTERNAL_REQUEST == request.getRequestType()) { + if (ConnectRequestEnum.INTERNAL_RESPONSE == request.getRequestType()) { Integer requestId = request.getRequestId(); - DeferredResult deferredResult = ConnectUtil.getFromRequestMap(requestId); - deferredResult.setResult(request.getResult()); + log.info("smart socket client receive back requestId = {} , request json = {}", + requestId,new String(request.toBytes())); + ConnectUtil.putRequestMap(requestId,request.getResult()); + Thread thread = ConnectUtil.threadMap.get(requestId); + LockSupport.unpark(thread); //TODO 抛出异常 } diff --git a/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketMessageProcessor.java b/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketMessageProcessor.java index 719400c..448aaac 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketMessageProcessor.java +++ b/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketMessageProcessor.java @@ -26,7 +26,7 @@ public abstract class SmartSocketMessageProcessor implements MessageProcessor public void response(AioSession session, CokeRequest response){ WriteBuffer outputStream = session.writeBuffer(); try { - byte[] bytes = response.toString().getBytes(); + byte[] bytes = response.toBytes(); outputStream.writeInt(bytes.length); outputStream.write(bytes); outputStream.flush(); diff --git a/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketServerProcessor.java b/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketServerProcessor.java index 47e7e20..3a32a0a 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketServerProcessor.java +++ b/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketServerProcessor.java @@ -26,10 +26,10 @@ public class SmartSocketServerProcessor extends SmartSocketMessageProcessor params = request.getParams(); - log.info("execute smart socket -- beanName : {} , methodName : {} , param : {}",beanName,methodName, JSONObject.toJSONString(params)); + log.info("execute smart socket requestId = {} , -- beanName : {} , methodName : {} , param : {}", request.getRequestId(), beanName, methodName, JSONObject.toJSONString(params)); Method method = SpringContextUtils.getMethod(beanName, methodName); if (null == method) { - log.error(ConnectionExceptionEnum.BEAN_WITHOUT_METHOD.logStatement(ConnectConstant.EXECUTE_RELATIVE_PATH)); + log.error(ConnectionExceptionEnum.BEAN_WITHOUT_METHOD.logStatement("beanName {} , methodName {}"), beanName, methodName); throw new CokeConnectException(ConnectionExceptionEnum.BEAN_WITHOUT_METHOD); } Object bean = SpringContextUtils.getBean(beanName); @@ -37,7 +37,7 @@ public class SmartSocketServerProcessor extends SmartSocketMessageProcessor requestMap = new ConcurrentHashMap(); + public static final Map requestMap = new ConcurrentHashMap(); - public static void putRequestMap(DeferredResult value){ - requestMap.put(requestIdMaker.addAndGet(1),value); + public static void putRequestMap(InvokeResult result){ + requestMap.put(requestIdMaker.addAndGet(1),result); } - public static void putRequestMap(Integer requestId,DeferredResult value){ - requestMap.put(requestId,value); + public static ConcurrentHashMap threadMap = new ConcurrentHashMap<>(); + + public static void putRequestMap(Integer requestId,InvokeResult result){ + requestMap.put(requestId,result); } - public static DeferredResult getFromRequestMap(Integer key){ + public static InvokeResult getFromRequestMap(Integer key){ return requestMap.get(key); } -- Gitee