diff --git a/README.md b/README.md index f16f8e8b39a0afbd72c7ad611876b7bb970aaa33..923d9d3ddb25f741d944c3ee29685428a9b42c92 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,28 @@ # coke-connect #### 介绍 -基于spring boot/spring cloud生态的跨服务rpc调用组件。支持eureka和nacos两种注册中心,支持负载均衡配置(默认轮询)。灵活的函数名称调用方式。 +基于spring cloud生态的跨服务rpc调用组件。支持eureka和nacos两种注册中心,支持负载均衡配置(默认轮询)。 + +灵活的函数名称调用方式。 #### 特点 -1、对linux平台的epoll优化 -2、灵活的消息拦截机制 +支持选配基于okHttps,smartSocket,netty作为跨服务调用的消息组件 + +支持多种负载均衡策略 + +调用远程方法就像调用本地方法一样 + +#### 展望 + +@RpcClient注解的支持 +接口快速失败 ,失败重试 +接口调用失败日志采集 ,链路追踪 ,实时分析修改注册中心中该节点的健康状态 -#### 参与贡献 +通过注册中心直接获取Rpc端口号(现在是从注册中心获取){ + step 1: 将rpc注解以metadata的方式存入注册中心,后续调用不需要调接口 + step 2: 适配spring-boot-protocol,采用代理的方式统一端口 + step 3: 覆盖主流tomcat 、undertow 、 jetty,增设拦截器,共享一个服务器 +} -1. Fork 本仓库 -2. 新建 Feat_xxx 分支 -3. 提交代码 -4. 新建 Pull Request diff --git a/business-b/src/main/java/org/needcoke/b/BApplication.java b/business-b/src/main/java/org/needcoke/b/BApplication.java deleted file mode 100644 index 6da84333f20033805a0d244d2bbf9478ba70dbb7..0000000000000000000000000000000000000000 --- a/business-b/src/main/java/org/needcoke/b/BApplication.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.needcoke.b; - -import org.needcoke.rpc.utils.SpringContextUtils; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cloud.client.discovery.EnableDiscoveryClient; -import org.springframework.context.ConfigurableApplicationContext; - -/** - * @author Gilgamesh - * @date 2022/4/2 - */ -@EnableDiscoveryClient -@SpringBootApplication -public class BApplication { - - public static void main(String[] args) { - ConfigurableApplicationContext run = SpringApplication.run(BApplication.class, args); - SpringContextUtils bean = run.getBean(SpringContextUtils.class); - - Object beanNameMethodMap = run.getBean("beanNameMethodMap"); - - System.out.println(123); - - } -} diff --git a/business-b/src/main/java/org/needcoke/b/component/TestComponent.java b/business-b/src/main/java/org/needcoke/b/component/TestComponent.java deleted file mode 100644 index 8ed46cf3f9f4455298197edc0bbdce4fd8901b51..0000000000000000000000000000000000000000 --- a/business-b/src/main/java/org/needcoke/b/component/TestComponent.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.needcoke.b.component; - -import org.needcoke.rpc.invoker.SmartSocketInvoker; -import org.needcoke.rpc.loadBalance.RoundRobinLoadBalance; -import org.needcoke.rpc.loadBalance.WeightedResponseTimeBalance; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; -import org.springframework.stereotype.Component; - -/** - * @author Gilgamesh - * @date 2022/4/2 - */ -@Component -public class TestComponent { - - -// @Bean -// @Primary -// public WeightedResponseTimeBalance weightedResponseTimeBalance(){ -// return new WeightedResponseTimeBalance(); -// } - - @Bean - public SmartSocketInvoker smartSocketInvoker(){ - return new SmartSocketInvoker(); - } - -} diff --git a/business-b/src/main/java/org/needcoke/b/controller/TestController.java b/business-b/src/main/java/org/needcoke/b/controller/TestController.java deleted file mode 100644 index 7bde14bc7f916cabb405036f343cdc6431bd625a..0000000000000000000000000000000000000000 --- a/business-b/src/main/java/org/needcoke/b/controller/TestController.java +++ /dev/null @@ -1,51 +0,0 @@ -package org.needcoke.b.controller; - -import cn.hutool.core.collection.CollUtil; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.needcoke.rpc.annotation.Rpc; -import org.needcoke.rpc.invoker.InvokeResult; -import org.needcoke.rpc.loadBalance.LoadBalance; -import org.needcoke.rpc.utils.ConnectUtil; -import org.springframework.cloud.client.ServiceInstance; -import org.springframework.cloud.client.discovery.DiscoveryClient; -import org.springframework.stereotype.Component; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; -import org.springframework.web.context.request.RequestAttributes; -import org.springframework.web.context.request.RequestContextHolder; - -import javax.annotation.Resource; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * @author Gilgamesh - * @date 2022/4/2 - */ -@RestController -@RequestMapping("api/b") -@RequiredArgsConstructor -public class TestController { - - @GetMapping("test") - public InvokeResult test(){ - Map map = new HashMap<>(); - map.put("word","刘勇是死废物"); - InvokeResult execute = ConnectUtil.execute("bussiness-a", "config", "hahha2", map); - return execute; - } - - @Resource - private DiscoveryClient discoveryClient; - - @Resource - private LoadBalance loadBalance; - @GetMapping("testPort") - public Integer testPort(){ - List instances = discoveryClient.getInstances("bussiness-a"); - return ConnectUtil.getCokeServerPort(loadBalance.choose("bussiness-a",instances)); - } -} diff --git a/business-b/src/main/resources/application.yml b/business-b/src/main/resources/application.yml deleted file mode 100644 index a06f02a68c4acab45c3ed95a68045255b429fabf..0000000000000000000000000000000000000000 --- a/business-b/src/main/resources/application.yml +++ /dev/null @@ -1,19 +0,0 @@ - -spring: - application: - name: bussiness-b - # nacos默认可以不写 但是 如果不是默认的必须要写 - cloud: - nacos: - #注册中心 - discovery: - #server-addr: http://192.168.*:8848 - server-addr: http://127.0.0.1:8848 - cluster-name: 严鸣是吕诗文爸爸 - group: 相亲相爱一家人 -server: - port: 8081 - -coke: - server: - port: 13001 \ No newline at end of file diff --git a/business-c/src/main/java/org/needcoke/c/CApplication.java b/business-c/src/main/java/org/needcoke/c/CApplication.java deleted file mode 100644 index 632f06f09d7a4397ec94b7d5097178f093a13830..0000000000000000000000000000000000000000 --- a/business-c/src/main/java/org/needcoke/c/CApplication.java +++ /dev/null @@ -1,22 +0,0 @@ -package org.needcoke.c; - -import cn.hutool.core.util.ReUtil; - -/** - * @author Gilgamesh - * @date 2022/4/2 - */ -public class CApplication { - - public static void main(String[] args) { - - /* by warren: 横向模糊匹配 */ - // String regex = "ab{2,3}c"; String content = "abbbbc";‘ - - /* by warren: 纵向匹配 */ - String regex = "a[123]b";String content = "a1b"; - //String regex = ""; String content = ""; - boolean match = ReUtil.isMatch(regex, content); - System.out.println(match); - } -} diff --git a/bussiness-a/src/main/java/org/needcoke/a/AApplication.java b/bussiness-a/src/main/java/org/needcoke/a/AApplication.java deleted file mode 100644 index 2c814e1710081b169e57fc256360681c94e1ca4d..0000000000000000000000000000000000000000 --- a/bussiness-a/src/main/java/org/needcoke/a/AApplication.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.needcoke.a; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cloud.client.discovery.EnableDiscoveryClient; - -/** - * @author Gilgamesh - * @date 2022/4/2 - */ -@EnableDiscoveryClient -@SpringBootApplication -public class AApplication { - - public static void main(String[] args) { - SpringApplication.run(AApplication.class, args); - } -} diff --git a/bussiness-a/src/main/java/org/needcoke/a/configuration/Config.java b/bussiness-a/src/main/java/org/needcoke/a/configuration/Config.java deleted file mode 100644 index 567cce6ab73753b519b3c7d78f5aeb85870a68a9..0000000000000000000000000000000000000000 --- a/bussiness-a/src/main/java/org/needcoke/a/configuration/Config.java +++ /dev/null @@ -1,37 +0,0 @@ -package org.needcoke.a.configuration; - -import lombok.extern.slf4j.Slf4j; -import org.needcoke.rpc.annotation.Call; -import org.needcoke.rpc.annotation.Rpc; -import org.needcoke.rpc.invoker.SmartSocketInvoker; -import org.springframework.context.annotation.Bean; -import org.springframework.stereotype.Component; - -/** - * - * @author yanming - * @date 2022/5/11 - */ -@Component -@Slf4j -@Rpc -public class Config { - - - public String haha(){ - log.info(this.getClass().getName()+":haha()被调用"); - return "haha"; - } - - - @Call("hahha2") - public String haha(String word){ - log.info(this.getClass().getName()+":say()被调用"); - return "say : "+word; - } - - @Bean - public SmartSocketInvoker smartSocketInvoker(){ - return new SmartSocketInvoker(); - } -} diff --git a/bussiness-a/src/main/java/org/needcoke/a/controller/AController.java b/bussiness-a/src/main/java/org/needcoke/a/controller/AController.java deleted file mode 100644 index 54b37258c4eaca8618c140e014bec57e780197a4..0000000000000000000000000000000000000000 --- a/bussiness-a/src/main/java/org/needcoke/a/controller/AController.java +++ /dev/null @@ -1,39 +0,0 @@ -package org.needcoke.a.controller; - -import lombok.RequiredArgsConstructor; -import org.springframework.cloud.client.ServiceInstance; -import org.springframework.cloud.client.discovery.DiscoveryClient; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * @author yanming - * @date 2022/5/11 - */ -@RestController -@RequestMapping("api/a") -@RequiredArgsConstructor -public class AController { - - private final DiscoveryClient discoveryClient; - - @GetMapping("instance") - public List instance() { - return discoveryClient.getServices(); - } - - @GetMapping("instanceInfo") - public Map> instanceInfo() { - List services = discoveryClient.getServices(); - Map> ret = new HashMap<>(); - for (String service : services) { - ret.put(service, discoveryClient.getInstances(service)); - } - return ret; - } -} diff --git a/bussiness-a/src/main/resources/application.yml b/bussiness-a/src/main/resources/application.yml deleted file mode 100644 index 43c6995ad20c63380096a80fd4b25d1c28cc0e9b..0000000000000000000000000000000000000000 --- a/bussiness-a/src/main/resources/application.yml +++ /dev/null @@ -1,20 +0,0 @@ - -spring: - application: - name: bussiness-a - # nacos默认可以不写 但是 如果不是默认的必须要写 - cloud: - nacos: - #注册中心 - discovery: - #server-addr: http://192.168.*:8848 - server-addr: http://127.0.0.1:8848 - cluster-name: 严鸣是吕诗文爸爸 - group: 相亲相爱一家人 - access-key: -server: - port: 8000 - -coke: - server: - port: 13005 \ No newline at end of file diff --git a/bussiness-a/target/classes/application.yml b/bussiness-a/target/classes/application.yml deleted file mode 100644 index 5fe70a0add403f81d09cd8083e2bcc8084ec38bc..0000000000000000000000000000000000000000 --- a/bussiness-a/target/classes/application.yml +++ /dev/null @@ -1,16 +0,0 @@ - -spring: - application: - name: bussiness-a - # nacos默认可以不写 但是 如果不是默认的必须要写 - cloud: - nacos: - #注册中心 - discovery: - #server-addr: http://192.168.*:8848 - server-addr: http://127.0.0.1:8848 - cluster-name: 严鸣是吕诗文爸爸 - group: 相亲相爱一家人 - access-key: -server: - port: 8000 \ No newline at end of file diff --git a/bussiness-a/target/classes/org/needcoke/a/AApplication.class b/bussiness-a/target/classes/org/needcoke/a/AApplication.class deleted file mode 100644 index 5c149a68adf51c185c444e7d91044bc679d8857c..0000000000000000000000000000000000000000 Binary files a/bussiness-a/target/classes/org/needcoke/a/AApplication.class and /dev/null differ diff --git a/bussiness-a/target/classes/org/needcoke/a/controller/AController.class b/bussiness-a/target/classes/org/needcoke/a/controller/AController.class deleted file mode 100644 index bea2e23cd2949ca7e595357e743e10143687ddc5..0000000000000000000000000000000000000000 Binary files a/bussiness-a/target/classes/org/needcoke/a/controller/AController.class and /dev/null differ diff --git a/connect-core/pom.xml b/connect-core/pom.xml index e080cee759604dcc3ee0aa09c8828eb34431c74b..4d77d5f03466ff225cb94f5f5004c046112a40b6 100644 --- a/connect-core/pom.xml +++ b/connect-core/pom.xml @@ -22,43 +22,22 @@ spring-boot-starter-web 2.3.2.RELEASE - - com.alibaba.cloud - spring-cloud-starter-alibaba-nacos-discovery - 2.2.5.RELEASE - + org.projectlombok lombok ${lombok-version} - - org.apache.httpcomponents - httpclient - 4.5.3 - org.slf4j slf4j-api 1.7.16 - - - org.smartboot.http - smart-http-client - 1.1.12 - com.ejlchina okhttps-jackson 3.5.1 - - - org.springframework.cloud - spring-cloud-starter-netflix-ribbon - 2.2.5.RELEASE - com.alibaba fastjson @@ -69,11 +48,53 @@ hutool-all ${hutool-version} + org.smartboot.socket aio-core 1.5.17 + + org.apache.httpcomponents + httpclient + 4.5.3 + + + net.dreamlu + mica-auto + 2.3.1 + provided + + + + org.springframework.cloud + spring-cloud-commons + 2.2.5.RELEASE + compile + + + org.projectlombok + lombok + + + + net.dreamlu + mica-auto + 2.3.1 + provided + + + + org.needcoke + connect-link-tracking + abandon + + + + org.needcoke + connect-fuse + abandon + @@ -87,17 +108,6 @@ pom import - - org.springframework.cloud - spring-cloud-dependencies - 2.2.5.RELEASE - - - - com.alibaba.cloud - spring-cloud-alibaba-dependencies - 2.2.5.RELEASE - diff --git a/connect-core/src/main/java/org/needcoke/rpc/annotation/EnableRpcClient.java b/connect-core/src/main/java/org/needcoke/rpc/annotation/EnableRpcClient.java new file mode 100644 index 0000000000000000000000000000000000000000..cece67c25a3e291ac1816eaf1ee295537093a105 --- /dev/null +++ b/connect-core/src/main/java/org/needcoke/rpc/annotation/EnableRpcClient.java @@ -0,0 +1,32 @@ +package org.needcoke.rpc.annotation; + +import org.needcoke.rpc.config.RpcClientRegister; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.*; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +@Documented +@Import({RpcClientRegister.class}) +public @interface EnableRpcClient { + + + /** + * Alias for the {@link #basePackages()} attribute. Allows for more concise annotation + * declarations e.g.: {@code @ComponentScan("org.my.pkg")} instead of {@code @ComponentScan(basePackages="org.my.pkg")}. + * + * @return the array of 'basePackages'. + */ + String[] value() default {}; + + /** + * Base packages to scan for annotated components. + *

