From 79120c3a807d92456647c7121b95abed41241082 Mon Sep 17 00:00:00 2001 From: jkgeekJk <578120036@qq.com> Date: Wed, 11 Jul 2018 09:14:13 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8CompletableFuture=E7=9A=84Map?= =?UTF-8?q?=E6=9B=BF=E4=BB=A3=E5=85=AC=E7=94=A8map=EF=BC=8Cprovider?= =?UTF-8?q?=E9=87=87=E7=94=A8=E5=A4=9A=E7=BA=BF=E7=A8=8B=E5=A4=84=E7=90=86?= =?UTF-8?q?=EF=BC=8C=E6=8E=A5=E5=8F=A3=E4=BD=BF=E7=94=A8=E8=80=97=E6=97=B6?= =?UTF-8?q?=E8=BF=9B=E8=A1=8C=E6=B5=8B=E8=AF=95=EF=BC=8Cjdk=E6=94=B91.8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- example/rpc/pom.xml | 12 ++ .../smartboot/socket/example/Consumer.java | 16 ++- .../socket/example/api/DemoApiImpl.java | 5 + .../example/rpc/RpcConsumerProcessor.java | 67 +++++----- .../example/rpc/RpcProviderProcessor.java | 117 +++++++++--------- 5 files changed, 127 insertions(+), 90 deletions(-) diff --git a/example/rpc/pom.xml b/example/rpc/pom.xml index 828a3c9f..7de7f053 100644 --- a/example/rpc/pom.xml +++ b/example/rpc/pom.xml @@ -10,6 +10,18 @@ 4.0.0 rpc + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + org.smartboot.socket diff --git a/example/rpc/src/main/java/org/smartboot/socket/example/Consumer.java b/example/rpc/src/main/java/org/smartboot/socket/example/Consumer.java index 52b735bc..d374edf7 100644 --- a/example/rpc/src/main/java/org/smartboot/socket/example/Consumer.java +++ b/example/rpc/src/main/java/org/smartboot/socket/example/Consumer.java @@ -7,6 +7,8 @@ import org.smartboot.socket.transport.AioQuickClient; import java.io.IOException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * @author 三刀 @@ -21,8 +23,18 @@ public class Consumer { consumer.start(); DemoApi demoApi = rpcConsumerProcessor.getObject(DemoApi.class); - System.out.println(demoApi.test("smart-socket")); - System.out.println(demoApi.sum(1, 2)); + ExecutorService pool= Executors.newCachedThreadPool(); + pool.execute(()->{ + System.out.println(demoApi.test("smart-socket")); + }); + pool.execute(()->{ + System.out.println(demoApi.test("smart-socket2")); + }); + pool.execute(()->{ + System.out.println(demoApi.sum(1, 2)); + }); + + } } diff --git a/example/rpc/src/main/java/org/smartboot/socket/example/api/DemoApiImpl.java b/example/rpc/src/main/java/org/smartboot/socket/example/api/DemoApiImpl.java index 64562c59..09807ab1 100644 --- a/example/rpc/src/main/java/org/smartboot/socket/example/api/DemoApiImpl.java +++ b/example/rpc/src/main/java/org/smartboot/socket/example/api/DemoApiImpl.java @@ -7,6 +7,11 @@ package org.smartboot.socket.example.api; public class DemoApiImpl implements DemoApi { @Override public String test(String name) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } return "hello " + name; } diff --git a/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcConsumerProcessor.java b/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcConsumerProcessor.java index e7cea365..bf822080 100644 --- a/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcConsumerProcessor.java +++ b/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcConsumerProcessor.java @@ -1,5 +1,6 @@ package org.smartboot.socket.example.rpc; +import com.sun.xml.internal.ws.util.CompletedFuture; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -20,7 +21,10 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.SocketTimeoutException; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; /** * @author 三刀 @@ -28,7 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; */ public class RpcConsumerProcessor implements MessageProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerProcessor.class); - private Map synchRespMap = new ConcurrentHashMap<>(); + private Map> synchRespMap = new ConcurrentHashMap<>(); private Map objectMap = new ConcurrentHashMap<>(); private AioSession aioSession; @@ -39,15 +43,7 @@ public class RpcConsumerProcessor implements MessageProcessor { try { objectInput = new ObjectInputStream(new ByteArrayInputStream(msg)); RpcResponse resp = (RpcResponse) objectInput.readObject(); - Object reqMsg = synchRespMap.get(resp.getUuid()); - if (reqMsg != null) { - synchronized (reqMsg) { - if (synchRespMap.containsKey(resp.getUuid())) { - synchRespMap.put(resp.getUuid(), resp); - reqMsg.notifyAll(); - } - } - } + synchRespMap.get(resp.getUuid()).complete(resp); } catch (Exception e) { e.printStackTrace(); } finally { @@ -98,7 +94,8 @@ public class RpcConsumerProcessor implements MessageProcessor { private final RpcResponse sendRpcRequest(RpcRequest request) throws Exception { - synchRespMap.put(request.getUuid(), request); + CompletableFuture rpcResponseCompletableFuture=new CompletableFuture<>(); + synchRespMap.put(request.getUuid(),rpcResponseCompletableFuture); //输出消息 ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); @@ -106,28 +103,12 @@ public class RpcConsumerProcessor implements MessageProcessor { objectOutput.writeObject(request); aioSession.write(byteArrayOutputStream.toByteArray()); - if (synchRespMap.containsKey(request.getUuid()) && synchRespMap.get(request.getUuid()) == request) { - synchronized (request) { - if (synchRespMap.containsKey(request.getUuid()) && synchRespMap.get(request.getUuid()) == request) { - try { - request.wait(); - } catch (InterruptedException e) { - LOGGER.warn("", e); - } - } - } - } - Object resp = null; - synchronized (request) { - resp = synchRespMap.remove(request.getUuid()); - } - if (resp == null || resp == request) { - throw new SocketTimeoutException("Message is timeout!" + resp); - } - if (resp instanceof RpcResponse) { - return (RpcResponse) resp; + try { + RpcResponse resp=rpcResponseCompletableFuture.get(3, TimeUnit.SECONDS); + return resp; + } catch (Exception e) { + throw new SocketTimeoutException("Message is timeout!"); } - throw new RuntimeException("invalid response " + resp); } @Override @@ -139,4 +120,26 @@ public class RpcConsumerProcessor implements MessageProcessor { } } + public static void main(String[]args){ + CompletableFuture completableFuture=new CompletableFuture<>(); + new Thread(()->{ + try { + System.out.println(completableFuture.get()); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + }).start(); + + new Thread(()->{ + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + completableFuture.complete(null); + }).start(); + } + } diff --git a/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcProviderProcessor.java b/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcProviderProcessor.java index 7328eecc..e085dd87 100644 --- a/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcProviderProcessor.java +++ b/example/rpc/src/main/java/org/smartboot/socket/example/rpc/RpcProviderProcessor.java @@ -17,6 +17,8 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * @author 三刀 @@ -25,6 +27,7 @@ import java.util.Map; public class RpcProviderProcessor implements MessageProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(RpcProviderProcessor.class); private Map impMap = new HashMap(); + private ExecutorService pool= Executors.newCachedThreadPool(); /** * 基础数据类型 */ @@ -38,70 +41,72 @@ public class RpcProviderProcessor implements MessageProcessor { @Override public void process(AioSession session, byte[] msg) { - ObjectInput objectInput = null; - ObjectOutput objectOutput = null; - try { - objectInput = new ObjectInputStream(new ByteArrayInputStream(msg)); - RpcRequest req = (RpcRequest) objectInput.readObject(); - - RpcResponse resp = new RpcResponse(req.getUuid()); + pool.execute(()->{ + ObjectInput objectInput = null; + ObjectOutput objectOutput = null; try { - String[] paramClassList = req.getParamClassList(); - Object[] paramObjList = req.getParams(); - // 获取入参类型 - Class[] classArray = null; - if (paramClassList != null) { - classArray = new Class[paramClassList.length]; - for (int i = 0; i < classArray.length; i++) { - Class clazz = primitiveClass.get(paramClassList[i]); - if (clazz == null) { - classArray[i] = Class.forName(paramClassList[i]); - } else { - classArray[i] = clazz; + objectInput = new ObjectInputStream(new ByteArrayInputStream(msg)); + RpcRequest req = (RpcRequest) objectInput.readObject(); + + RpcResponse resp = new RpcResponse(req.getUuid()); + try { + String[] paramClassList = req.getParamClassList(); + Object[] paramObjList = req.getParams(); + // 获取入参类型 + Class[] classArray = null; + if (paramClassList != null) { + classArray = new Class[paramClassList.length]; + for (int i = 0; i < classArray.length; i++) { + Class clazz = primitiveClass.get(paramClassList[i]); + if (clazz == null) { + classArray[i] = Class.forName(paramClassList[i]); + } else { + classArray[i] = clazz; + } } } + // 调用接口 + Object impObj = impMap.get(req.getInterfaceClass()); + if (impObj == null) { + throw new UnsupportedOperationException("can not find interface: " + req.getInterfaceClass()); + } + Method method = impObj.getClass().getMethod(req.getMethod(), classArray); + Object obj = method.invoke(impObj, paramObjList); + resp.setReturnObject(obj); + resp.setReturnType(method.getReturnType().getName()); + } catch (InvocationTargetException e) { + LOGGER.error(e.getMessage(), e); + resp.setException(e.getTargetException().getMessage()); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + resp.setException(e.getMessage()); } - // 调用接口 - Object impObj = impMap.get(req.getInterfaceClass()); - if (impObj == null) { - throw new UnsupportedOperationException("can not find interface: " + req.getInterfaceClass()); - } - Method method = impObj.getClass().getMethod(req.getMethod(), classArray); - Object obj = method.invoke(impObj, paramObjList); - resp.setReturnObject(obj); - resp.setReturnType(method.getReturnType().getName()); - } catch (InvocationTargetException e) { - LOGGER.error(e.getMessage(), e); - resp.setException(e.getTargetException().getMessage()); - } catch (Exception e) { - LOGGER.error(e.getMessage(), e); - resp.setException(e.getMessage()); - } - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - objectOutput = new ObjectOutputStream(byteArrayOutputStream); - objectOutput.writeObject(resp); - session.write(byteArrayOutputStream.toByteArray()); - } catch (IOException e) { - e.printStackTrace(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - } finally { - if (objectInput != null) { - try { - objectInput.close(); - } catch (IOException e) { - e.printStackTrace(); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + objectOutput = new ObjectOutputStream(byteArrayOutputStream); + objectOutput.writeObject(resp); + session.write(byteArrayOutputStream.toByteArray()); + } catch (IOException e) { + e.printStackTrace(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } finally { + if (objectInput != null) { + try { + objectInput.close(); + } catch (IOException e) { + e.printStackTrace(); + } } - } - if (objectOutput != null) { - try { + if (objectOutput != null) { + try { - objectOutput.close(); - } catch (IOException e) { - e.printStackTrace(); + objectOutput.close(); + } catch (IOException e) { + e.printStackTrace(); + } } } - } + }); } -- Gitee