From 79120c3a807d92456647c7121b95abed41241082 Mon Sep 17 00:00:00 2001
From: jkgeekJk <578120036@qq.com>
Date: Wed, 11 Jul 2018 09:14:13 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8CompletableFuture=E7=9A=84Map?=
=?UTF-8?q?=E6=9B=BF=E4=BB=A3=E5=85=AC=E7=94=A8map=EF=BC=8Cprovider?=
=?UTF-8?q?=E9=87=87=E7=94=A8=E5=A4=9A=E7=BA=BF=E7=A8=8B=E5=A4=84=E7=90=86?=
=?UTF-8?q?=EF=BC=8C=E6=8E=A5=E5=8F=A3=E4=BD=BF=E7=94=A8=E8=80=97=E6=97=B6?=
=?UTF-8?q?=E8=BF=9B=E8=A1=8C=E6=B5=8B=E8=AF=95=EF=BC=8Cjdk=E6=94=B91.8?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
example/rpc/pom.xml | 12 ++
.../smartboot/socket/example/Consumer.java | 16 ++-
.../socket/example/api/DemoApiImpl.java | 5 +
.../example/rpc/RpcConsumerProcessor.java | 67 +++++-----
.../example/rpc/RpcProviderProcessor.java | 117 +++++++++---------
5 files changed, 127 insertions(+), 90 deletions(-)
diff --git a/example/rpc/pom.xml b/example/rpc/pom.xml
index 828a3c9f..7de7f053 100644
--- a/example/rpc/pom.xml
+++ b/example/rpc/pom.xml
@@ -10,6 +10,18 @@
4.0.0
rpc
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ 1.8
+ 1.8
+
+
+
+
org.smartboot.socket
diff --git a/example/rpc/src/main/java/org/smartboot/socket/example/Consumer.java b/example/rpc/src/main/java/org/smartboot/socket/example/Consumer.java
index 52b735bc..d374edf7 100644
--- a/example/rpc/src/main/java/org/smartboot/socket/example/Consumer.java
+++ b/example/rpc/src/main/java/org/smartboot/socket/example/Consumer.java
@@ -7,6 +7,8 @@ import org.smartboot.socket.transport.AioQuickClient;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* @author 三刀
@@ -21,8 +23,18 @@ public class Consumer {
consumer.start();
DemoApi demoApi = rpcConsumerProcessor.getObject(DemoApi.class);
- System.out.println(demoApi.test("smart-socket"));
- System.out.println(demoApi.sum(1, 2));
+ ExecutorService pool= Executors.newCachedThreadPool();
+ pool.execute(()->{
+ System.out.println(demoApi.test("smart-socket"));
+ });
+ pool.execute(()->{
+ System.out.println(demoApi.test("smart-socket2"));
+ });
+ pool.execute(()->{
+ System.out.println(demoApi.sum(1, 2));
+ });
+
+
}
}
diff --git a/example/rpc/src/main/java/org/smartboot/socket/example/api/DemoApiImpl.java b/example/rpc/src/main/java/org/smartboot/socket/example/api/DemoApiImpl.java
index 64562c59..09807ab1 100644
--- a/example/rpc/src/main/java/org/smartboot/socket/example/api/DemoApiImpl.java
+++ b/example/rpc/src/main/java/org/smartboot/socket/example/api/DemoApiImpl.java
@@ -7,6 +7,11 @@ package org.smartboot.socket.example.api;
public class DemoApiImpl implements DemoApi {
@Override
public String test(String name) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
return "hello " + name;
}
diff --git a/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcConsumerProcessor.java b/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcConsumerProcessor.java
index e7cea365..bf822080 100644
--- a/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcConsumerProcessor.java
+++ b/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcConsumerProcessor.java
@@ -1,5 +1,6 @@
package org.smartboot.socket.example.rpc;
+import com.sun.xml.internal.ws.util.CompletedFuture;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -20,7 +21,10 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.SocketTimeoutException;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
/**
* @author 三刀
@@ -28,7 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class RpcConsumerProcessor implements MessageProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerProcessor.class);
- private Map synchRespMap = new ConcurrentHashMap<>();
+ private Map> synchRespMap = new ConcurrentHashMap<>();
private Map objectMap = new ConcurrentHashMap<>();
private AioSession aioSession;
@@ -39,15 +43,7 @@ public class RpcConsumerProcessor implements MessageProcessor {
try {
objectInput = new ObjectInputStream(new ByteArrayInputStream(msg));
RpcResponse resp = (RpcResponse) objectInput.readObject();
- Object reqMsg = synchRespMap.get(resp.getUuid());
- if (reqMsg != null) {
- synchronized (reqMsg) {
- if (synchRespMap.containsKey(resp.getUuid())) {
- synchRespMap.put(resp.getUuid(), resp);
- reqMsg.notifyAll();
- }
- }
- }
+ synchRespMap.get(resp.getUuid()).complete(resp);
} catch (Exception e) {
e.printStackTrace();
} finally {
@@ -98,7 +94,8 @@ public class RpcConsumerProcessor implements MessageProcessor {
private final RpcResponse sendRpcRequest(RpcRequest request) throws Exception {
- synchRespMap.put(request.getUuid(), request);
+ CompletableFuture rpcResponseCompletableFuture=new CompletableFuture<>();
+ synchRespMap.put(request.getUuid(),rpcResponseCompletableFuture);
//输出消息
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@@ -106,28 +103,12 @@ public class RpcConsumerProcessor implements MessageProcessor {
objectOutput.writeObject(request);
aioSession.write(byteArrayOutputStream.toByteArray());
- if (synchRespMap.containsKey(request.getUuid()) && synchRespMap.get(request.getUuid()) == request) {
- synchronized (request) {
- if (synchRespMap.containsKey(request.getUuid()) && synchRespMap.get(request.getUuid()) == request) {
- try {
- request.wait();
- } catch (InterruptedException e) {
- LOGGER.warn("", e);
- }
- }
- }
- }
- Object resp = null;
- synchronized (request) {
- resp = synchRespMap.remove(request.getUuid());
- }
- if (resp == null || resp == request) {
- throw new SocketTimeoutException("Message is timeout!" + resp);
- }
- if (resp instanceof RpcResponse) {
- return (RpcResponse) resp;
+ try {
+ RpcResponse resp=rpcResponseCompletableFuture.get(3, TimeUnit.SECONDS);
+ return resp;
+ } catch (Exception e) {
+ throw new SocketTimeoutException("Message is timeout!");
}
- throw new RuntimeException("invalid response " + resp);
}
@Override
@@ -139,4 +120,26 @@ public class RpcConsumerProcessor implements MessageProcessor {
}
}
+ public static void main(String[]args){
+ CompletableFuture completableFuture=new CompletableFuture<>();
+ new Thread(()->{
+ try {
+ System.out.println(completableFuture.get());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+ }).start();
+
+ new Thread(()->{
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ completableFuture.complete(null);
+ }).start();
+ }
+
}
diff --git a/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcProviderProcessor.java b/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcProviderProcessor.java
index 7328eecc..e085dd87 100644
--- a/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcProviderProcessor.java
+++ b/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcProviderProcessor.java
@@ -17,6 +17,8 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* @author 三刀
@@ -25,6 +27,7 @@ import java.util.Map;
public class RpcProviderProcessor implements MessageProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcProviderProcessor.class);
private Map impMap = new HashMap();
+ private ExecutorService pool= Executors.newCachedThreadPool();
/**
* 基础数据类型
*/
@@ -38,70 +41,72 @@ public class RpcProviderProcessor implements MessageProcessor {
@Override
public void process(AioSession session, byte[] msg) {
- ObjectInput objectInput = null;
- ObjectOutput objectOutput = null;
- try {
- objectInput = new ObjectInputStream(new ByteArrayInputStream(msg));
- RpcRequest req = (RpcRequest) objectInput.readObject();
-
- RpcResponse resp = new RpcResponse(req.getUuid());
+ pool.execute(()->{
+ ObjectInput objectInput = null;
+ ObjectOutput objectOutput = null;
try {
- String[] paramClassList = req.getParamClassList();
- Object[] paramObjList = req.getParams();
- // 获取入参类型
- Class>[] classArray = null;
- if (paramClassList != null) {
- classArray = new Class[paramClassList.length];
- for (int i = 0; i < classArray.length; i++) {
- Class> clazz = primitiveClass.get(paramClassList[i]);
- if (clazz == null) {
- classArray[i] = Class.forName(paramClassList[i]);
- } else {
- classArray[i] = clazz;
+ objectInput = new ObjectInputStream(new ByteArrayInputStream(msg));
+ RpcRequest req = (RpcRequest) objectInput.readObject();
+
+ RpcResponse resp = new RpcResponse(req.getUuid());
+ try {
+ String[] paramClassList = req.getParamClassList();
+ Object[] paramObjList = req.getParams();
+ // 获取入参类型
+ Class>[] classArray = null;
+ if (paramClassList != null) {
+ classArray = new Class[paramClassList.length];
+ for (int i = 0; i < classArray.length; i++) {
+ Class> clazz = primitiveClass.get(paramClassList[i]);
+ if (clazz == null) {
+ classArray[i] = Class.forName(paramClassList[i]);
+ } else {
+ classArray[i] = clazz;
+ }
}
}
+ // 调用接口
+ Object impObj = impMap.get(req.getInterfaceClass());
+ if (impObj == null) {
+ throw new UnsupportedOperationException("can not find interface: " + req.getInterfaceClass());
+ }
+ Method method = impObj.getClass().getMethod(req.getMethod(), classArray);
+ Object obj = method.invoke(impObj, paramObjList);
+ resp.setReturnObject(obj);
+ resp.setReturnType(method.getReturnType().getName());
+ } catch (InvocationTargetException e) {
+ LOGGER.error(e.getMessage(), e);
+ resp.setException(e.getTargetException().getMessage());
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ resp.setException(e.getMessage());
}
- // 调用接口
- Object impObj = impMap.get(req.getInterfaceClass());
- if (impObj == null) {
- throw new UnsupportedOperationException("can not find interface: " + req.getInterfaceClass());
- }
- Method method = impObj.getClass().getMethod(req.getMethod(), classArray);
- Object obj = method.invoke(impObj, paramObjList);
- resp.setReturnObject(obj);
- resp.setReturnType(method.getReturnType().getName());
- } catch (InvocationTargetException e) {
- LOGGER.error(e.getMessage(), e);
- resp.setException(e.getTargetException().getMessage());
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- resp.setException(e.getMessage());
- }
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- objectOutput = new ObjectOutputStream(byteArrayOutputStream);
- objectOutput.writeObject(resp);
- session.write(byteArrayOutputStream.toByteArray());
- } catch (IOException e) {
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- } finally {
- if (objectInput != null) {
- try {
- objectInput.close();
- } catch (IOException e) {
- e.printStackTrace();
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ objectOutput = new ObjectOutputStream(byteArrayOutputStream);
+ objectOutput.writeObject(resp);
+ session.write(byteArrayOutputStream.toByteArray());
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ } finally {
+ if (objectInput != null) {
+ try {
+ objectInput.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
- }
- if (objectOutput != null) {
- try {
+ if (objectOutput != null) {
+ try {
- objectOutput.close();
- } catch (IOException e) {
- e.printStackTrace();
+ objectOutput.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
}
- }
+ });
}
--
Gitee