+ * {@link #value()} is an alias for (and mutually exclusive with) this attribute. + *

+ * + * @return the array of 'basePackages'. + */ + String[] basePackages() default {}; +} diff --git a/connect-core/src/main/java/org/needcoke/rpc/annotation/Rpc.java b/connect-core/src/main/java/org/needcoke/rpc/annotation/Rpc.java index 90173cd2353c23c92af853511441f519e3218710..4dc6aa3b0c8e52297be6e2d6b18fd15d68ea4076 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/annotation/Rpc.java +++ b/connect-core/src/main/java/org/needcoke/rpc/annotation/Rpc.java @@ -11,17 +11,10 @@ import java.lang.annotation.*; * @author Gilgamesh * @date 2022/4/2 */ -@Target({ElementType.TYPE, ElementType.METHOD}) +@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented -@Import(Component.class) public @interface Rpc { - String value() default ""; - - String serviceId() default ""; - String beanName() default ""; - - String callName() default ""; } diff --git a/connect-core/src/main/java/org/needcoke/rpc/annotation/RpcClient.java b/connect-core/src/main/java/org/needcoke/rpc/annotation/RpcClient.java new file mode 100644 index 0000000000000000000000000000000000000000..df13bb73b5484f37141da44bc64650b217cbe4e7 --- /dev/null +++ b/connect-core/src/main/java/org/needcoke/rpc/annotation/RpcClient.java @@ -0,0 +1,20 @@ +package org.needcoke.rpc.annotation; + +import java.lang.annotation.*; + +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface RpcClient { + + String name() default ""; + + String value() default ""; + + String beanName(); + + String serviceId(); + + + +} diff --git a/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequest.java b/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequest.java index 0232e07fc64192dbd3a7021fea9e81a5e03fbf11..3780848af2f0d055b6715263c5931c064ac63d59 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequest.java +++ b/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequest.java @@ -5,7 +5,6 @@ import lombok.Getter; import org.needcoke.rpc.common.enums.ConnectRequestEnum; import org.needcoke.rpc.invoker.InvokeResult; -import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -23,12 +22,17 @@ public class CokeRequest { private Map params; - private Map headers; + private Map headers; + + private String cokeRequestId; @Getter private InvokeResult result; - private Integer requestId; + public CokeRequest setCokeRequestId(String cokeRequestId) { + this.cokeRequestId = cokeRequestId; + return this; + } public CokeRequest setRequestType(ConnectRequestEnum requestType) { @@ -51,11 +55,11 @@ public class CokeRequest { return this; } - public CokeRequest addParam(String name,Object param) { + public CokeRequest addParam(String name, Object param) { if (null == params) { params = new HashMap<>(); } - params.put(name,param); + params.put(name, param); return this; } @@ -65,11 +69,15 @@ public class CokeRequest { return this; } - public CokeRequest addHeader(String name,String header) { + public String getHeader(String name) { + return this.headers.get(name); + } + + public CokeRequest addHeader(String name, String header) { if (null == headers) { headers = new HashMap<>(); } - headers.put(name,header); + headers.put(name, header); return this; } @@ -78,13 +86,9 @@ public class CokeRequest { return this; } - public byte[] toBytes(){ + public byte[] toBytes() { String jsonString = JSONObject.toJSONString(this); return jsonString.getBytes(); } - public CokeRequest setRequestId(Integer requestId){ - this.requestId = requestId; - return this; - } } diff --git a/connect-core/src/main/java/org/needcoke/rpc/common/constant/ConnectConstant.java b/connect-core/src/main/java/org/needcoke/rpc/common/constant/ConnectConstant.java index 2af99f447746816e22052328d906a1accf2368d2..95591cea861237b77ce471b7e4e4290cee386bfa 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/common/constant/ConnectConstant.java +++ b/connect-core/src/main/java/org/needcoke/rpc/common/constant/ConnectConstant.java @@ -15,6 +15,8 @@ public interface ConnectConstant { String COKE_PORT_RELATIVE_PATH = "/coke/connect/port"; + String COKE_RPC_TYPE_RELATIVE_PATH = "/coke/connect/rpcType"; + /** * 实例名称 */ @@ -34,4 +36,6 @@ public interface ConnectConstant { * 冒号 : */ String COLON = ":"; + + String COKE_REQUEST_ID_HEADER_ID_NAME = "COKE-LINK-TRACKING-INFO"; } diff --git a/connect-core/src/main/java/org/needcoke/rpc/common/enums/ConnectionExceptionEnum.java b/connect-core/src/main/java/org/needcoke/rpc/common/enums/ConnectionExceptionEnum.java index ab7ad2588fbe6f3b6c5b3e588a2ef78db0809198..e14ec7ff26a741d93e8fed6540631043f0dbbd5c 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/common/enums/ConnectionExceptionEnum.java +++ b/connect-core/src/main/java/org/needcoke/rpc/common/enums/ConnectionExceptionEnum.java @@ -6,11 +6,11 @@ package org.needcoke.rpc.common.enums; * @author yanming * @date 2022/5/12 */ -public enum ConnectionExceptionEnum { +public enum ConnectionExceptionEnum implements EnumInterface{ NO_SUCH_BEAN_NAME("0001", "no such bean name", "没有找到对应的beanName的bean"), BEAN_WITHOUT_METHOD("0002", - "the bean name ${beanName} without method name ${method} ,you can use annotation @call or check method public.", + "the bean name {} without method name {} ,you can use annotation @call or check method public.", "您请求的bean没有该方法,您可以考虑添加@call注解,或者修改method的访问权限为public。"), INVOKE_METHOD_ERROR("0003","invoke remote method error,please check the method name or param list ,if @Call ,you need to use @Call.value" @@ -18,6 +18,15 @@ public enum ConnectionExceptionEnum { CAN_NOT_FIND_SUCH_INSTANCE("0004","can't find this instance.","找不到对应的实例"), + REMOTE_SERVICE_DOES_NOT_OPEN_THE_COKE_SERVICE_PORT("0005","remote service does not open the coke service port.","远程服务未开启coke服务端口"), + + RECONNECTION_WITH_REMOTE_SERVICE_FAILED("0006","reconnection with remote service failed","与远程服务重建连接失败"), + + CONNECTION_WITH_REMOTE_SERVICE_FAILED("0007","reconnection with remote service failed","与远程服务重建连接失败"), + + THE_FORMAT_OF_THE_REMOTE_SERVICE_PORT_NUMBER_IS_INCORRECT_PLEASE_CHECK_THE_CONFIGURATION_OF_THE_REMOTE_SERVICE_PORT_NUMBER + ("0008","The format of the remote service port number is incorrect. Please check the configuration of the remote service port number", + "远程服务端口号格式错误,请检查远程服务端口号配置") ; private final String code; diff --git a/connect-core/src/main/java/org/needcoke/rpc/common/enums/EnumInterface.java b/connect-core/src/main/java/org/needcoke/rpc/common/enums/EnumInterface.java new file mode 100644 index 0000000000000000000000000000000000000000..715e4c3e9757ca68f346b7f5f280635bd05fce2b --- /dev/null +++ b/connect-core/src/main/java/org/needcoke/rpc/common/enums/EnumInterface.java @@ -0,0 +1,10 @@ +package org.needcoke.rpc.common.enums; + +public interface EnumInterface { + + String getErrorCode(); + String getNote() ; + + public String getValue(); + +} diff --git a/connect-core/src/main/java/org/needcoke/rpc/common/enums/LoadExceptionEnum.java b/connect-core/src/main/java/org/needcoke/rpc/common/enums/LoadExceptionEnum.java new file mode 100644 index 0000000000000000000000000000000000000000..67645110169dda9e1ffcba227a53d8440e2870aa --- /dev/null +++ b/connect-core/src/main/java/org/needcoke/rpc/common/enums/LoadExceptionEnum.java @@ -0,0 +1,30 @@ +package org.needcoke.rpc.common.enums; + +public enum LoadExceptionEnum implements EnumInterface{ + RPC_CLIENT_SCAN_ANNOTATION_IS_NOT_ADDED_TO_THE_STARTUP_CLASS("001","Rpcclient scan annotation is not added to the startup class","没有在启动类加上RpcClient扫描注解"), + ; + + private final String code; + + private final String value; + + private final String note; + + LoadExceptionEnum(String code, String value, String note) { + this.code = code; + this.value = value; + this.note = note; + } + + public String getErrorCode() { + return "1002" + code; + } + + public String getNote() { + return note; + } + + public String getValue() { + return value; + } +} diff --git a/connect-core/src/main/java/org/needcoke/rpc/common/enums/RpcTypeEnum.java b/connect-core/src/main/java/org/needcoke/rpc/common/enums/RpcTypeEnum.java new file mode 100644 index 0000000000000000000000000000000000000000..09a21ca1efecf3fed72c532568f5a113c50958c3 --- /dev/null +++ b/connect-core/src/main/java/org/needcoke/rpc/common/enums/RpcTypeEnum.java @@ -0,0 +1,6 @@ +package org.needcoke.rpc.common.enums; + +public enum RpcTypeEnum { + + okHttp3,smartSocket,netty +} diff --git a/connect-core/src/main/java/org/needcoke/rpc/common/exception/CokeConnectException.java b/connect-core/src/main/java/org/needcoke/rpc/common/exception/CokeConnectException.java index 7a4e66acc37e3628c421d3387bd068b0b741ddfe..3140d708ed7518be1dc9d9b9ec3cd0f5341b6af3 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/common/exception/CokeConnectException.java +++ b/connect-core/src/main/java/org/needcoke/rpc/common/exception/CokeConnectException.java @@ -1,7 +1,11 @@ package org.needcoke.rpc.common.exception; import lombok.Data; +import org.connect.rpc.link.tracking.util.TrackingUtil; import org.needcoke.rpc.common.enums.ConnectionExceptionEnum; +import org.needcoke.rpc.common.enums.EnumInterface; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ResponseStatus; /** * coke通用异常 @@ -10,12 +14,15 @@ import org.needcoke.rpc.common.enums.ConnectionExceptionEnum; * @date 2022/5/12 */ @Data +@ResponseStatus(code = HttpStatus.BAD_GATEWAY) public class CokeConnectException extends RuntimeException { private String errorCode; private String note; + private String requestId; + public CokeConnectException(String message, Throwable cause, String errorCode) { super(message, cause); this.errorCode = errorCode; @@ -26,15 +33,22 @@ public class CokeConnectException extends RuntimeException { this.errorCode = errorCode; } - public CokeConnectException(ConnectionExceptionEnum connectionExceptionEnum) { + public CokeConnectException(EnumInterface connectionExceptionEnum) { super(connectionExceptionEnum.getValue()); this.errorCode = connectionExceptionEnum.getErrorCode(); this.note = connectionExceptionEnum.getNote(); } - public CokeConnectException(ConnectionExceptionEnum connectionExceptionEnum,Throwable e) { + public CokeConnectException(EnumInterface connectionExceptionEnum,Throwable e) { + super(connectionExceptionEnum.getValue(),e); + this.errorCode = connectionExceptionEnum.getErrorCode(); + this.note = connectionExceptionEnum.getNote(); + } + + public CokeConnectException(String requestId ,ConnectionExceptionEnum connectionExceptionEnum,Throwable e) { super(connectionExceptionEnum.getValue(),e); this.errorCode = connectionExceptionEnum.getErrorCode(); this.note = connectionExceptionEnum.getNote(); + this.requestId = requestId; } } diff --git a/connect-core/src/main/java/org/needcoke/rpc/common/exception/CokeSmartSocketException.java b/connect-core/src/main/java/org/needcoke/rpc/common/exception/CokeSmartSocketException.java index 613e13b8a851e6901c9ec0cf0ac378d32af90c36..b68c7bae81511d1c730e55a6aa96aed333804555 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/common/exception/CokeSmartSocketException.java +++ b/connect-core/src/main/java/org/needcoke/rpc/common/exception/CokeSmartSocketException.java @@ -2,8 +2,11 @@ package org.needcoke.rpc.common.exception; import lombok.Data; import org.needcoke.rpc.common.enums.ConnectionExceptionEnum; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ResponseStatus; @Data +@ResponseStatus(code = HttpStatus.BAD_GATEWAY) public class CokeSmartSocketException extends RuntimeException { private String errorCode; diff --git a/connect-core/src/main/java/org/needcoke/rpc/config/CokeConfiguration.java b/connect-core/src/main/java/org/needcoke/rpc/config/CokeConfiguration.java index 06388b0bd6efa99cb0e7b796d313ecfee4f4ef74..3e88d96c28b324de7d957ac24289536bdf95db17 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/config/CokeConfiguration.java +++ b/connect-core/src/main/java/org/needcoke/rpc/config/CokeConfiguration.java @@ -2,7 +2,6 @@ package org.needcoke.rpc.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; - import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; @@ -11,7 +10,7 @@ import java.util.Map; * @author Gilgamesh * @date 2022/4/2 */ -@Configuration +@Configuration(proxyBeanMethods = false) public class CokeConfiguration { /** @@ -29,4 +28,5 @@ public class CokeConfiguration { public Map classNameMethodMap(){ return new HashMap<>(); } + } diff --git a/connect-core/src/main/java/org/needcoke/rpc/config/CokeConnectContextInitializer.java b/connect-core/src/main/java/org/needcoke/rpc/config/CokeConnectContextInitializer.java deleted file mode 100644 index da2bac660d81593eaed579b53f1cb0f2167fc813..0000000000000000000000000000000000000000 --- a/connect-core/src/main/java/org/needcoke/rpc/config/CokeConnectContextInitializer.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.needcoke.rpc.config; - -import org.springframework.context.annotation.ComponentScan; - -/** - * Spring Boot 配置文件的扫描类 EnableAutoConfiguration - * - * @author Gilgamesh - * @date 2022/4/2 - */ - -@ComponentScan("org.needcoke.rpc") -public class CokeConnectContextInitializer { - - -} diff --git a/connect-core/src/main/java/org/needcoke/rpc/config/CokeHandlerInterceptorAdapter.java b/connect-core/src/main/java/org/needcoke/rpc/config/CokeHandlerInterceptorAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..f2c32f0ab72c9b90c34aa346c86ee67feca5c574 --- /dev/null +++ b/connect-core/src/main/java/org/needcoke/rpc/config/CokeHandlerInterceptorAdapter.java @@ -0,0 +1,35 @@ +package org.needcoke.rpc.config; + +import lombok.extern.slf4j.Slf4j; +import org.apache.catalina.connector.RequestFacade; +import org.connect.rpc.link.tracking.config.LinkTrackingContextHolder; +import org.connect.rpc.link.tracking.net.LinkTracking; +import org.connect.rpc.link.tracking.util.TrackingUtil; +import org.needcoke.rpc.utils.SpringContextUtils; +import org.springframework.web.servlet.HandlerInterceptor; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.util.Map; + +@Slf4j +public class CokeHandlerInterceptorAdapter implements HandlerInterceptor { + + @Override + public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) { + ServerConfig bean = SpringContextUtils.getBean(ServerConfig.class); + int port = bean.getMvcPort(); + TrackingUtil.preHttp(request,response,handler,port); + LinkTracking linkTracking = LinkTrackingContextHolder.getLinkTracking(); + if(request instanceof RequestFacade){ + String requestURI = request.getRequestURI(); + linkTracking.addMataData("http path",requestURI); + } + return true; + } + + @Override + public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { + HandlerInterceptor.super.afterCompletion(request, response, handler, ex); + } +} diff --git a/connect-core/src/main/java/org/needcoke/rpc/config/CokeWebMvcConfigurer.java b/connect-core/src/main/java/org/needcoke/rpc/config/CokeWebMvcConfigurer.java new file mode 100644 index 0000000000000000000000000000000000000000..ed792e72fe96ce6b546e0074fcc8bd3be9fb65c9 --- /dev/null +++ b/connect-core/src/main/java/org/needcoke/rpc/config/CokeWebMvcConfigurer.java @@ -0,0 +1,14 @@ +package org.needcoke.rpc.config; + +import org.springframework.context.annotation.Configuration; +import org.springframework.web.servlet.config.annotation.InterceptorRegistry; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; + +@Configuration +public class CokeWebMvcConfigurer implements WebMvcConfigurer { + + @Override + public void addInterceptors(InterceptorRegistry registry) { + registry.addInterceptor(new CokeHandlerInterceptorAdapter()).addPathPatterns("/**"); + } +} diff --git a/connect-core/src/main/java/org/needcoke/rpc/config/ConnectExceptionAdvice.java b/connect-core/src/main/java/org/needcoke/rpc/config/ConnectExceptionAdvice.java index 06c55d056bba50a0521dc6f302f034759a042238..d1104d2ec705ab4f01747a85a2b3923d5f837144 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/config/ConnectExceptionAdvice.java +++ b/connect-core/src/main/java/org/needcoke/rpc/config/ConnectExceptionAdvice.java @@ -1,5 +1,6 @@ package org.needcoke.rpc.config; +import org.connect.rpc.link.tracking.config.LinkTrackingContextHolder; import org.needcoke.rpc.common.exception.CokeConnectException; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.RestControllerAdvice; @@ -16,12 +17,24 @@ import java.util.Map; @RestControllerAdvice public class ConnectExceptionAdvice { - @ExceptionHandler(value = CokeConnectException.class) - public Map exceptionHandle(CokeConnectException exception) { + + public Map connectExceptionHandle(CokeConnectException exception) { Map ret = new HashMap<>(); ret.put("message", exception.getMessage()); ret.put("note", exception.getNote()); ret.put("errorCode", exception.getErrorCode()); return ret; } + + @ExceptionHandler(value = Exception.class) + public Map exceptionHandle(Exception exception) { + if(exception instanceof CokeConnectException){ + return connectExceptionHandle((CokeConnectException)exception); + } + Map ret = new HashMap<>(); + ret.put("message", exception.getMessage()); + ret.put("note", exception.getLocalizedMessage()); + ret.put("errorCode", "502"); + return ret; + } } diff --git a/connect-core/src/main/java/org/needcoke/rpc/config/RpcClientRegister.java b/connect-core/src/main/java/org/needcoke/rpc/config/RpcClientRegister.java new file mode 100644 index 0000000000000000000000000000000000000000..50b752f7ea5ca43ef94cfda382ec2b8079aff16b --- /dev/null +++ b/connect-core/src/main/java/org/needcoke/rpc/config/RpcClientRegister.java @@ -0,0 +1,263 @@ +package org.needcoke.rpc.config; + +import cn.hutool.core.util.StrUtil; +import org.needcoke.rpc.annotation.EnableRpcClient; +import org.needcoke.rpc.annotation.RpcClient; +import org.needcoke.rpc.proxy.RpcClientFactoryBean; +import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.config.BeanDefinitionHolder; +import org.springframework.beans.factory.support.AbstractBeanDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; +import org.springframework.beans.factory.support.BeanDefinitionRegistry; +import org.springframework.context.EnvironmentAware; +import org.springframework.context.ResourceLoaderAware; +import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider; +import org.springframework.context.annotation.ImportBeanDefinitionRegistrar; +import org.springframework.core.env.Environment; +import org.springframework.core.io.ResourceLoader; +import org.springframework.core.type.AnnotationMetadata; +import org.springframework.core.type.classreading.MetadataReader; +import org.springframework.core.type.classreading.MetadataReaderFactory; +import org.springframework.core.type.filter.AnnotationTypeFilter; +import org.springframework.core.type.filter.TypeFilter; +import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; +import org.springframework.util.StringUtils; + +import java.io.IOException; +import java.util.*; + +/** + * simple rpc 客户端扫描注册器 + * + * @author Mr_wenpan@163.com 2022/1/19 2:11 下午 + */ +public class RpcClientRegister implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware { + + /** + * 资源加载器 + */ + private ResourceLoader resourceLoader; + /** + * 环境 + */ + private Environment environment; + + public RpcClientRegister() { + } + + @Override + public void setResourceLoader(ResourceLoader resourceLoader) { + this.resourceLoader = resourceLoader; + } + + @Override + public void setEnvironment(Environment environment) { + this.environment = environment; + } + + @Override + public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { + // 注册bean客户端 + registerSimpleRpcClients(importingClassMetadata, registry); + } + + /** + * 注册simple rpc 客户端 + */ + public void registerSimpleRpcClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { + ClassPathScanningCandidateComponentProvider scanner = getScanner(); + scanner.setResourceLoader(resourceLoader); + + AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter(RpcClient.class); + // 这里只会走if,暂时不会走else去 + scanner.addIncludeFilter(annotationTypeFilter); + Set basePackages = getBasePackages(metadata); + + // 遍历每一个basePackages + for (String basePackage : basePackages) { + // 通过scanner获取候选组件 + Set candidateComponents = scanner.findCandidateComponents(basePackage); + for (BeanDefinition candidateComponent : candidateComponents) { + if (candidateComponent instanceof AnnotatedBeanDefinition) { + // verify annotated class is an interface + AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent; + AnnotationMetadata annotationMetadata = beanDefinition.getMetadata(); + Assert.isTrue(annotationMetadata.isInterface(), "@SimpleRpcClient can only be specified on an interface"); + // 获取SimpleRpcClient注解的属性 + Map attributes = annotationMetadata.getAnnotationAttributes(RpcClient.class.getCanonicalName()); + registerSimpleRpcClient(registry, annotationMetadata, attributes,beanDefinition); + } + } + } + } + + public Class getClass(String beanClassName){ + try { + return Class.forName(beanClassName); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + /** + * 注册simple rpc 客户端 + * + * @param registry bean定义信息注册器 + * @param annotationMetadata 元数据 + * @param attributes @SimpleRpcClient注解的属性 + * @author Mr_wenpan@163.com 2022/1/19 2:48 下午 + */ + private void registerSimpleRpcClient(BeanDefinitionRegistry registry, + AnnotationMetadata annotationMetadata, + Map attributes, + AnnotatedBeanDefinition bd) { + // 类名(接口全限定名) + String className = annotationMetadata.getClassName(); + BeanDefinitionBuilder definitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(RpcClientFactoryBean.class); + // 解析出@SimpleRpcClient注解的name + String name = getName(attributes); + if (!StringUtils.hasText(name)) { + throw new RuntimeException(String.format("class [%s] , @SimpleRpcClient name or value can not be null, please check.", className)); + } + definitionBuilder.addPropertyValue("name", name); + definitionBuilder.addPropertyValue("type",getClass(className)); + definitionBuilder.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE); + String alias = name + "SimpleRpcClient"; + AbstractBeanDefinition beanDefinition = definitionBuilder.getBeanDefinition(); + + beanDefinition.setPrimary(true); + + // 设置qualifier,优先使用qualifier + String qualifier = getQualifier(attributes); + if (StringUtils.hasText(qualifier)) { + alias = qualifier; + } + + // 注册bean定义信息 + BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, new String[]{alias}); + BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry); + } + + /** + * 获取qualifier + */ + private static String getQualifier(Map client) { + if (client == null) { + return null; + } + String qualifier = (String) client.get("qualifier"); + if (StringUtils.hasText(qualifier)) { + return qualifier; + } + return null; + } + + /** + * 获取name + */ + protected String getName(Map attributes) { + String name = (String) attributes.get("name"); + if (!StringUtils.hasText(name)) { + name = (String) attributes.get("value"); + } + if (StrUtil.isEmpty(name)) { + name = resolve(name); + } + ; + return name; + } + + /** + * 解析name + */ + private String resolve(String value) { + if (StringUtils.hasText(value)) { + return environment.resolvePlaceholders(value); + } + return value; + } + + /** + * 获取扫描器 + */ + protected ClassPathScanningCandidateComponentProvider getScanner() { + return new ClassPathScanningCandidateComponentProvider(false, environment) { + @Override + protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) { + boolean isCandidate = false; + if (beanDefinition.getMetadata().isIndependent()) { + if (!beanDefinition.getMetadata().isAnnotation()) { + isCandidate = true; + } + } + return isCandidate; + } + }; + } + + /** + * 获取base packages + */ + protected static Set getBasePackages(AnnotationMetadata importingClassMetadata) { + // 获取到@EnableSimpleRpcClients注解所有属性 + Map attributes = importingClassMetadata + .getAnnotationAttributes(EnableRpcClient.class.getCanonicalName()); + if (null == attributes) { + return null; + } + Set basePackages = new HashSet<>(); + assert attributes != null; + String[] values = (String[]) attributes.get("value"); + for (String pkg : values) { + if (StringUtils.hasText(pkg)) { + basePackages.add(pkg); + } + } + String[] bgs = (String[]) attributes.get("basePackages"); + for (String pkg : bgs) { + if (StringUtils.hasText(pkg)) { + basePackages.add(pkg); + } + } + // 如果上面两步都没有获取到basePackages,那么这里就默认使用当前项目启动类所在的包为basePackages + if (basePackages.isEmpty()) { + basePackages.add(ClassUtils.getPackageName(importingClassMetadata.getClassName())); + } + return basePackages; + } + + /** + * Helper class to create a {@link TypeFilter} that matches if all the delegates match. + * + * @author Oliver Gierke + */ + private static class AllTypeFilter implements TypeFilter { + + private final List delegates; + + /** + * Creates a new {@link AllTypeFilter} to match if all the given delegates match. + * + * @param delegates must not be {@literal null}. + */ + public AllTypeFilter(List delegates) { + Assert.notNull(delegates, "This argument is required, it must not be null"); + this.delegates = delegates; + } + + @Override + public boolean match(MetadataReader metadataReader, MetadataReaderFactory metadataReaderFactory) throws IOException { + + for (TypeFilter filter : delegates) { + if (!filter.match(metadataReader, metadataReaderFactory)) { + return false; + } + } + + return true; + } + } +} \ No newline at end of file diff --git a/connect-core/src/main/java/org/needcoke/rpc/config/ServerConfig.java b/connect-core/src/main/java/org/needcoke/rpc/config/ServerConfig.java index 4e59bb9e092fcc7b670b226135fe517a2ed6575c..5dcbbc1687c6c6d9718943e525b958ef2dc0d99e 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/config/ServerConfig.java +++ b/connect-core/src/main/java/org/needcoke/rpc/config/ServerConfig.java @@ -3,21 +3,15 @@ package org.needcoke.rpc.config; import lombok.Getter; import org.needcoke.rpc.invoker.ConnectInvoker; import org.needcoke.rpc.invoker.OkHttpsInvoker; -import org.needcoke.rpc.invoker.SmartSocketInvoker; import org.needcoke.rpc.loadBalance.LoadBalance; import org.needcoke.rpc.loadBalance.RoundRobinLoadBalance; -import org.needcoke.rpc.server.SmartSocketServer; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - @Getter -@Configuration +@Configuration(proxyBeanMethods = false) public class ServerConfig { @Value("${coke.server.port:12001}") @@ -26,39 +20,24 @@ public class ServerConfig { @Value("${coke.server.type:http}") private String serverType; + @Value("${server.port}") + private int mvcPort; + /** * coke-connect的默认远程调用组件为okHttps */ @ConditionalOnMissingBean(ConnectInvoker.class) @Bean - public OkHttpsInvoker okHttpsInvoker(){ + public OkHttpsInvoker okHttpsInvoker() { return new OkHttpsInvoker(); } - /** - * coke-connect的默认负载均衡策略为轮询 - */ - @ConditionalOnMissingBean(LoadBalance.class) - @Bean - public RoundRobinLoadBalance roundRobinLoadBalance(){ - return new RoundRobinLoadBalance(); - } - /** - * 当远程调用方式修改为SmartSocketInvoker时启动SmartSocketServer + * coke-connect的默认负载均衡策略为轮询 */ - @ConditionalOnBean(SmartSocketInvoker.class) - @Bean - public SmartSocketServer smartSocketServer(){ - return new SmartSocketServer(); - } - - /** - * server uri -> 端口号 - */ - @ConditionalOnMissingBean(OkHttpsInvoker.class) + @ConditionalOnMissingBean(LoadBalance.class) @Bean - public Map cokeServerPortMap(){ - return new ConcurrentHashMap<>(); + public RoundRobinLoadBalance roundRobinLoadBalance() { + return new RoundRobinLoadBalance(); } } diff --git a/connect-core/src/main/java/org/needcoke/rpc/controller/RpcController.java b/connect-core/src/main/java/org/needcoke/rpc/controller/RpcController.java index 65fda9da2acecab1dfafbf32cdf07c9a48f4d02a..b066603551671114a6d2d77f7e12ca44bc309d61 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/controller/RpcController.java +++ b/connect-core/src/main/java/org/needcoke/rpc/controller/RpcController.java @@ -3,11 +3,11 @@ package org.needcoke.rpc.controller; import com.alibaba.fastjson.JSONObject; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.connect.rpc.link.tracking.util.TrackingUtil; import org.needcoke.rpc.common.constant.ConnectConstant; import org.needcoke.rpc.common.enums.ConnectionExceptionEnum; import org.needcoke.rpc.common.exception.CokeConnectException; import org.needcoke.rpc.config.ServerConfig; -import org.needcoke.rpc.invoker.OkHttpsInvoker; import org.needcoke.rpc.utils.SpringContextUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; @@ -28,7 +28,6 @@ import java.util.Map; @RequiredArgsConstructor public class RpcController { - private ServerConfig serverConfig; @Resource @@ -43,7 +42,7 @@ public class RpcController { public Object execute(@RequestParam String beanName, @RequestParam String methodName, @RequestBody Map params) { - log.info("execute http -- beanName : {} , methodName : {} , param : {}", beanName, methodName, JSONObject.toJSONString(params)); + log.info("execute http -- beanName : {} , methodName : {} , param : {} ,linkTracking = {}", beanName, methodName, JSONObject.toJSONString(params), TrackingUtil.linkTrackingJsonStr()); Method method = SpringContextUtils.getMethod(beanName, methodName); if (null == method) { log.error(ConnectionExceptionEnum.BEAN_WITHOUT_METHOD.logStatement(ConnectConstant.EXECUTE_RELATIVE_PATH)); @@ -58,18 +57,4 @@ public class RpcController { throw new CokeConnectException(ConnectionExceptionEnum.INVOKE_METHOD_ERROR); } } - - @GetMapping("port") - public Integer cokeServerPort() { - if (null == serverConfig) { - return 0; - } - try{ - applicationContext.getBean(OkHttpsInvoker.class); - }catch (Exception e){ - return serverConfig.getCokeServerPort(); - - } - return 0; - } } diff --git a/connect-core/src/main/java/org/needcoke/rpc/invoker/ConnectInvoker.java b/connect-core/src/main/java/org/needcoke/rpc/invoker/ConnectInvoker.java index 568b7aedf88e7946e57c5ddb797acd75afa96390..9d6b4cfc2362c4e5eca04343733778174a587d48 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/invoker/ConnectInvoker.java +++ b/connect-core/src/main/java/org/needcoke/rpc/invoker/ConnectInvoker.java @@ -3,14 +3,26 @@ package org.needcoke.rpc.invoker; import com.alibaba.fastjson.JSONObject; import com.ejlchina.okhttps.HttpResult; import lombok.extern.slf4j.Slf4j; +import org.needcoke.rpc.FuseThreadPool; +import org.needcoke.rpc.common.enums.RpcTypeEnum; +import org.needcoke.rpc.config.FuseConfig; +import org.needcoke.rpc.net.Connector; import org.springframework.cloud.client.ServiceInstance; - +import org.springframework.context.annotation.Lazy; +import javax.annotation.Resource; import java.util.Map; @Slf4j public abstract class ConnectInvoker { - public abstract InvokeResult execute(ServiceInstance instance, String beanName, String methodName, Map params); + @Resource + protected FuseConfig fuseConfig; + + @Resource + @Lazy + protected FuseThreadPool fuseThreadPool; + + public abstract InvokeResult execute(Connector connector, ServiceInstance instance, String beanName, String methodName, Map params); /** @@ -26,4 +38,14 @@ public abstract class ConnectInvoker { " , params = " + JSONObject.toJSONString(params); log.debug(builder); } + protected InvokeResult runDefaultExecute(Connector connector, ServiceInstance instance, String beanName, String methodName, Map params){ + RpcTypeEnum remoteRpcType = connector.getRpcType(instance); + if(remoteRpcType == RpcTypeEnum.okHttp3){ + if (null == connector.getHttpInvoker()) { + connector.setHttpInvoker(new OkHttpsInvoker()); + } + return connector.compensationExecute(instance,beanName,methodName,params); + } + return null; + } } diff --git a/connect-core/src/main/java/org/needcoke/rpc/invoker/InvokeResult.java b/connect-core/src/main/java/org/needcoke/rpc/invoker/InvokeResult.java index 3c81c5837c78a95aa253178254a86715020fbb2b..9ef02fb02698052f537ec240f6367baba5ff4c4a 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/invoker/InvokeResult.java +++ b/connect-core/src/main/java/org/needcoke/rpc/invoker/InvokeResult.java @@ -7,7 +7,6 @@ import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; import java.io.Serializable; -import java.nio.charset.StandardCharsets; /** * 调用器的调用结果 diff --git a/connect-core/src/main/java/org/needcoke/rpc/invoker/OkHttpsInvoker.java b/connect-core/src/main/java/org/needcoke/rpc/invoker/OkHttpsInvoker.java index ac0753fc6ffda408edec282f868816d08f70d1d7..00609562002220c3b8d8e81d6866ef79cde1a22c 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/invoker/OkHttpsInvoker.java +++ b/connect-core/src/main/java/org/needcoke/rpc/invoker/OkHttpsInvoker.java @@ -4,8 +4,10 @@ import com.ejlchina.okhttps.HTTP; import com.ejlchina.okhttps.HttpResult; import com.ejlchina.okhttps.SHttpTask; import com.ejlchina.okhttps.jackson.JacksonMsgConvertor; +import org.connect.rpc.link.tracking.util.TrackingUtil; import org.needcoke.rpc.common.constant.ConnectConstant; import org.needcoke.rpc.common.enums.HttpContentTypeEnum; +import org.needcoke.rpc.net.Connector; import org.springframework.cloud.client.ServiceInstance; import org.springframework.web.context.request.RequestAttributes; import org.springframework.web.context.request.RequestContextHolder; @@ -20,26 +22,22 @@ import java.util.Map; */ public class OkHttpsInvoker extends ConnectInvoker { - public OkHttpsInvoker() { - System.out.println("哈哈哈"); - } - @Override - public InvokeResult execute(ServiceInstance instance, String beanName, String methodName, Map params) { - + public InvokeResult execute(Connector connector, ServiceInstance instance, String beanName, String methodName, Map params) { RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes(); HttpServletRequest request = ((ServletRequestAttributes) requestAttributes).getRequest(); SHttpTask sHttpTask = HTTP.builder().addMsgConvertor(new JacksonMsgConvertor()).build() .sync(instance.getUri() + ConnectConstant.EXECUTE_RELATIVE_PATH) .bodyType(HttpContentTypeEnum.JSON.getValue()) .addBodyPara(params) + .addHeader(TrackingUtil.headerKey(), TrackingUtil.headerValue()) .addUrlPara(ConnectConstant.BEAN_NAME, beanName) .addUrlPara(ConnectConstant.METHOD_NAME, methodName); Enumeration headerNames = request.getHeaderNames(); while (headerNames.hasMoreElements()) { String nextElement = headerNames.nextElement(); String header = request.getHeader(nextElement); - sHttpTask.addHeader(nextElement,header); + sHttpTask.addHeader(nextElement, header); } HttpResult result = sHttpTask .post(); diff --git a/connect-core/src/main/java/org/needcoke/rpc/loadBalance/LoadBalance.java b/connect-core/src/main/java/org/needcoke/rpc/loadBalance/LoadBalance.java index 6d29ab37a4498a2a456cd8090b0f3ce2533edeac..7660a16295cf497e66831bae891a1614d2dbc1ae 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/loadBalance/LoadBalance.java +++ b/connect-core/src/main/java/org/needcoke/rpc/loadBalance/LoadBalance.java @@ -15,8 +15,6 @@ public abstract class LoadBalance { /** * 从实例列表选择一个远程服务 - *

- * TODO 扩展负载均衡策略 * * @param instances 从注册中心获取的实例列表 * @return 某一个特定的实例 diff --git a/connect-core/src/main/java/org/needcoke/rpc/loadBalance/WeightedResponseTimeBalance.java b/connect-core/src/main/java/org/needcoke/rpc/loadBalance/WeightedResponseTimeBalance.java index 98d9ae437d612ffa54eee27cace6a9efa67555f1..f419d96285bf855b979c95bf98ede47bb5c92c62 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/loadBalance/WeightedResponseTimeBalance.java +++ b/connect-core/src/main/java/org/needcoke/rpc/loadBalance/WeightedResponseTimeBalance.java @@ -2,7 +2,11 @@ package org.needcoke.rpc.loadBalance; import cn.hutool.core.bean.BeanUtil; import org.needcoke.rpc.CokeServiceInstance; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.cloud.client.ServiceInstance; +import org.springframework.stereotype.Component; + +import java.net.URI; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; diff --git a/connect-core/src/main/java/org/needcoke/rpc/net/Connector.java b/connect-core/src/main/java/org/needcoke/rpc/net/Connector.java new file mode 100644 index 0000000000000000000000000000000000000000..4e951a65e83f91bacbeb4ee470ff0acfac430bd2 --- /dev/null +++ b/connect-core/src/main/java/org/needcoke/rpc/net/Connector.java @@ -0,0 +1,116 @@ +package org.needcoke.rpc.net; + +import cn.hutool.core.collection.CollUtil; +import org.needcoke.rpc.common.enums.ConnectionExceptionEnum; +import org.needcoke.rpc.common.enums.RpcTypeEnum; +import org.needcoke.rpc.common.exception.CokeConnectException; +import org.needcoke.rpc.invoker.ConnectInvoker; +import org.needcoke.rpc.invoker.InvokeResult; +import org.needcoke.rpc.loadBalance.LoadBalance; +import org.springframework.cloud.client.ServiceInstance; +import org.springframework.cloud.client.discovery.DiscoveryClient; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class Connector { + + private final String serviceId; + + private final DiscoveryClient discoveryClient; + + private final ConnectInvoker invoker; + + private final LoadBalance loadBalance; + + private ConnectInvoker httpInvoker; + + private final Map requestTypeMap; + + public RpcTypeEnum getRpcType(ServiceInstance instance){ + return requestTypeMap.get(instance); + } + + private final Map severPortMap; + + public Integer getServerPort(ServiceInstance instance){ + return severPortMap.get(instance); + } + + public Connector(String serviceId, DiscoveryClient discoveryClient, ConnectInvoker invoker, LoadBalance loadBalance) { + this.serviceId = serviceId; + this.discoveryClient = discoveryClient; + this.invoker = invoker; + this.loadBalance = loadBalance; + this.requestTypeMap = new HashMap<>(); + this.severPortMap = new HashMap<>(); + } + + /** + * 执行远程方法 + * + * @param beanName 远程服务上的 bean的名称 + * @param methodName 方法名称 或 @Call value + * @return 返回远程方法执行结果的json + */ + public InvokeResult execute(String beanName, + String methodName, + Map params) { + List instances = discoveryClient.getInstances(serviceId); + ServiceInstance instance = loadBalance.choose(serviceId, instances); + if (!requestTypeMap.containsKey(instance)) { + Map metadata = instance.getMetadata(); + if (CollUtil.isNotEmpty(metadata)) { + String rpcType = metadata.get("rpcType"); + String cokeServerPort = metadata.get("coke-server-port"); + RpcTypeEnum rp = RpcTypeEnum.okHttp3; + if (null != rpcType) { + + switch (rpcType) { + case "netty": + rp = RpcTypeEnum.netty; + break; + case "smart socket": + case "smartSocket": + + case "smart-socket": + rp = RpcTypeEnum.smartSocket; + break; + + case "okHttp3": + rp = RpcTypeEnum.okHttp3; + break; + + default: + rp = RpcTypeEnum.okHttp3; + } + } + this.requestTypeMap.put(instance,rp); + try { + int port = Integer.parseInt(cokeServerPort); + this.severPortMap.put(instance,port); + }catch (Exception e){ + throw new CokeConnectException(ConnectionExceptionEnum.THE_FORMAT_OF_THE_REMOTE_SERVICE_PORT_NUMBER_IS_INCORRECT_PLEASE_CHECK_THE_CONFIGURATION_OF_THE_REMOTE_SERVICE_PORT_NUMBER); + } + } + } + InvokeResult result = invoker.execute(this, instance, beanName, methodName, params); + return result; + } + + public ConnectInvoker getHttpInvoker() { + return httpInvoker; + } + + public void setHttpInvoker(ConnectInvoker httpInvoker) { + this.httpInvoker = httpInvoker; + } + + public InvokeResult compensationExecute(ServiceInstance instance, + String beanName, + String methodName, + Map params) { + return this.httpInvoker.execute(this, instance, beanName, methodName, params); + } +} diff --git a/connect-core/src/main/java/org/needcoke/rpc/net/ConnectorFactory.java b/connect-core/src/main/java/org/needcoke/rpc/net/ConnectorFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..fb7031f7b804fae6355ec0762f1816ad26644452 --- /dev/null +++ b/connect-core/src/main/java/org/needcoke/rpc/net/ConnectorFactory.java @@ -0,0 +1,50 @@ +package org.needcoke.rpc.net; + +import org.needcoke.rpc.invoker.ConnectInvoker; +import org.needcoke.rpc.loadBalance.LoadBalance; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.client.discovery.DiscoveryClient; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentHashMap; + +@Component +public class ConnectorFactory { + + private DiscoveryClient dc; + + private ConnectInvoker ci; + + private LoadBalance lb; + + @Autowired + public void setLb(LoadBalance lb) { + this.lb = lb; + } + @Autowired + @Lazy + public void setCi(ConnectInvoker ci) { + this.ci = ci; + } + + @Autowired + public void setDc(DiscoveryClient dc) { + this.dc = dc; + } + + private final ConcurrentHashMap connectorMap = new ConcurrentHashMap<>(); + + /** + * 创建或者获取连接器 + */ + public Connector connector(String serviceId){ + if (connectorMap.containsKey(serviceId)) { + return connectorMap.get(serviceId); + } + Connector connector = new Connector(serviceId, dc, ci, lb); + connectorMap.put(serviceId,connector); + return connector; + } + +} diff --git a/connect-core/src/main/java/org/needcoke/rpc/proxy/ClientProxyCreateFactory.java b/connect-core/src/main/java/org/needcoke/rpc/proxy/ClientProxyCreateFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..1ffda77abe861c9923227b24bc75128cc2062153 --- /dev/null +++ b/connect-core/src/main/java/org/needcoke/rpc/proxy/ClientProxyCreateFactory.java @@ -0,0 +1,37 @@ +package org.needcoke.rpc.proxy; + +import cn.hutool.core.util.StrUtil; +import lombok.extern.slf4j.Slf4j; +import org.needcoke.rpc.annotation.RpcClient; +import org.needcoke.rpc.invoker.InvokeResult; +import org.needcoke.rpc.net.ConnectorFactory; +import org.needcoke.rpc.utils.SpringContextUtils; + +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.HashMap; +import java.util.Map; + +/** + * 客户端代理创建工厂 + */ +@Slf4j +public class ClientProxyCreateFactory { + + /** + * 通过接口的class创建该接口的代理对象(这里直接基于JDK提供的创建动态代理的工具来创建代理对象) + * + * @param serviceClass 接口的class + * @return T 代理对象 + */ + public static T getProxyService(Class serviceClass) { + // 该接口的Class对象是被那个类加载器加载的 + ClassLoader classLoader = serviceClass.getClassLoader(); + // 获取到该接口所有的interface + Class[] interfaces = {serviceClass}; + // jdk代理必须的handler,代理对象的方法执行就会调用这里的invoke方法。自动传入调用的方法 + 方法参数 + Object proxy = Proxy.newProxyInstance(classLoader, interfaces, new InvocationHandler(serviceClass)); + // 返回代理对象 + return (T) proxy; + } +} diff --git a/connect-core/src/main/java/org/needcoke/rpc/proxy/InvocationHandler.java b/connect-core/src/main/java/org/needcoke/rpc/proxy/InvocationHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..d37918c25acd0adb2dfa9cf714badb9ae77b72bd --- /dev/null +++ b/connect-core/src/main/java/org/needcoke/rpc/proxy/InvocationHandler.java @@ -0,0 +1,34 @@ +package org.needcoke.rpc.proxy; + +import cn.hutool.core.util.StrUtil; +import org.needcoke.rpc.annotation.RpcClient; +import org.needcoke.rpc.invoker.InvokeResult; +import org.needcoke.rpc.net.ConnectorFactory; +import org.needcoke.rpc.utils.SpringContextUtils; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +public class InvocationHandler implements java.lang.reflect.InvocationHandler { + private Class serviceClass; + + public InvocationHandler(Class serviceClass) { + this.serviceClass = serviceClass; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + RpcClient rpcClient = serviceClass.getAnnotation(RpcClient.class); + ConnectorFactory connectorFactory = SpringContextUtils.getBean(ConnectorFactory.class); + if (StrUtil.isEmpty(rpcClient.serviceId())) { + throw new RuntimeException(serviceClass.getName() + " need serviceId"); + } + Map params = new HashMap<>(); + for (int i = 1; i <= args.length; i++) { + params.put("arg" + i, args[i-1]); + } + InvokeResult result = connectorFactory.connector(rpcClient.serviceId()).execute(rpcClient.beanName(), method.getName(), params); + return result.getBody(); + } +} diff --git a/connect-core/src/main/java/org/needcoke/rpc/proxy/RpcClient.java b/connect-core/src/main/java/org/needcoke/rpc/proxy/RpcClient.java new file mode 100644 index 0000000000000000000000000000000000000000..ffd34763a70f8e412e62adce3fd8676e5efe1e89 --- /dev/null +++ b/connect-core/src/main/java/org/needcoke/rpc/proxy/RpcClient.java @@ -0,0 +1,4 @@ +package org.needcoke.rpc.proxy; + +public interface RpcClient { +} diff --git a/connect-core/src/main/java/org/needcoke/rpc/proxy/RpcClientFactoryBean.java b/connect-core/src/main/java/org/needcoke/rpc/proxy/RpcClientFactoryBean.java new file mode 100644 index 0000000000000000000000000000000000000000..38bd9e021806201ea3c23a945c95949e2e97b754 --- /dev/null +++ b/connect-core/src/main/java/org/needcoke/rpc/proxy/RpcClientFactoryBean.java @@ -0,0 +1,100 @@ +package org.needcoke.rpc.proxy; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.FactoryBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.util.Assert; + +import java.util.Objects; + +public class RpcClientFactoryBean implements FactoryBean, InitializingBean, ApplicationContextAware { + + /** + * beanName + */ + private String name; + + /** + * object类型 + */ + private Class type; + + private ApplicationContext applicationContext; + + @Override + public void afterPropertiesSet() throws Exception { + Assert.hasText(name, "Name must be set"); + } + + @Override + public void setApplicationContext(ApplicationContext context) throws BeansException { + applicationContext = context; + } + + @Override + public Object getObject() throws Exception { + // 创建一个代理对象并返回 + Object proxyService = ClientProxyCreateFactory.getProxyService(type); + return proxyService; + } + + @Override + public Class getObjectType() { + return type; + } + + @Override + public boolean isSingleton() { + return true; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Class getType() { + return type; + } + + public void setType(Class type) { + this.type = type; + } + + public ApplicationContext getApplicationContext() { + return applicationContext; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RpcClientFactoryBean that = (RpcClientFactoryBean) o; + return Objects.equals(name, that.name) && + Objects.equals(type, that.type) && + Objects.equals(applicationContext, that.applicationContext); + } + + @Override + public int hashCode() { + return Objects.hash(name, type, applicationContext); + } + + @Override + public String toString() { + return "SimpleRpcClientFactoryBean{" + + "name='" + name + '\'' + + ", type=" + type + + ", applicationContext=" + applicationContext + + '}'; + } +} \ No newline at end of file diff --git a/connect-core/src/main/java/org/needcoke/rpc/utils/ConnectUtil.java b/connect-core/src/main/java/org/needcoke/rpc/utils/ConnectUtil.java index 615e1321cffba18c627a118d3ae9582985d97c52..1b3941113707128c0cd46db2723ccb1c983ee975 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/utils/ConnectUtil.java +++ b/connect-core/src/main/java/org/needcoke/rpc/utils/ConnectUtil.java @@ -5,6 +5,7 @@ import com.ejlchina.okhttps.HttpResult; import com.ejlchina.okhttps.SHttpTask; import com.ejlchina.okhttps.jackson.JacksonMsgConvertor; import lombok.extern.slf4j.Slf4j; +import org.connect.rpc.link.tracking.util.TrackingUtil; import org.needcoke.rpc.common.constant.ConnectConstant; import org.needcoke.rpc.common.enums.HttpContentTypeEnum; import org.needcoke.rpc.invoker.ConnectInvoker; @@ -14,15 +15,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.DiscoveryClient; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; -import org.springframework.web.context.request.RequestAttributes; -import org.springframework.web.context.request.RequestContextHolder; -import org.springframework.web.context.request.ServletRequestAttributes; -import org.springframework.web.context.request.async.DeferredResult; import javax.annotation.PostConstruct; -import javax.servlet.http.HttpServletRequest; -import java.util.Enumeration; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -72,20 +68,28 @@ public class ConnectUtil { public static final AtomicInteger requestIdMaker = new AtomicInteger(); - public static final Map requestMap = new ConcurrentHashMap(); + private static final Map requestMap = new ConcurrentHashMap(); public static void putRequestMap(InvokeResult result){ - requestMap.put(requestIdMaker.addAndGet(1),result); + requestMap.put(TrackingUtil.getRequestId(),result); } - public static ConcurrentHashMap threadMap = new ConcurrentHashMap<>(); + private static Map threadMap = new ConcurrentHashMap<>(); - public static void putRequestMap(Integer requestId,InvokeResult result){ + public static void putRequestMap(String requestId,InvokeResult result){ requestMap.put(requestId,result); } - public static InvokeResult getFromRequestMap(Integer key){ - return requestMap.get(key); + public static InvokeResult getFromRequestMap(String key){ + return requestMap.remove(key); + } + + public static void putThreadMap(String requestId,Thread thread){ + threadMap.put(requestId,thread); + } + + public static Thread getFromThreadMap(String requestId){ + return threadMap.remove(requestId); } /** @@ -101,25 +105,7 @@ public class ConnectUtil { Map params) { List instances = discoveryClient.getInstances(serviceId); ServiceInstance instance = loadBalance.choose(serviceId,instances); - InvokeResult result = connectInvoker.execute(instance, beanName, methodName, params); + InvokeResult result = connectInvoker.execute(null,instance, beanName, methodName, params); return result; } - - - public static Integer getCokeServerPort(ServiceInstance instance){ - RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes(); - HttpServletRequest request = ((ServletRequestAttributes) requestAttributes).getRequest(); - SHttpTask sHttpTask = HTTP.builder().addMsgConvertor(new JacksonMsgConvertor()).build() - .sync(instance.getUri() + ConnectConstant.COKE_PORT_RELATIVE_PATH) - .bodyType(HttpContentTypeEnum.JSON.getValue()); - Enumeration headerNames = request.getHeaderNames(); - while (headerNames.hasMoreElements()) { - String nextElement = headerNames.nextElement(); - String header = request.getHeader(nextElement); - sHttpTask.addHeader(nextElement,header); - } - HttpResult result = sHttpTask - .get(); - return result.getBody().toBean(Integer.class); - } } diff --git a/connect-core/src/main/java/org/needcoke/rpc/utils/GsonUtil.java b/connect-core/src/main/java/org/needcoke/rpc/utils/GsonUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..9c802a4f87880c9c16cc60a92c18c476e9cbff47 --- /dev/null +++ b/connect-core/src/main/java/org/needcoke/rpc/utils/GsonUtil.java @@ -0,0 +1,6 @@ +package org.needcoke.rpc.utils; + +public class GsonUtil { + + +} diff --git a/connect-core/src/main/java/org/needcoke/rpc/utils/SpringContextUtils.java b/connect-core/src/main/java/org/needcoke/rpc/utils/SpringContextUtils.java index ac4eaec59244ed7e32b6ba46445e694aed3c50b7..9bc307e9a5a59be03bdcd8f5327b1fae731ffbef 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/utils/SpringContextUtils.java +++ b/connect-core/src/main/java/org/needcoke/rpc/utils/SpringContextUtils.java @@ -24,7 +24,6 @@ public class SpringContextUtils { @Resource private ApplicationContext context; - @Resource(name = "beanNameMethodMap") private Map beanNameMethodMap; diff --git a/connect-core/src/main/resources/META-INF/spring.factories b/connect-core/src/main/resources/META-INF/spring.factories deleted file mode 100644 index 18bbc5f2f2c8d96d993c3e72d1c8d789c06ee43a..0000000000000000000000000000000000000000 --- a/connect-core/src/main/resources/META-INF/spring.factories +++ /dev/null @@ -1,2 +0,0 @@ -org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ - org.needcoke.rpc.config.CokeConnectContextInitializer \ No newline at end of file diff --git a/register-eureka/pom.xml b/connect-fuse/pom.xml similarity index 36% rename from register-eureka/pom.xml rename to connect-fuse/pom.xml index 2679ee9b7a1599fc6f5994b36039211f55773128..5407b555470b05fc38514978c79e3e3eee18f19f 100644 --- a/register-eureka/pom.xml +++ b/connect-fuse/pom.xml @@ -9,11 +9,32 @@ 4.0.0 - register-eureka + connect-fuse - 11 - 11 + 8 + 8 + + + org.springframework.boot + spring-boot-starter-web + 2.3.2.RELEASE + provided + + + org.projectlombok + lombok + ${lombok-version} + provided + + + com.google.guava + guava + 31.0.1-jre + compile + + + \ No newline at end of file diff --git a/connect-fuse/src/main/java/org/needcoke/rpc/Fuse.java b/connect-fuse/src/main/java/org/needcoke/rpc/Fuse.java new file mode 100644 index 0000000000000000000000000000000000000000..9168f17c969eadff47a3b3e302b9f98a3756b77e --- /dev/null +++ b/connect-fuse/src/main/java/org/needcoke/rpc/Fuse.java @@ -0,0 +1,103 @@ +package org.needcoke.rpc; + +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.ReentrantLock; + +@Slf4j +public class Fuse extends Thread { + + private final ReentrantLock lock = new ReentrantLock(); + + private long timeout; + + private TimeUnit unit; + + /* 锁字段 */ + private final Object lockField; + + /* 超时需要唤醒的线程 */ + private final Thread unParkThread; + + private static final Map unParkThreadMap = new ConcurrentHashMap<>(); + private static final Map unParkMap = new ConcurrentHashMap<>(); + + private static ArrayBlockingQueue blockQueue; + + public static void setBlockQueue(ArrayBlockingQueue blockQueue) { + Fuse.blockQueue = blockQueue; + } + + public Fuse(long timeout, TimeUnit unit, Object lockField) { + this.timeout = timeout; + this.unit = unit; + this.lockField = lockField; + this.unParkThread = Thread.currentThread(); + FuseContext.lockPool.put(lockField, lock); + unParkMap.put(lockField, true); + unParkThreadMap.put(lockField, unParkThread); + } + + public Fuse(long timeout, Object lockField) { + this.timeout = timeout; + this.unit = TimeUnit.MILLISECONDS; + this.lockField = lockField; + this.unParkThread = Thread.currentThread(); + FuseContext.lockPool.put(lockField, lock); + unParkMap.put(lockField, true); + unParkThreadMap.put(lockField, unParkThread); + } + + + @Override + public void run() { + try { + unit.sleep(timeout); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (lock.tryLock()) { + lock.lock(); + try { + if (unParkMap.containsKey(lockField)) { + LockSupport.unpark(unParkThread); + log.warn("coke connect fuse lockField = {},fuseTimeOut = {}", lockField, timeout); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + lock.unlock(); + FuseContext.lockPool.remove(lockField); + unParkMap.remove(lockField); + unParkThreadMap.remove(lockField); + } + } + + } + + public static boolean unPark(Object lockField) { + Lock tLock = FuseContext.lockPool.get(lockField); + if(tLock.tryLock()) { + tLock.lock(); + try { + if (unParkMap.containsKey(lockField)) { + LockSupport.unpark(unParkThreadMap.get(lockField)); + unParkMap.remove(lockField); + return true; + } + }catch (Exception e){ + throw new RuntimeException(e); + }finally { + tLock.unlock(); + FuseContext.lockPool.remove(lockField); + } + } + return false; + } +} diff --git a/connect-fuse/src/main/java/org/needcoke/rpc/FuseContext.java b/connect-fuse/src/main/java/org/needcoke/rpc/FuseContext.java new file mode 100644 index 0000000000000000000000000000000000000000..610d3f5e7da2128c6555a12d928a7054830fbf06 --- /dev/null +++ b/connect-fuse/src/main/java/org/needcoke/rpc/FuseContext.java @@ -0,0 +1,18 @@ +package org.needcoke.rpc; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +public class FuseContext { + + /** + * 拒绝策略锁池 + */ + public static final Map lockPool = new ConcurrentHashMap<>(); + + public static AtomicInteger fuse2StartNumber = new AtomicInteger(0); + + public static AtomicInteger fuse2EndNumber = new AtomicInteger(0); +} diff --git a/connect-fuse/src/main/java/org/needcoke/rpc/FuseReportDTO.java b/connect-fuse/src/main/java/org/needcoke/rpc/FuseReportDTO.java new file mode 100644 index 0000000000000000000000000000000000000000..dcee8d773be43fb7440488f21514f3e6c2d828b2 --- /dev/null +++ b/connect-fuse/src/main/java/org/needcoke/rpc/FuseReportDTO.java @@ -0,0 +1,42 @@ +package org.needcoke.rpc; + +import lombok.Data; +import org.needcoke.rpc.config.FuseConfig; + +import java.io.Serializable; + +@Data +public class FuseReportDTO implements Serializable { + + /** + * 未释放的锁的数量 + */ + private int unParkLockNumber; + + /** + * 阻塞住的线程数量 + */ + private int parkThreadNumber; + + /** + * 未解锁的请求数量 + */ + private int unParkNumber; + + /** + * 关于熔断的一些基础配置 + */ + private FuseConfig fuseConfig; + + /** + * 被熔断掉的任务数量 + */ + private int fuseRequestNumber; + + /** + * 实时线程池线程数 + */ + private int poolSize; + + +} diff --git a/connect-fuse/src/main/java/org/needcoke/rpc/FuseThreadPool.java b/connect-fuse/src/main/java/org/needcoke/rpc/FuseThreadPool.java new file mode 100644 index 0000000000000000000000000000000000000000..abc038b03500e512726a2fb87b25322378f42f4b --- /dev/null +++ b/connect-fuse/src/main/java/org/needcoke/rpc/FuseThreadPool.java @@ -0,0 +1,74 @@ +package org.needcoke.rpc; + + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.needcoke.rpc.config.FuseConfig; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.util.concurrent.*; + +@Component +public class FuseThreadPool { + + @Resource + private FuseConfig fuseConfig; + + + /** + * 自定义线程名称,方便的出错的时候溯源 + */ + private final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("coke-connect-Fuse-pool-%d").build(); + + + /** + * corePoolSize 线程池核心池的大小 + * maximumPoolSize 线程池中允许的最大线程数量 + * keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间 + * unit keepAliveTime 的时间单位 + * workQueue 用来储存等待执行任务的队列 + * threadFactory 创建线程的工厂类 + * handler 拒绝策略类,当线程池数量达到上线并且workQueue队列长度达到上限时就需要对到来的任务做拒绝处理 + */ + private ExecutorService service = null; + + @PostConstruct + public void init() { + ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(fuseConfig.getPoolCapacity()); + Fuse.setBlockQueue(blockingQueue); + service = new ThreadPoolExecutor( + fuseConfig.getCoreThreadPoolSize(), + fuseConfig.getMaximumPoolSize(), + fuseConfig.getKeepAliveTime(), + TimeUnit.MILLISECONDS, + blockingQueue, + namedThreadFactory, + new ThreadPoolExecutor.AbortPolicy() + ); + } + + /** + * 获取线程池 + * @return 线程池 + */ + public ExecutorService getExecutorService() { + return service; + } + + /** + * 使用线程池创建线程并异步执行任务 + * @param r 任务 + */ + public void newTask(Runnable r) { + service.execute(r); + } + + /** + * 使用线程池创建线程并异步执行任务 + * @param r 任务 + */ + public Future newTask(Callable r){ + return service.submit(r); + } +} diff --git a/connect-fuse/src/main/java/org/needcoke/rpc/config/FuseConfig.java b/connect-fuse/src/main/java/org/needcoke/rpc/config/FuseConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..2b5b6357d750d4d86d6038920f423a5772b41e52 --- /dev/null +++ b/connect-fuse/src/main/java/org/needcoke/rpc/config/FuseConfig.java @@ -0,0 +1,25 @@ +package org.needcoke.rpc.config; + +import lombok.Getter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +@Getter +@Configuration +public class FuseConfig { + + @Value("${coke.fuse.timeout:1000}") + private long fuseTimeOut; + + @Value("${coke.fuse.pool.coreThreadPoolSize:2}") + private int coreThreadPoolSize; + + @Value("${coke.fuse.pool.maximumPoolSize:4}") + private int maximumPoolSize; + + @Value("${coke.fuse.pool.keepAliveTime:2000}") + private long keepAliveTime; + + @Value("${coke.fuse.pool.poolCapacity:9999}") + private int poolCapacity; +} diff --git a/connect-fuse/src/main/java/org/needcoke/rpc/controller/FuseController.java b/connect-fuse/src/main/java/org/needcoke/rpc/controller/FuseController.java new file mode 100644 index 0000000000000000000000000000000000000000..85acf544363b3096ba6182f877a576220d1cff4e --- /dev/null +++ b/connect-fuse/src/main/java/org/needcoke/rpc/controller/FuseController.java @@ -0,0 +1,72 @@ +package org.needcoke.rpc.controller; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.needcoke.rpc.FuseContext; +import org.needcoke.rpc.FuseThreadPool; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.*; + +/** + * @author Gilgamesh + * @date 2022/4/2 + */ + +@RestController +@Slf4j +@RequestMapping("coke/connect/api") +@RequiredArgsConstructor +public class FuseController { + + private final FuseThreadPool threadPool; + + /** + * 熔断监控 + */ + @GetMapping(value = "/fuse/monitoring") + public Map fuseMonitoring(){ + Map map = new HashMap<>(); + ThreadPoolExecutor pool = (ThreadPoolExecutor)threadPool.getExecutorService(); + int poolSize = pool.getPoolSize(); + long taskCount = pool.getTaskCount(); + int activeCount = pool.getActiveCount(); + long completedTaskCount = pool.getCompletedTaskCount(); + int corePoolSize = pool.getCorePoolSize(); + int largestPoolSize = pool.getLargestPoolSize(); + int maximumPoolSize = pool.getMaximumPoolSize(); + ThreadFactory threadFactory = pool.getThreadFactory(); + String threadFactoryName = threadFactory.getClass().getName(); + RejectedExecutionHandler rejectedExecutionHandler = pool.getRejectedExecutionHandler(); + String rejectedExecutionHandlerName = rejectedExecutionHandler.getClass().getName(); + BlockingQueue queue = pool.getQueue(); + int remainingCapacity = queue.remainingCapacity(); + map.put("poolSize",""+poolSize); + map.put("taskCount",""+taskCount); + map.put("activeCount",""+activeCount); + map.put("remainingCapacity",""+remainingCapacity); + map.put("completedTaskCount",""+completedTaskCount); + + map.put("corePoolSize",""+corePoolSize); + map.put("largestPoolSize",""+largestPoolSize); + map.put("maximumPoolSize",""+maximumPoolSize); + map.put("threadFactoryName",threadFactoryName); + map.put("rejectedExecutionHandlerName",rejectedExecutionHandlerName); + return map; + } + + + @GetMapping(value = "/fuse2/monitoring") + public Map fuse2Monitoring(){ + Map map = new HashMap<>(); + map.put("open num",""+ FuseContext.fuse2StartNumber.get()); + map.put("close num",""+ FuseContext.fuse2EndNumber.get()); + return map; + } +} + + diff --git a/register-nacos/pom.xml b/connect-link-tracking/pom.xml similarity index 34% rename from register-nacos/pom.xml rename to connect-link-tracking/pom.xml index 79567d1d91df1280d472790a49326d3cc23f9377..acb3f7036e7c4f46d66117b5c71426938f7297e7 100644 --- a/register-nacos/pom.xml +++ b/connect-link-tracking/pom.xml @@ -9,49 +9,44 @@ 4.0.0 - register-nacos + connect-link-tracking - 11 - 11 + 8 + 8 - org.needcoke - connect-core - abandon + org.projectlombok + lombok + 1.18.24 - org.springframework.cloud - spring-cloud-starter-parent - ${spring-cloud-version} - pom + com.alibaba + transmittable-thread-local + 2.12.1 - com.alibaba.cloud - spring-cloud-starter-alibaba-nacos-discovery - 2.2.5.RC1 + org.apache.tomcat.embed + tomcat-embed-core + 9.0.63 + compile - - - - - - - org.springframework.cloud - spring-cloud-dependencies - ${spring-cloud-version} - - - com.alibaba.cloud - spring-cloud-alibaba-dependencies - 2.2.3.RELEASE - - - - + + cn.hutool + hutool-all + 5.8.0 + compile + + + com.google.code.gson + gson + 2.9.0 + compile + + \ No newline at end of file diff --git a/connect-link-tracking/src/main/java/org/connect/rpc/link/tracking/common/CommonConstant.java b/connect-link-tracking/src/main/java/org/connect/rpc/link/tracking/common/CommonConstant.java new file mode 100644 index 0000000000000000000000000000000000000000..dd40f85b1897072ace958f95d71e03fdd573b0f6 --- /dev/null +++ b/connect-link-tracking/src/main/java/org/connect/rpc/link/tracking/common/CommonConstant.java @@ -0,0 +1,6 @@ +package org.connect.rpc.link.tracking.common; + +public interface CommonConstant { + + String COKE_REQUEST_ID_HEADER_ID_NAME = "COKE-LINK-TRACKING-INFO"; +} diff --git a/connect-link-tracking/src/main/java/org/connect/rpc/link/tracking/config/LinkTrackingContextHolder.java b/connect-link-tracking/src/main/java/org/connect/rpc/link/tracking/config/LinkTrackingContextHolder.java new file mode 100644 index 0000000000000000000000000000000000000000..9c8c15a4f080db9fbd0a080bbfe990e74d66b572 --- /dev/null +++ b/connect-link-tracking/src/main/java/org/connect/rpc/link/tracking/config/LinkTrackingContextHolder.java @@ -0,0 +1,37 @@ +package org.connect.rpc.link.tracking.config; + +import com.alibaba.ttl.TransmittableThreadLocal; +import lombok.experimental.UtilityClass; +import org.connect.rpc.link.tracking.export.ExportHandle; +import org.connect.rpc.link.tracking.net.LinkTracking; + +@UtilityClass +public class LinkTrackingContextHolder { + + private final ExportHandle exportHandle = new ExportHandle(); + + private final TransmittableThreadLocal THREAD_LOCAL_LINK_TRACKING = new TransmittableThreadLocal<>(); + + public void setLinkTracking(LinkTracking tracking) { + THREAD_LOCAL_LINK_TRACKING.set(tracking); + exportHandle.in(); + } + + public LinkTracking getLinkTracking() { + return THREAD_LOCAL_LINK_TRACKING.get(); + } + + public void clear() { + THREAD_LOCAL_LINK_TRACKING.remove(); + exportHandle.out(); + } + + + public boolean isEmpty() { + return THREAD_LOCAL_LINK_TRACKING.get() == null; + } + + public boolean isNotEmpty() { + return THREAD_LOCAL_LINK_TRACKING.get() != null; + } +} diff --git a/connect-link-tracking/src/main/java/org/connect/rpc/link/tracking/export/ExportHandle.java b/connect-link-tracking/src/main/java/org/connect/rpc/link/tracking/export/ExportHandle.java new file mode 100644 index 0000000000000000000000000000000000000000..fb3ddff7564ac200e532c1a505fc640f445c769e --- /dev/null +++ b/connect-link-tracking/src/main/java/org/connect/rpc/link/tracking/export/ExportHandle.java @@ -0,0 +1,16 @@ +package org.connect.rpc.link.tracking.export; + +public class ExportHandle { + + public void in(){ + + } + + public void out(){ + + } + + public void exception(){ + + } +} diff --git a/connect-link-tracking/src/main/java/org/connect/rpc/link/tracking/net/LinkTracking.java b/connect-link-tracking/src/main/java/org/connect/rpc/link/tracking/net/LinkTracking.java new file mode 100644 index 0000000000000000000000000000000000000000..db4d4e63b0de80710ea6e24c7faaf2276bf8fecf --- /dev/null +++ b/connect-link-tracking/src/main/java/org/connect/rpc/link/tracking/net/LinkTracking.java @@ -0,0 +1,57 @@ +package org.connect.rpc.link.tracking.net; + +import cn.hutool.core.collection.CollUtil; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.experimental.Accessors; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +@Data +@EqualsAndHashCode +@Accessors(chain = true) +public class LinkTracking { + private String ip; + + private int port; + + private Map metaData; + + private String requestId; + + private Integer index; + + private String serviceId; + + private long startTime; + + private static final AtomicLong requestIdMaker = new AtomicLong(1); + + public LinkTracking(int port) { + this.port = port; + changeIp(); + requestId = ""+requestIdMaker.getAndAdd(1); + this.index = 1; + } + + public void changeIp(){ + try { + InetAddress localHost = InetAddress.getLocalHost(); + String ip = localHost.getHostAddress(); + this.ip = ip; + } catch (UnknownHostException e) { + e.printStackTrace(); + } + } + + public void addMataData(String key,String value){ + if(CollUtil.isEmpty(metaData)){ + this.metaData = new HashMap<>(); + } + metaData.put(key,value); + } +} diff --git a/connect-link-tracking/src/main/java/org/connect/rpc/link/tracking/util/TrackingUtil.java b/connect-link-tracking/src/main/java/org/connect/rpc/link/tracking/util/TrackingUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..fc8da878486bce5431eab9be7e41c995dd3a6327 --- /dev/null +++ b/connect-link-tracking/src/main/java/org/connect/rpc/link/tracking/util/TrackingUtil.java @@ -0,0 +1,50 @@ +package org.connect.rpc.link.tracking.util; + +import cn.hutool.core.util.StrUtil; +import com.google.gson.Gson; +import lombok.experimental.UtilityClass; +import org.connect.rpc.link.tracking.common.CommonConstant; +import org.connect.rpc.link.tracking.config.LinkTrackingContextHolder; +import org.connect.rpc.link.tracking.net.LinkTracking; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +@UtilityClass +public class TrackingUtil { + + private final Gson gson = new Gson(); + public void preHttp(HttpServletRequest request, HttpServletResponse response, Object handler, int port) { + String cokeRequestIdJson = request.getHeader(CommonConstant.COKE_REQUEST_ID_HEADER_ID_NAME); + if (StrUtil.isEmpty(cokeRequestIdJson)) { + LinkTracking linkTracking = new LinkTracking(port); + linkTracking.setIndex(1); + linkTracking.setStartTime(System.currentTimeMillis()); + LinkTrackingContextHolder.setLinkTracking(linkTracking); + } else { + LinkTracking linkTracking = gson.fromJson(cokeRequestIdJson, LinkTracking.class); + linkTracking.setIndex(linkTracking.getIndex() + 1); + linkTracking.changeIp(); + linkTracking.setPort(port); + LinkTrackingContextHolder.setLinkTracking(linkTracking); + } + } + + public String headerKey() { + return CommonConstant.COKE_REQUEST_ID_HEADER_ID_NAME; + } + + public String headerValue() { + LinkTracking linkTracking = LinkTrackingContextHolder.getLinkTracking(); + linkTracking.setIndex(linkTracking.getIndex()); + return gson.toJson(linkTracking); + } + + public String getRequestId() { + return LinkTrackingContextHolder.getLinkTracking().getRequestId(); + } + + public String linkTrackingJsonStr() { + return gson.toJson(LinkTrackingContextHolder.getLinkTracking()); + } +} diff --git a/bussiness-a/pom.xml b/connect-server-netty/pom.xml similarity index 76% rename from bussiness-a/pom.xml rename to connect-server-netty/pom.xml index 3f32b917c25a22965a441a6b0eac8e485452e180..fae08d79a83560a2cf83f6ef1319655b49b2060a 100644 --- a/bussiness-a/pom.xml +++ b/connect-server-netty/pom.xml @@ -9,7 +9,7 @@ 4.0.0 - bussiness-a + connect-server-netty 8 @@ -21,6 +21,13 @@ org.needcoke connect-core abandon + provided + + + + io.netty + netty-all + 4.1.76.Final diff --git a/connect-server-netty/src/main/java/org/needcoke/rpc/netty/client/NettyClient.java b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/client/NettyClient.java new file mode 100644 index 0000000000000000000000000000000000000000..bdb7993dce0b7894b1680c6da82ed91c7fbdcc81 --- /dev/null +++ b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/client/NettyClient.java @@ -0,0 +1,47 @@ +package org.needcoke.rpc.netty.client; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.needcoke.rpc.codec.CokeRequest; +import org.needcoke.rpc.netty.codec.RpcDecoder; +import org.needcoke.rpc.netty.codec.RpcEncoder; + +public class NettyClient { + + private Channel channel; + + private String ip; + + private int port; + + public NettyClient(String ip, int port) { + this.ip = ip; + this.port = port; + } + + public Channel start() throws InterruptedException { + + final EventLoopGroup group = new NioEventLoopGroup(); + + Bootstrap b = new Bootstrap(); + b.group(group).channel(NioSocketChannel.class)// 使用NioSocketChannel来作为连接用的channel类 + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500) + .handler(new ChannelInitializer() { // 绑定连接初始化器 + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new RpcEncoder(CokeRequest.class)); //编码request + pipeline.addLast(new RpcDecoder(CokeRequest.class)); //解码response + pipeline.addLast(new SenderHandlerAdapter()); //客户端处理类 + + } + }); + //发起异步连接请求,绑定连接端口和host信息 + final ChannelFuture future = b.connect(ip, port).sync(); + this.channel = future.channel(); + return this.channel; + } +} diff --git a/connect-server-netty/src/main/java/org/needcoke/rpc/netty/client/SenderHandlerAdapter.java b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/client/SenderHandlerAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..981bbc439fb9ebe8c90bb64930e091b0105fc6ad --- /dev/null +++ b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/client/SenderHandlerAdapter.java @@ -0,0 +1,35 @@ +package org.needcoke.rpc.netty.client; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.needcoke.rpc.codec.CokeRequest; +import org.needcoke.rpc.netty.processor.ReadMessageClientProcessor; + +public class SenderHandlerAdapter extends SimpleChannelInboundHandler { + + private final ReadMessageClientProcessor clientProcessor; + + public SenderHandlerAdapter(){ + this.clientProcessor = new ReadMessageClientProcessor(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, CokeRequest request) throws Exception { + clientProcessor.channelRead(ctx,request); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + super.userEventTriggered(ctx, evt); + } +} diff --git a/connect-server-netty/src/main/java/org/needcoke/rpc/netty/codec/RpcDecoder.java b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/codec/RpcDecoder.java new file mode 100644 index 0000000000000000000000000000000000000000..36a9dadb1289fda888e52a2e3049f70f295e7a18 --- /dev/null +++ b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/codec/RpcDecoder.java @@ -0,0 +1,37 @@ +package org.needcoke.rpc.netty.codec; + +import com.alibaba.fastjson.JSON; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class RpcDecoder extends ByteToMessageDecoder { + + //目标对象类型进行解码 + private Class target; + + public RpcDecoder(Class target) { + this.target = target; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + if (in.readableBytes() < 4) { //不够长度丢弃 + return; + } + in.markReaderIndex(); //标记一下当前的readIndex的位置 + int dataLength = in.readInt(); // 读取传送过来的消息的长度。ByteBuf 的readInt()方法会让他的readIndex增加4 + + if (in.readableBytes() < dataLength) { //读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方 + in.resetReaderIndex(); + return; + } + byte[] data = new byte[dataLength]; + in.readBytes(data); + + Object obj = JSON.parseObject(data, target); //将byte数据转化为我们需要的对象 + out.add(obj); + } +} diff --git a/connect-server-netty/src/main/java/org/needcoke/rpc/netty/codec/RpcEncoder.java b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/codec/RpcEncoder.java new file mode 100644 index 0000000000000000000000000000000000000000..a020fd79f786ba053f6a4f0a99a5711ee419ef4b --- /dev/null +++ b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/codec/RpcEncoder.java @@ -0,0 +1,25 @@ +package org.needcoke.rpc.netty.codec; + +import com.alibaba.fastjson.JSON; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +public class RpcEncoder extends MessageToByteEncoder { + + //目标对象类型进行编码 + private Class target; + + public RpcEncoder(Class target) { + this.target = target; + } + + @Override + protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { + if (target.isInstance(msg)) { + byte[] data = JSON.toJSONBytes(msg); //使用fastJson将对象转换为byte + out.writeInt(data.length); //先将消息长度写入,也就是消息头 + out.writeBytes(data); //消息体中包含我们要发送的数据 + } + } +} diff --git a/connect-server-netty/src/main/java/org/needcoke/rpc/netty/invoker/NettyInvoker.java b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/invoker/NettyInvoker.java new file mode 100644 index 0000000000000000000000000000000000000000..cbadc9993f3e1a5e111465188e91ac654643fd42 --- /dev/null +++ b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/invoker/NettyInvoker.java @@ -0,0 +1,88 @@ +package org.needcoke.rpc.netty.invoker; + +import cn.hutool.core.date.DateUtil; +import io.netty.channel.Channel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.connect.rpc.link.tracking.config.LinkTrackingContextHolder; +import org.connect.rpc.link.tracking.util.TrackingUtil; +import org.needcoke.rpc.Fuse; +import org.needcoke.rpc.codec.CokeRequest; +import org.needcoke.rpc.common.constant.ConnectConstant; +import org.needcoke.rpc.common.enums.ConnectionExceptionEnum; +import org.needcoke.rpc.common.exception.CokeConnectException; +import org.needcoke.rpc.invoker.ConnectInvoker; +import org.needcoke.rpc.invoker.InvokeResult; +import org.needcoke.rpc.invoker.OkHttpsInvoker; +import org.needcoke.rpc.net.Connector; +import org.needcoke.rpc.netty.client.NettyClient; +import org.needcoke.rpc.utils.ConnectUtil; +import org.springframework.cloud.client.ServiceInstance; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.LockSupport; + +@Slf4j +@NoArgsConstructor +public class NettyInvoker extends ConnectInvoker { + + private final Map clientMap = new ConcurrentHashMap<>(); + + private final Map channelMap = new ConcurrentHashMap<>(); + @Override + public InvokeResult execute(Connector connector, ServiceInstance instance, String beanName, String methodName, Map params) { + InvokeResult res = runDefaultExecute(connector, instance, beanName, methodName, params); + if(null != res){ + return res; + } + String uri = instance.getHost() + ConnectConstant.COLON + instance.getPort(); + Integer serverPort = connector.getServerPort(instance); + if (0 == serverPort) { + throw new CokeConnectException(ConnectionExceptionEnum.REMOTE_SERVICE_DOES_NOT_OPEN_THE_COKE_SERVICE_PORT); + } + if (!channelMap.containsKey(uri)) { + NettyClient nettyClient = new NettyClient(instance.getHost(), serverPort); + clientMap.put(uri, nettyClient); + try { + Channel channel = nettyClient.start(); + channelMap.put(uri, channel); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + Channel channel = channelMap.get(uri); + CokeRequest request = new CokeRequest().setBeanName(beanName) + .setMethodName(methodName) + .setParams(params) + .addHeader(TrackingUtil.headerKey(), TrackingUtil.headerValue()); + byte[] bytes = request.toBytes(); + InvokeResult tmp = new InvokeResult(); + ConnectUtil.putRequestMap(tmp); + try { + channel.writeAndFlush(request); + } catch (Exception e) { + //一般是channel close失效了 + log.error(e.getMessage()); + try { + channel = clientMap.get(uri).start(); + } catch (InterruptedException ex) { + if (null == connector.getHttpInvoker()) { + connector.setHttpInvoker(new OkHttpsInvoker()); + } + return connector.compensationExecute(instance,beanName,methodName,params); + } + channelMap.put(uri,channel); + return execute(connector,instance,beanName,methodName,params); + } + Fuse fuse = new Fuse(fuseConfig.getFuseTimeOut(), TrackingUtil.getRequestId()); + fuseThreadPool.newTask(fuse); + LockSupport.park(); + InvokeResult result = ConnectUtil.getFromRequestMap(TrackingUtil.getRequestId()); + long start = LinkTrackingContextHolder.getLinkTracking().getStartTime(); + long end = DateUtil.current(); + log.debug("requestId = {} , start = {} , end = {} ,cost = {}", TrackingUtil.getRequestId(), start, end, end - start); + result.setTime(end - start); + return result; + } +} diff --git a/connect-server-netty/src/main/java/org/needcoke/rpc/netty/processor/ReadMessageClientProcessor.java b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/processor/ReadMessageClientProcessor.java new file mode 100644 index 0000000000000000000000000000000000000000..42bbeb4b33348b02eacd002951d72b787deaed7f --- /dev/null +++ b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/processor/ReadMessageClientProcessor.java @@ -0,0 +1,24 @@ +package org.needcoke.rpc.netty.processor; + +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; +import org.connect.rpc.link.tracking.util.TrackingUtil; +import org.needcoke.rpc.Fuse; +import org.needcoke.rpc.codec.CokeRequest; +import org.needcoke.rpc.common.enums.ConnectRequestEnum; +import org.needcoke.rpc.utils.ConnectUtil; + +@Slf4j +public class ReadMessageClientProcessor implements ReadMessageProcessor { + @Override + public void channelRead(ChannelHandlerContext ctx, CokeRequest request) { + if (ConnectRequestEnum.INTERNAL_RESPONSE == request.getRequestType()) { + log.debug("netty client receive back linkTracking = {} , request json = {}", + TrackingUtil.linkTrackingJsonStr(), new String(request.toBytes())); + boolean bool = Fuse.unPark(request.getCokeRequestId()); + if(bool) { + ConnectUtil.putRequestMap(request.getCokeRequestId(), request.getResult()); + } + } + } +} diff --git a/connect-server-netty/src/main/java/org/needcoke/rpc/netty/processor/ReadMessageProcessor.java b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/processor/ReadMessageProcessor.java new file mode 100644 index 0000000000000000000000000000000000000000..3935a00da5c20b220908ca0504bc12c1a88406d5 --- /dev/null +++ b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/processor/ReadMessageProcessor.java @@ -0,0 +1,9 @@ +package org.needcoke.rpc.netty.processor; + +import io.netty.channel.ChannelHandlerContext; +import org.needcoke.rpc.codec.CokeRequest; + +public interface ReadMessageProcessor { + + void channelRead(ChannelHandlerContext ctx, CokeRequest request); +} diff --git a/connect-server-netty/src/main/java/org/needcoke/rpc/netty/processor/ReadMessageServerProcessor.java b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/processor/ReadMessageServerProcessor.java new file mode 100644 index 0000000000000000000000000000000000000000..d2cbe6d80be8d95da6a65d126a10d9cd8a5b91c9 --- /dev/null +++ b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/processor/ReadMessageServerProcessor.java @@ -0,0 +1,59 @@ +package org.needcoke.rpc.netty.processor; + +import cn.hutool.core.collection.CollUtil; +import com.alibaba.fastjson.JSONObject; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; +import org.connect.rpc.link.tracking.util.TrackingUtil; +import org.needcoke.rpc.codec.CokeRequest; +import org.needcoke.rpc.common.constant.ConnectConstant; +import org.needcoke.rpc.common.enums.ConnectRequestEnum; +import org.needcoke.rpc.common.enums.ConnectionExceptionEnum; +import org.needcoke.rpc.common.exception.CokeConnectException; +import org.needcoke.rpc.invoker.InvokeResult; +import org.needcoke.rpc.utils.SpringContextUtils; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Map; + +@Slf4j +public class ReadMessageServerProcessor implements ReadMessageProcessor{ + + @Override + public void channelRead(ChannelHandlerContext ctx, CokeRequest request) { + if(ConnectRequestEnum.INTERNAL_REQUEST == request.getRequestType()){ + String beanName = request.getBeanName(); + String methodName = request.getMethodName(); + Map params = request.getParams(); + log.debug("execute netty linkTracking = {} , -- beanName : {} , methodName : {} , param : {}", + TrackingUtil.linkTrackingJsonStr(), beanName, methodName, JSONObject.toJSONString(params)); + Method method = SpringContextUtils.getMethod(beanName, methodName); + if (null == method) { + log.error("beanName {} , methodName {} , linkTracking = {}", beanName, methodName, TrackingUtil.linkTrackingJsonStr()); + throw new CokeConnectException(ConnectionExceptionEnum.BEAN_WITHOUT_METHOD); + } + Object bean = SpringContextUtils.getBean(beanName); + try { + Object invoke; + if (CollUtil.isEmpty(params)) { + invoke = method.invoke(bean); + } else { + invoke = method.invoke(bean, params.values().toArray()); + } + InvokeResult invokeResult = new InvokeResult().setBody(invoke).setStatus(200).setTime(30L); + ctx.writeAndFlush(request.setRequestType(ConnectRequestEnum.INTERNAL_RESPONSE) + .setResult(invokeResult) + .setCokeRequestId(TrackingUtil.getRequestId())); + } catch (Exception e) { + log.error(ConnectionExceptionEnum.INVOKE_METHOD_ERROR.logStatement(ConnectConstant.EXECUTE_RELATIVE_PATH)); + if(e instanceof InvocationTargetException){ + ((InvocationTargetException) e).getTargetException().printStackTrace(); + } + throw new CokeConnectException(ConnectionExceptionEnum.INVOKE_METHOD_ERROR, e); + } + } + } + + +} diff --git a/connect-server-netty/src/main/java/org/needcoke/rpc/netty/server/NettyServer.java b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/server/NettyServer.java new file mode 100644 index 0000000000000000000000000000000000000000..1b9548af914dcbca1671e269fbe0e8d3e8c06f74 --- /dev/null +++ b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/server/NettyServer.java @@ -0,0 +1,68 @@ +package org.needcoke.rpc.netty.server; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.extern.slf4j.Slf4j; +import org.needcoke.rpc.codec.CokeRequest; +import org.needcoke.rpc.config.ServerConfig; +import org.needcoke.rpc.netty.codec.RpcDecoder; +import org.needcoke.rpc.netty.codec.RpcEncoder; +import org.needcoke.rpc.server.ConnectionServer; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; + +@Slf4j +public class NettyServer extends Thread implements ConnectionServer { + + @Resource + private ServerConfig serverConfig; + + @Override + public void run() { + EventLoopGroup bossGroup = new NioEventLoopGroup(); //bossGroup就是parentGroup,是负责处理TCP/IP连接的 + EventLoopGroup workerGroup = new NioEventLoopGroup(); //workerGroup就是childGroup,是负责处理Channel(通道)的I/O事件 + + ServerBootstrap sb = new ServerBootstrap(); + sb.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 128) //初始化服务端可连接队列,指定了队列的大小128 + .childOption(ChannelOption.SO_KEEPALIVE, false) //保持长连接 + .childHandler(new ChannelInitializer() { // 绑定客户端连接时候触发操作 + @Override + protected void initChannel(SocketChannel sh) throws Exception { + sh.pipeline() + .addLast(new RpcDecoder(CokeRequest.class)) //解码request + .addLast(new RpcEncoder(CokeRequest.class)) //编码response + .addLast(new NettyServerHandlerAdapter()); //使用ServerHandler类来处理接收到的消息 + } + }); + //绑定监听端口,调用sync同步阻塞方法等待绑定操作完 + ChannelFuture future = null; + try { + future = sb.bind(serverConfig.getCokeServerPort()).sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + //成功绑定到端口之后,给channel增加一个 管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程。 + try { + log.info("netty server start bind port {}",serverConfig.getCokeServerPort()); + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + throw new RuntimeException(); + } + } + + @PostConstruct + @Override + public synchronized void start() { + super.start(); + } +} diff --git a/connect-server-netty/src/main/java/org/needcoke/rpc/netty/server/NettyServerHandlerAdapter.java b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/server/NettyServerHandlerAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..1571af532b1f5bb34446f3e966613d755507fb6b --- /dev/null +++ b/connect-server-netty/src/main/java/org/needcoke/rpc/netty/server/NettyServerHandlerAdapter.java @@ -0,0 +1,58 @@ +package org.needcoke.rpc.netty.server; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.connect.rpc.link.tracking.common.CommonConstant; +import org.connect.rpc.link.tracking.config.LinkTrackingContextHolder; +import org.connect.rpc.link.tracking.net.LinkTracking; +import org.needcoke.rpc.codec.CokeRequest; +import org.needcoke.rpc.config.ServerConfig; +import org.needcoke.rpc.netty.processor.ReadMessageServerProcessor; +import org.needcoke.rpc.utils.SpringContextUtils; + +public class NettyServerHandlerAdapter extends ChannelInboundHandlerAdapter { + + private final ReadMessageServerProcessor readMessageProcessor; + + public NettyServerHandlerAdapter() { + this.readMessageProcessor = new ReadMessageServerProcessor(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + CokeRequest request = (CokeRequest)msg; + String json = request.getHeader(CommonConstant.COKE_REQUEST_ID_HEADER_ID_NAME); + int mvcPort = SpringContextUtils.getBean(ServerConfig.class).getMvcPort(); + if (StrUtil.isEmpty(json)) { + LinkTracking linkTracking = new LinkTracking(mvcPort); + linkTracking.setIndex(1); + LinkTrackingContextHolder.setLinkTracking(linkTracking); + } else { + LinkTracking linkTracking = JSONUtil.toBean(json, LinkTracking.class); + linkTracking.setIndex(linkTracking.getIndex() + 1); + linkTracking.changeIp(); + linkTracking.setPort(mvcPort); + LinkTrackingContextHolder.setLinkTracking(linkTracking); + } + readMessageProcessor.channelRead(ctx,request); + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + super.channelRegistered(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + } + + +} diff --git a/business-b/pom.xml b/connect-server-smart-socket/pom.xml similarity index 72% rename from business-b/pom.xml rename to connect-server-smart-socket/pom.xml index 4339e2479b11856b4ed7334fc5af0f230035844e..e5a428b099ca5a8cea874db082fea3883a3549c8 100644 --- a/business-b/pom.xml +++ b/connect-server-smart-socket/pom.xml @@ -9,17 +9,26 @@ 4.0.0 - business-b + connect-server-smart-socket 8 8 + org.needcoke connect-core abandon + provided + + + + org.smartboot.socket + aio-core + 1.5.17 + provided diff --git a/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequestProtocol.java b/connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/codec/CokeRequestProtocol.java similarity index 92% rename from connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequestProtocol.java rename to connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/codec/CokeRequestProtocol.java index a8d0935f898920c106f691c79b458cbbd387d91a..090bede08f32d757b730bfae46c3e612c7d091c6 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/codec/CokeRequestProtocol.java +++ b/connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/codec/CokeRequestProtocol.java @@ -1,6 +1,7 @@ -package org.needcoke.rpc.codec; +package org.needcoke.rpc.smartsocket.codec; import com.alibaba.fastjson.JSONObject; +import org.needcoke.rpc.codec.CokeRequest; import org.smartboot.socket.Protocol; import org.smartboot.socket.transport.AioSession; diff --git a/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java b/connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/invoker/SmartSocketInvoker.java similarity index 47% rename from connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java rename to connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/invoker/SmartSocketInvoker.java index f4a4adf8e6e811d0be77e6bbdb731da4c3c5e295..d8a67c2a116d07afefb76370075cc7929d2ccf75 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/invoker/SmartSocketInvoker.java +++ b/connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/invoker/SmartSocketInvoker.java @@ -1,11 +1,21 @@ -package org.needcoke.rpc.invoker; +package org.needcoke.rpc.smartsocket.invoker; import cn.hutool.core.date.DateUtil; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.connect.rpc.link.tracking.config.LinkTrackingContextHolder; +import org.connect.rpc.link.tracking.util.TrackingUtil; +import org.needcoke.rpc.Fuse; import org.needcoke.rpc.codec.CokeRequest; -import org.needcoke.rpc.codec.CokeRequestProtocol; +import org.needcoke.rpc.smartsocket.codec.CokeRequestProtocol; import org.needcoke.rpc.common.constant.ConnectConstant; -import org.needcoke.rpc.processor.smart_socket.SmartSocketClientProcessor; +import org.needcoke.rpc.common.enums.ConnectionExceptionEnum; +import org.needcoke.rpc.common.exception.CokeConnectException; +import org.needcoke.rpc.invoker.ConnectInvoker; +import org.needcoke.rpc.invoker.InvokeResult; +import org.needcoke.rpc.invoker.OkHttpsInvoker; +import org.needcoke.rpc.net.Connector; +import org.needcoke.rpc.smartsocket.processor.SmartSocketClientProcessor; import org.needcoke.rpc.utils.ConnectUtil; import org.smartboot.socket.transport.AioQuickClient; import org.smartboot.socket.transport.AioSession; @@ -17,23 +27,26 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.LockSupport; @Slf4j +@NoArgsConstructor public class SmartSocketInvoker extends ConnectInvoker { /** * 给AioQuickClient加个引用,防止垃圾回收。 - * //TODO AioSession失效时重建连接 */ private final Map clientMap = new ConcurrentHashMap<>(); private final Map sessionMap = new ConcurrentHashMap<>(); @Override - public InvokeResult execute(ServiceInstance instance, String beanName, String methodName, Map params) { + public InvokeResult execute(Connector connector, ServiceInstance instance, String beanName, String methodName, Map params) { + InvokeResult res = runDefaultExecute(connector, instance, beanName, methodName, params); + if(null != res){ + return res; + } String uri = instance.getHost() + ConnectConstant.COLON + instance.getPort(); - Integer serverPort = ConnectUtil.getCokeServerPort(instance); + Integer serverPort = connector.getServerPort(instance); if (0 == serverPort) { - throw new RuntimeException("对方服务未开起server!"); - //TODO 异常统一 + throw new CokeConnectException(ConnectionExceptionEnum.REMOTE_SERVICE_DOES_NOT_OPEN_THE_COKE_SERVICE_PORT); } if (!sessionMap.containsKey(uri)) { AioQuickClient aioQuickClient = new AioQuickClient(instance.getHost(), serverPort, new CokeRequestProtocol(), new SmartSocketClientProcessor()); @@ -42,31 +55,42 @@ public class SmartSocketInvoker extends ConnectInvoker { AioSession session = aioQuickClient.start(); sessionMap.put(uri, session); } catch (IOException e) { - throw new RuntimeException(e); + throw new CokeConnectException(ConnectionExceptionEnum.CONNECTION_WITH_REMOTE_SERVICE_FAILED); } } AioSession session = sessionMap.get(uri); - int requestId = ConnectUtil.requestIdMaker.addAndGet(1); CokeRequest request = new CokeRequest().setBeanName(beanName) .setMethodName(methodName) .setParams(params) - .setRequestId(requestId); + .addHeader(TrackingUtil.headerKey(), TrackingUtil.headerValue()); byte[] bytes = request.toBytes(); + InvokeResult tmp = new InvokeResult(); + ConnectUtil.putRequestMap(tmp); try { session.writeBuffer().writeInt(bytes.length); session.writeBuffer().write(bytes); session.writeBuffer().flush(); } catch (IOException e) { - throw new RuntimeException(e.getMessage()); + //一般是session失效了 + log.error(e.getMessage()); + try { + session = clientMap.get(uri).start(); + } catch (IOException ex) { + if (null == connector.getHttpInvoker()) { + connector.setHttpInvoker(new OkHttpsInvoker()); + } + return connector.compensationExecute(instance,beanName,methodName,params); + } + sessionMap.put(uri,session); + return execute(connector,instance,beanName,methodName,params); } - InvokeResult tmp = new InvokeResult(); - long start = DateUtil.current(); - ConnectUtil.putRequestMap(requestId, tmp); - ConnectUtil.threadMap.put(requestId, Thread.currentThread()); + Fuse fuse = new Fuse(fuseConfig.getFuseTimeOut(), TrackingUtil.getRequestId()); + fuseThreadPool.newTask(fuse); LockSupport.park(); - InvokeResult result = ConnectUtil.getFromRequestMap(requestId); + InvokeResult result = ConnectUtil.getFromRequestMap(TrackingUtil.getRequestId()); + long start = LinkTrackingContextHolder.getLinkTracking().getStartTime(); long end = DateUtil.current(); - log.info("requestId = {} , start = {} , end = {} ,cost = {}", requestId, start, end, end - start); + log.info("requestId = {} , start = {} , end = {} ,cost = {}", TrackingUtil.getRequestId(), start, end, end - start); result.setTime(end - start); return result; } diff --git a/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketClientProcessor.java b/connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/processor/SmartSocketClientProcessor.java similarity index 42% rename from connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketClientProcessor.java rename to connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/processor/SmartSocketClientProcessor.java index e166d2061c6b89cdc7131386149f074e53ecb7cf..f0c864cd5a953f18ed6bf4f0a56b2a256f46dfc2 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketClientProcessor.java +++ b/connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/processor/SmartSocketClientProcessor.java @@ -1,26 +1,25 @@ -package org.needcoke.rpc.processor.smart_socket; +package org.needcoke.rpc.smartsocket.processor; import lombok.extern.slf4j.Slf4j; +import org.connect.rpc.link.tracking.util.TrackingUtil; +import org.needcoke.rpc.Fuse; import org.needcoke.rpc.codec.CokeRequest; import org.needcoke.rpc.common.enums.ConnectRequestEnum; import org.needcoke.rpc.utils.ConnectUtil; import org.smartboot.socket.transport.AioSession; -import java.util.concurrent.locks.LockSupport; - @Slf4j public class SmartSocketClientProcessor extends SmartSocketMessageProcessor { @Override - public void process(AioSession aioSession, CokeRequest request) { + public void process(AioSession session, CokeRequest request) { if (ConnectRequestEnum.INTERNAL_RESPONSE == request.getRequestType()) { - Integer requestId = request.getRequestId(); - log.info("smart socket client receive back requestId = {} , request json = {}", - requestId,new String(request.toBytes())); - ConnectUtil.putRequestMap(requestId,request.getResult()); - Thread thread = ConnectUtil.threadMap.get(requestId); - LockSupport.unpark(thread); - //TODO 抛出异常 + log.debug("smart socket client receive back linkTracking = {} , request json = {}", + TrackingUtil.linkTrackingJsonStr(), new String(request.toBytes())); + boolean bool = Fuse.unPark(request.getCokeRequestId()); + if(bool) { + ConnectUtil.putRequestMap(request.getCokeRequestId(), request.getResult()); + } } } diff --git a/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketMessageProcessor.java b/connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/processor/SmartSocketMessageProcessor.java similarity index 90% rename from connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketMessageProcessor.java rename to connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/processor/SmartSocketMessageProcessor.java index 448aaacaba189a44a9b5dbc25251f41a4e2d54dd..40e33415587af7f92975d891c672c52f33d105c3 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketMessageProcessor.java +++ b/connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/processor/SmartSocketMessageProcessor.java @@ -1,5 +1,6 @@ -package org.needcoke.rpc.processor.smart_socket; +package org.needcoke.rpc.smartsocket.processor; +import org.connect.rpc.link.tracking.config.LinkTrackingContextHolder; import org.needcoke.rpc.codec.CokeRequest; import org.needcoke.rpc.invoker.InvokeResult; import org.smartboot.socket.MessageProcessor; @@ -19,8 +20,8 @@ public abstract class SmartSocketMessageProcessor implements MessageProcessor writer.flush(); } catch (IOException e) { throw new RuntimeException(e); - // TODO 未来需要处理返回失败的场景 } + } public void response(AioSession session, CokeRequest response){ diff --git a/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketServerProcessor.java b/connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/processor/SmartSocketServerProcessor.java similarity index 50% rename from connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketServerProcessor.java rename to connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/processor/SmartSocketServerProcessor.java index 3b0c902c0b8afa10bdea456b486678173a3759a6..fae050bdc8f885f9b90184604502bdd8da9feec9 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/processor/smart_socket/SmartSocketServerProcessor.java +++ b/connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/processor/SmartSocketServerProcessor.java @@ -1,54 +1,78 @@ -package org.needcoke.rpc.processor.smart_socket; +package org.needcoke.rpc.smartsocket.processor; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONUtil; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; +import org.connect.rpc.link.tracking.common.CommonConstant; +import org.connect.rpc.link.tracking.config.LinkTrackingContextHolder; +import org.connect.rpc.link.tracking.net.LinkTracking; +import org.connect.rpc.link.tracking.util.TrackingUtil; import org.needcoke.rpc.codec.CokeRequest; import org.needcoke.rpc.common.constant.ConnectConstant; import org.needcoke.rpc.common.enums.ConnectRequestEnum; import org.needcoke.rpc.common.enums.ConnectionExceptionEnum; import org.needcoke.rpc.common.exception.CokeConnectException; +import org.needcoke.rpc.config.ServerConfig; import org.needcoke.rpc.invoker.InvokeResult; import org.needcoke.rpc.utils.SpringContextUtils; import org.smartboot.socket.StateMachineEnum; import org.smartboot.socket.transport.AioSession; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.Collection; import java.util.Map; @Slf4j public class SmartSocketServerProcessor extends SmartSocketMessageProcessor { @Override public void process(AioSession session, CokeRequest request) { - + String json = request.getHeader(CommonConstant.COKE_REQUEST_ID_HEADER_ID_NAME); + int mvcPort = SpringContextUtils.getBean(ServerConfig.class).getMvcPort(); + if (StrUtil.isEmpty(json)) { + LinkTracking linkTracking = new LinkTracking(mvcPort); + linkTracking.setIndex(1); + LinkTrackingContextHolder.setLinkTracking(linkTracking); + } else { + LinkTracking linkTracking = JSONUtil.toBean(json, LinkTracking.class); + linkTracking.setIndex(linkTracking.getIndex() + 1); + linkTracking.changeIp(); + linkTracking.setPort(mvcPort); + LinkTrackingContextHolder.setLinkTracking(linkTracking); + } //TODO 将该段代码抽出成公共的 if (ConnectRequestEnum.INTERNAL_REQUEST == request.getRequestType()) { String beanName = request.getBeanName(); String methodName = request.getMethodName(); Map params = request.getParams(); - log.info("execute smart socket requestId = {} , -- beanName : {} , methodName : {} , param : {}", request.getRequestId(), beanName, methodName, JSONObject.toJSONString(params)); + log.info("execute smart socket linkTracking = {} , -- beanName : {} , methodName : {} , param : {}", + TrackingUtil.linkTrackingJsonStr(), beanName, methodName, JSONObject.toJSONString(params)); Method method = SpringContextUtils.getMethod(beanName, methodName); if (null == method) { - log.error(ConnectionExceptionEnum.BEAN_WITHOUT_METHOD.logStatement("beanName {} , methodName {}"), beanName, methodName); + log.error(ConnectionExceptionEnum.BEAN_WITHOUT_METHOD.logStatement("beanName {} , methodName {} , linkTracking = {}"), beanName, methodName, TrackingUtil.linkTrackingJsonStr()); throw new CokeConnectException(ConnectionExceptionEnum.BEAN_WITHOUT_METHOD); } Object bean = SpringContextUtils.getBean(beanName); try { Object invoke = null; - if(CollUtil.isEmpty(params)){ - invoke= method.invoke(bean); - }else { - invoke = method.invoke(bean,params.values().toArray()); + if (CollUtil.isEmpty(params)) { + invoke = method.invoke(bean); + } else { + invoke = method.invoke(bean, params.values().toArray()); } InvokeResult invokeResult = new InvokeResult().setBody(invoke).setStatus(200).setTime(30L); - this.response(session, request.setRequestType(ConnectRequestEnum.INTERNAL_RESPONSE).setResult(invokeResult)); + this.response(session, request.setRequestType(ConnectRequestEnum.INTERNAL_RESPONSE) + .setResult(invokeResult) + .setCokeRequestId(TrackingUtil.getRequestId())); } catch (Exception e) { log.error(ConnectionExceptionEnum.INVOKE_METHOD_ERROR.logStatement(ConnectConstant.EXECUTE_RELATIVE_PATH)); - throw new CokeConnectException(ConnectionExceptionEnum.INVOKE_METHOD_ERROR); + if(e instanceof InvocationTargetException){ + ((InvocationTargetException) e).getTargetException().printStackTrace(); + } + throw new CokeConnectException(ConnectionExceptionEnum.INVOKE_METHOD_ERROR, e); } } - - } @Override diff --git a/connect-core/src/main/java/org/needcoke/rpc/server/SmartSocketServer.java b/connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/server/SmartSocketServer.java similarity index 69% rename from connect-core/src/main/java/org/needcoke/rpc/server/SmartSocketServer.java rename to connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/server/SmartSocketServer.java index 319b4ca490bdc3f6cd386db8aac9a0ad8bd9a9e5..bea1fd7781e434ca0cd25d31326e1106a0104b75 100644 --- a/connect-core/src/main/java/org/needcoke/rpc/server/SmartSocketServer.java +++ b/connect-server-smart-socket/src/main/java/org/needcoke/rpc/smartsocket/server/SmartSocketServer.java @@ -1,21 +1,19 @@ -package org.needcoke.rpc.server; +package org.needcoke.rpc.smartsocket.server; import lombok.extern.slf4j.Slf4j; -import org.needcoke.rpc.codec.CokeRequestProtocol; import org.needcoke.rpc.config.ServerConfig; -import org.needcoke.rpc.processor.smart_socket.SmartSocketServerProcessor; +import org.needcoke.rpc.smartsocket.processor.SmartSocketServerProcessor; +import org.needcoke.rpc.server.ConnectionServer; +import org.needcoke.rpc.smartsocket.codec.CokeRequestProtocol; import org.smartboot.socket.transport.AioQuickServer; -import org.springframework.cloud.client.ServiceInstance; -import org.springframework.cloud.client.discovery.DiscoveryClient; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.io.IOException; -import java.util.List; @Slf4j -public class SmartSocketServer implements ConnectionServer{ +public class SmartSocketServer implements ConnectionServer { @Resource private ServerConfig serverConfig; diff --git a/pom.xml b/pom.xml index 4747cef872d28150902b5f694709d2685c9b73e2..9930c0f33f9970710a3a029d628571519520362d 100644 --- a/pom.xml +++ b/pom.xml @@ -11,16 +11,19 @@ abandon这个单词是词汇表的第一单词,coke-connect用于作为初始版本 + + connect-fuse + connect-link-tracking connect-core - register-nacos - register-eureka - bussiness-a - business-b - business-c + connect-server-netty + connect-server-smart-socket + spring-cloud-starter-needcoke-coke-connect + 8 + 8 1.18.20 4.1.70.Final 2.15.0 @@ -28,7 +31,4 @@ Hoxton.SR12 5.7.16 - - - \ No newline at end of file diff --git a/register-nacos/src/main/java/Test.java b/register-nacos/src/main/java/Test.java deleted file mode 100644 index b033f91960522ad470f49de2bed62315037946c8..0000000000000000000000000000000000000000 --- a/register-nacos/src/main/java/Test.java +++ /dev/null @@ -1,11 +0,0 @@ -/** - * @author Gilgamesh - * @date 2022/4/2 - */ - -public class Test { - - public static void main(String[] args) { - - } -} diff --git a/business-c/pom.xml b/spring-cloud-starter-needcoke-coke-connect/pom.xml similarity index 45% rename from business-c/pom.xml rename to spring-cloud-starter-needcoke-coke-connect/pom.xml index 4cb91392a5985d88a975cdaf452eb2f38024f3a0..c766d10d824f77c88193f8eb7ca04bd1ad5592fb 100644 --- a/business-c/pom.xml +++ b/spring-cloud-starter-needcoke-coke-connect/pom.xml @@ -9,18 +9,30 @@ 4.0.0 - business-c + spring-cloud-starter-needcoke-coke-connect - 11 - 11 + 8 + 8 - cn.hutool - hutool-all - 5.7.16 + org.needcoke + connect-core + abandon + + + + org.needcoke + connect-server-smart-socket + abandon + + + + org.needcoke + connect-server-netty + abandon