diff --git a/example/rpc/pom.xml b/example/rpc/pom.xml
index 828a3c9f17ab8429ba80ad1ea5f594967bb320a8..7de7f053cd8f0888c28a12ddbec3a43e2975e3aa 100644
--- a/example/rpc/pom.xml
+++ b/example/rpc/pom.xml
@@ -10,6 +10,18 @@
4.0.0
rpc
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ 1.8
+ 1.8
+
+
+
+
org.smartboot.socket
diff --git a/example/rpc/src/main/java/org/smartboot/socket/example/Consumer.java b/example/rpc/src/main/java/org/smartboot/socket/example/Consumer.java
index 52b735bc0c9c9726cb4c2efd5ce74eeeb4d05d7f..d374edf77503edcfbea020ad056f45bcff4e00b6 100644
--- a/example/rpc/src/main/java/org/smartboot/socket/example/Consumer.java
+++ b/example/rpc/src/main/java/org/smartboot/socket/example/Consumer.java
@@ -7,6 +7,8 @@ import org.smartboot.socket.transport.AioQuickClient;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* @author 三刀
@@ -21,8 +23,18 @@ public class Consumer {
consumer.start();
DemoApi demoApi = rpcConsumerProcessor.getObject(DemoApi.class);
- System.out.println(demoApi.test("smart-socket"));
- System.out.println(demoApi.sum(1, 2));
+ ExecutorService pool= Executors.newCachedThreadPool();
+ pool.execute(()->{
+ System.out.println(demoApi.test("smart-socket"));
+ });
+ pool.execute(()->{
+ System.out.println(demoApi.test("smart-socket2"));
+ });
+ pool.execute(()->{
+ System.out.println(demoApi.sum(1, 2));
+ });
+
+
}
}
diff --git a/example/rpc/src/main/java/org/smartboot/socket/example/api/DemoApiImpl.java b/example/rpc/src/main/java/org/smartboot/socket/example/api/DemoApiImpl.java
index 64562c594162d93c9511c65de97e1d70aa3dd571..09807ab1697ea5333aad94ad718b94fac5fdfcd2 100644
--- a/example/rpc/src/main/java/org/smartboot/socket/example/api/DemoApiImpl.java
+++ b/example/rpc/src/main/java/org/smartboot/socket/example/api/DemoApiImpl.java
@@ -7,6 +7,11 @@ package org.smartboot.socket.example.api;
public class DemoApiImpl implements DemoApi {
@Override
public String test(String name) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
return "hello " + name;
}
diff --git a/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcConsumerProcessor.java b/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcConsumerProcessor.java
index e7cea3653b259e6f4bb4704dfa0898f6297dad49..bf822080375bacac6e1459d9a386eb4bd6376fab 100644
--- a/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcConsumerProcessor.java
+++ b/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcConsumerProcessor.java
@@ -1,5 +1,6 @@
package org.smartboot.socket.example.rpc;
+import com.sun.xml.internal.ws.util.CompletedFuture;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -20,7 +21,10 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.SocketTimeoutException;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
/**
* @author 三刀
@@ -28,7 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class RpcConsumerProcessor implements MessageProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerProcessor.class);
- private Map synchRespMap = new ConcurrentHashMap<>();
+ private Map> synchRespMap = new ConcurrentHashMap<>();
private Map objectMap = new ConcurrentHashMap<>();
private AioSession aioSession;
@@ -39,15 +43,7 @@ public class RpcConsumerProcessor implements MessageProcessor {
try {
objectInput = new ObjectInputStream(new ByteArrayInputStream(msg));
RpcResponse resp = (RpcResponse) objectInput.readObject();
- Object reqMsg = synchRespMap.get(resp.getUuid());
- if (reqMsg != null) {
- synchronized (reqMsg) {
- if (synchRespMap.containsKey(resp.getUuid())) {
- synchRespMap.put(resp.getUuid(), resp);
- reqMsg.notifyAll();
- }
- }
- }
+ synchRespMap.get(resp.getUuid()).complete(resp);
} catch (Exception e) {
e.printStackTrace();
} finally {
@@ -98,7 +94,8 @@ public class RpcConsumerProcessor implements MessageProcessor {
private final RpcResponse sendRpcRequest(RpcRequest request) throws Exception {
- synchRespMap.put(request.getUuid(), request);
+ CompletableFuture rpcResponseCompletableFuture=new CompletableFuture<>();
+ synchRespMap.put(request.getUuid(),rpcResponseCompletableFuture);
//输出消息
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@@ -106,28 +103,12 @@ public class RpcConsumerProcessor implements MessageProcessor {
objectOutput.writeObject(request);
aioSession.write(byteArrayOutputStream.toByteArray());
- if (synchRespMap.containsKey(request.getUuid()) && synchRespMap.get(request.getUuid()) == request) {
- synchronized (request) {
- if (synchRespMap.containsKey(request.getUuid()) && synchRespMap.get(request.getUuid()) == request) {
- try {
- request.wait();
- } catch (InterruptedException e) {
- LOGGER.warn("", e);
- }
- }
- }
- }
- Object resp = null;
- synchronized (request) {
- resp = synchRespMap.remove(request.getUuid());
- }
- if (resp == null || resp == request) {
- throw new SocketTimeoutException("Message is timeout!" + resp);
- }
- if (resp instanceof RpcResponse) {
- return (RpcResponse) resp;
+ try {
+ RpcResponse resp=rpcResponseCompletableFuture.get(3, TimeUnit.SECONDS);
+ return resp;
+ } catch (Exception e) {
+ throw new SocketTimeoutException("Message is timeout!");
}
- throw new RuntimeException("invalid response " + resp);
}
@Override
@@ -139,4 +120,26 @@ public class RpcConsumerProcessor implements MessageProcessor {
}
}
+ public static void main(String[]args){
+ CompletableFuture completableFuture=new CompletableFuture<>();
+ new Thread(()->{
+ try {
+ System.out.println(completableFuture.get());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+ }).start();
+
+ new Thread(()->{
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ completableFuture.complete(null);
+ }).start();
+ }
+
}
diff --git a/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcProviderProcessor.java b/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcProviderProcessor.java
index 7328eecc1999e5807b27d8fc81029c6cb163b2e1..e085dd8724a3554f286db9052436a0e738dba04c 100644
--- a/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcProviderProcessor.java
+++ b/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcProviderProcessor.java
@@ -17,6 +17,8 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* @author 三刀
@@ -25,6 +27,7 @@ import java.util.Map;
public class RpcProviderProcessor implements MessageProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcProviderProcessor.class);
private Map impMap = new HashMap();
+ private ExecutorService pool= Executors.newCachedThreadPool();
/**
* 基础数据类型
*/
@@ -38,70 +41,72 @@ public class RpcProviderProcessor implements MessageProcessor {
@Override
public void process(AioSession session, byte[] msg) {
- ObjectInput objectInput = null;
- ObjectOutput objectOutput = null;
- try {
- objectInput = new ObjectInputStream(new ByteArrayInputStream(msg));
- RpcRequest req = (RpcRequest) objectInput.readObject();
-
- RpcResponse resp = new RpcResponse(req.getUuid());
+ pool.execute(()->{
+ ObjectInput objectInput = null;
+ ObjectOutput objectOutput = null;
try {
- String[] paramClassList = req.getParamClassList();
- Object[] paramObjList = req.getParams();
- // 获取入参类型
- Class>[] classArray = null;
- if (paramClassList != null) {
- classArray = new Class[paramClassList.length];
- for (int i = 0; i < classArray.length; i++) {
- Class> clazz = primitiveClass.get(paramClassList[i]);
- if (clazz == null) {
- classArray[i] = Class.forName(paramClassList[i]);
- } else {
- classArray[i] = clazz;
+ objectInput = new ObjectInputStream(new ByteArrayInputStream(msg));
+ RpcRequest req = (RpcRequest) objectInput.readObject();
+
+ RpcResponse resp = new RpcResponse(req.getUuid());
+ try {
+ String[] paramClassList = req.getParamClassList();
+ Object[] paramObjList = req.getParams();
+ // 获取入参类型
+ Class>[] classArray = null;
+ if (paramClassList != null) {
+ classArray = new Class[paramClassList.length];
+ for (int i = 0; i < classArray.length; i++) {
+ Class> clazz = primitiveClass.get(paramClassList[i]);
+ if (clazz == null) {
+ classArray[i] = Class.forName(paramClassList[i]);
+ } else {
+ classArray[i] = clazz;
+ }
}
}
+ // 调用接口
+ Object impObj = impMap.get(req.getInterfaceClass());
+ if (impObj == null) {
+ throw new UnsupportedOperationException("can not find interface: " + req.getInterfaceClass());
+ }
+ Method method = impObj.getClass().getMethod(req.getMethod(), classArray);
+ Object obj = method.invoke(impObj, paramObjList);
+ resp.setReturnObject(obj);
+ resp.setReturnType(method.getReturnType().getName());
+ } catch (InvocationTargetException e) {
+ LOGGER.error(e.getMessage(), e);
+ resp.setException(e.getTargetException().getMessage());
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ resp.setException(e.getMessage());
}
- // 调用接口
- Object impObj = impMap.get(req.getInterfaceClass());
- if (impObj == null) {
- throw new UnsupportedOperationException("can not find interface: " + req.getInterfaceClass());
- }
- Method method = impObj.getClass().getMethod(req.getMethod(), classArray);
- Object obj = method.invoke(impObj, paramObjList);
- resp.setReturnObject(obj);
- resp.setReturnType(method.getReturnType().getName());
- } catch (InvocationTargetException e) {
- LOGGER.error(e.getMessage(), e);
- resp.setException(e.getTargetException().getMessage());
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- resp.setException(e.getMessage());
- }
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- objectOutput = new ObjectOutputStream(byteArrayOutputStream);
- objectOutput.writeObject(resp);
- session.write(byteArrayOutputStream.toByteArray());
- } catch (IOException e) {
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- } finally {
- if (objectInput != null) {
- try {
- objectInput.close();
- } catch (IOException e) {
- e.printStackTrace();
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ objectOutput = new ObjectOutputStream(byteArrayOutputStream);
+ objectOutput.writeObject(resp);
+ session.write(byteArrayOutputStream.toByteArray());
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ } finally {
+ if (objectInput != null) {
+ try {
+ objectInput.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
- }
- if (objectOutput != null) {
- try {
+ if (objectOutput != null) {
+ try {
- objectOutput.close();
- } catch (IOException e) {
- e.printStackTrace();
+ objectOutput.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
}
- }
+ });
}