From bca1ef097ee634394d4fe878d4c3114cfdf2a9c9 Mon Sep 17 00:00:00 2001 From: lzj Date: Tue, 10 Nov 2020 22:08:35 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E6=96=B0=E5=A2=9Eudp=20=E7=A4=BA=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- example/udp/pom.xml | 41 +++++++++ .../src/main/java/com/gaoyiping/demo/App.java | 20 +++++ .../java/com/gaoyiping/demo/AppClient.java | 49 ++++++++++ .../java/com/gaoyiping/demo/DemoClient.java | 62 +++++++++++++ .../java/com/gaoyiping/demo/DemoProtocol.java | 29 ++++++ .../java/com/gaoyiping/demo/DemoService.java | 90 +++++++++++++++++++ .../test/java/com/gaoyiping/demo/AppTest.java | 38 ++++++++ 7 files changed, 329 insertions(+) create mode 100644 example/udp/pom.xml create mode 100644 example/udp/src/main/java/com/gaoyiping/demo/App.java create mode 100644 example/udp/src/main/java/com/gaoyiping/demo/AppClient.java create mode 100644 example/udp/src/main/java/com/gaoyiping/demo/DemoClient.java create mode 100644 example/udp/src/main/java/com/gaoyiping/demo/DemoProtocol.java create mode 100644 example/udp/src/main/java/com/gaoyiping/demo/DemoService.java create mode 100644 example/udp/src/test/java/com/gaoyiping/demo/AppTest.java diff --git a/example/udp/pom.xml b/example/udp/pom.xml new file mode 100644 index 00000000..fdd77a2f --- /dev/null +++ b/example/udp/pom.xml @@ -0,0 +1,41 @@ + + 4.0.0 + + + example + org.smartboot.socket + 1.0.0 + + + udp + + demo + http://maven.apache.org + + + UTF-8 + + + + + junit + junit + 3.8.1 + test + + + org.smartboot.socket + aio-core + + + org.smartboot.socket + aio-pro + + + com.alibaba + fastjson + 1.2.54 + + + diff --git a/example/udp/src/main/java/com/gaoyiping/demo/App.java b/example/udp/src/main/java/com/gaoyiping/demo/App.java new file mode 100644 index 00000000..fec24ac1 --- /dev/null +++ b/example/udp/src/main/java/com/gaoyiping/demo/App.java @@ -0,0 +1,20 @@ +package com.gaoyiping.demo; + +import java.io.IOException; + +import org.smartboot.socket.transport.AioQuickServer; +import org.smartboot.socket.transport.UdpBootstrap; + +public class App { + public static void main( String[] args ) throws IOException { + int socketPort = 10086; +// AioQuickServer server = new AioQuickServer(socketPort, new DemoProtocol(), new DemoService()); + UdpBootstrap server = new UdpBootstrap<>(new DemoProtocol(), new DemoService()); + server.setReadBufferSize(1024); + server.open(socketPort); + +// server.setBannerEnabled(false); // 关掉万恶的宣传广告 (笑~~) + //server.setThreadNum(100); +// server.start(); + } +} diff --git a/example/udp/src/main/java/com/gaoyiping/demo/AppClient.java b/example/udp/src/main/java/com/gaoyiping/demo/AppClient.java new file mode 100644 index 00000000..198afddd --- /dev/null +++ b/example/udp/src/main/java/com/gaoyiping/demo/AppClient.java @@ -0,0 +1,49 @@ +package com.gaoyiping.demo; + +import org.smartboot.socket.transport.AioSession; +import org.smartboot.socket.transport.UdpBootstrap; +import org.smartboot.socket.transport.UdpChannel; +import org.smartboot.socket.transport.WriteBuffer; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +public class AppClient { + public static void main( String[] args ) throws IOException { + + final UdpBootstrap server = new UdpBootstrap(new DemoProtocol(), new DemoClient()); + + int socketPort = 10086; +// AioQuickServer server = new AioQuickServer(socketPort, new DemoProtocol(), new DemoService()); +// UdpBootstrap server = new UdpBootstrap<>(new DemoProtocol(), new DemoService()); + server.setReadBufferSize(1024); +// server.open(socketPort); + + int i = 1; + final SocketAddress remote = new InetSocketAddress("localhost", socketPort); + while (i-- > 0) { + new Thread(() -> { + try { + int count = 100; + UdpChannel channel = server.open(); + AioSession aioSession = channel.connect(remote); + WriteBuffer writeBuffer = aioSession.writeBuffer(); + while (count-- > 0) { + byte[] msg = ("HelloWorld" + count).getBytes(); + writeBuffer.writeInt(msg.length); + writeBuffer.write(msg); + writeBuffer.flush(); + } + System.out.println("发送完毕"); + } catch (Exception e) { + e.printStackTrace(); + } + }).start(); + } + +// server.setBannerEnabled(false); // 关掉万恶的宣传广告 (笑~~) + //server.setThreadNum(100); +// server.start(); + } +} diff --git a/example/udp/src/main/java/com/gaoyiping/demo/DemoClient.java b/example/udp/src/main/java/com/gaoyiping/demo/DemoClient.java new file mode 100644 index 00000000..e600b35b --- /dev/null +++ b/example/udp/src/main/java/com/gaoyiping/demo/DemoClient.java @@ -0,0 +1,62 @@ +package com.gaoyiping.demo; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import org.smartboot.socket.MessageProcessor; +import org.smartboot.socket.StateMachineEnum; +import org.smartboot.socket.transport.AioSession; + +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class DemoClient implements MessageProcessor { +// private HashMap clients = new HashMap(); +// private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(12); + + public void process(AioSession session, byte[] msg) { +// JSONObject jsonObject = JSON.parseObject(msg, JSONObject.class); + System.out.println(new String(msg)); + } + + public void stateEvent(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable) { + // when connection state changed. + switch (stateMachineEnum) { + case NEW_SESSION: + System.out.println("StateMachineEnum.NEW_SESSION"); + break; + case INPUT_SHUTDOWN: + System.out.println("StateMachineEnum.INPUT_SHUTDOWN"); + break; + case PROCESS_EXCEPTION: + System.out.println("StateMachineEnum.PROCESS_EXCEPTION"); + break; + case DECODE_EXCEPTION: + System.out.println("StateMachineEnum.DECODE_EXCEPTION"); + break; + case INPUT_EXCEPTION: + System.out.println("StateMachineEnum.INPUT_EXCEPTION"); + break; + case OUTPUT_EXCEPTION: + System.out.println("StateMachineEnum.OUTPUT_EXCEPTION"); + break; + case SESSION_CLOSING: + System.out.println("StateMachineEnum.SESSION_CLOSING"); + break; + case SESSION_CLOSED: + System.out.println("StateMachineEnum.SESSION_CLOSED"); + break; +// case FLOW_LIMIT: +// System.out.println("StateMachineEnum.FLOW_LIMIT"); +// break; +// case RELEASE_FLOW_LIMIT: +// System.out.println("StateMachineEnum.RELEASE_FLOW_LIMIT"); +// break; + default: + System.out.println("StateMachineEnum.default"); + } + } + +} diff --git a/example/udp/src/main/java/com/gaoyiping/demo/DemoProtocol.java b/example/udp/src/main/java/com/gaoyiping/demo/DemoProtocol.java new file mode 100644 index 00000000..2324c1ec --- /dev/null +++ b/example/udp/src/main/java/com/gaoyiping/demo/DemoProtocol.java @@ -0,0 +1,29 @@ +package com.gaoyiping.demo; + +import java.nio.ByteBuffer; + +import org.smartboot.socket.Protocol; +import org.smartboot.socket.transport.AioSession; + +public class DemoProtocol implements Protocol { + + public byte[] decode(ByteBuffer readBuffer, AioSession session) { + if (readBuffer.remaining() > 0) { + byte[] data = new byte[readBuffer.remaining()]; + readBuffer.get(data); + return data; + // type 1,2,3 message, see: + // https://smartboot.gitee.io/docs/smart-socket/second/3-type-one.html + // https://smartboot.gitee.io/docs/smart-socket/second/4-type-two.html + // https://smartboot.gitee.io/docs/smart-socket/second/5-type-three.html + } + return null; + } + + public ByteBuffer encode(byte[] msg, AioSession session) { + ByteBuffer buffer = ByteBuffer.allocate(msg.length); + buffer.put(msg); + buffer.flip(); + return buffer; + } +} diff --git a/example/udp/src/main/java/com/gaoyiping/demo/DemoService.java b/example/udp/src/main/java/com/gaoyiping/demo/DemoService.java new file mode 100644 index 00000000..e61e0aaa --- /dev/null +++ b/example/udp/src/main/java/com/gaoyiping/demo/DemoService.java @@ -0,0 +1,90 @@ +package com.gaoyiping.demo; + +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.smartboot.socket.MessageProcessor; +import org.smartboot.socket.StateMachineEnum; +import org.smartboot.socket.transport.AioSession; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; + +public class DemoService implements MessageProcessor, Runnable { + private HashMap clients = new HashMap(); + private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(12); + + public DemoService() { + executorService.scheduleAtFixedRate(this, 2, 2, TimeUnit.SECONDS); + } + + public void run() { + // send data every 2 second... + if (this.clients.isEmpty()) return; + for (AioSession session: this.clients.values()) { + try { +// session.write("Hey! Smart-Socket it's work...".getBytes()); + session.writeBuffer().write("Hey! Smart-Socket it's work...".getBytes()); + session.writeBuffer().flush(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public void process(AioSession session, byte[] msg) { + //JSONObject jsonObject = JSON.parseObject(msg, JSONObject.class); + System.out.println(new String(msg)); + // SomeCode... + try { + // Response +// session.write("{\"result\": \"OK\"}".getBytes()); + session.writeBuffer().write("{\"result\": \"OK\"}".getBytes()); + session.writeBuffer().flush(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void stateEvent(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable) { + // when connection state changed. + switch (stateMachineEnum) { + case NEW_SESSION: + System.out.println("StateMachineEnum.NEW_SESSION"); + break; + case INPUT_SHUTDOWN: + System.out.println("StateMachineEnum.INPUT_SHUTDOWN"); + break; + case PROCESS_EXCEPTION: + System.out.println("StateMachineEnum.PROCESS_EXCEPTION"); + break; + case DECODE_EXCEPTION: + System.out.println("StateMachineEnum.DECODE_EXCEPTION"); + break; + case INPUT_EXCEPTION: + System.out.println("StateMachineEnum.INPUT_EXCEPTION"); + break; + case OUTPUT_EXCEPTION: + System.out.println("StateMachineEnum.OUTPUT_EXCEPTION"); + break; + case SESSION_CLOSING: + System.out.println("StateMachineEnum.SESSION_CLOSING"); + break; + case SESSION_CLOSED: + System.out.println("StateMachineEnum.SESSION_CLOSED"); + break; +// case FLOW_LIMIT: +// System.out.println("StateMachineEnum.FLOW_LIMIT"); +// break; +// case RELEASE_FLOW_LIMIT: +// System.out.println("StateMachineEnum.RELEASE_FLOW_LIMIT"); +// break; + default: + System.out.println("StateMachineEnum.default"); + } + } + +} diff --git a/example/udp/src/test/java/com/gaoyiping/demo/AppTest.java b/example/udp/src/test/java/com/gaoyiping/demo/AppTest.java new file mode 100644 index 00000000..ceb9838d --- /dev/null +++ b/example/udp/src/test/java/com/gaoyiping/demo/AppTest.java @@ -0,0 +1,38 @@ +package com.gaoyiping.demo; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} -- Gitee From 8e247c3219f4e29731797c4101ebaea24c77200d Mon Sep 17 00:00:00 2001 From: lzj Date: Tue, 10 Nov 2020 22:22:07 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=20udp=20=E5=90=AF?= =?UTF-8?q?=E5=8A=A8=E6=97=B6=E7=9A=84=E6=8F=90=E7=A4=BA=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../socket/transport/UdpBootstrap.java | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/aio-pro/src/main/java/org/smartboot/socket/transport/UdpBootstrap.java b/aio-pro/src/main/java/org/smartboot/socket/transport/UdpBootstrap.java index 2fb4a9ed..5ad81e56 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/transport/UdpBootstrap.java +++ b/aio-pro/src/main/java/org/smartboot/socket/transport/UdpBootstrap.java @@ -100,6 +100,16 @@ public class UdpBootstrap { * @param port 指定绑定端口号,为0则随机指定 */ public UdpChannel open(String host, int port) throws IOException { + if(host != null){ + config.setHost(host); + } + config.setPort(port); + + // 增加广告说明 + if (config.isBannerEnabled()) { + System.out.println(IoServerConfig.BANNER + "\r\n :: smart-socket ::\t(" + IoServerConfig.VERSION + ")"); + } + if (selector == null) { synchronized (this) { if (selector == null) { @@ -111,7 +121,13 @@ public class UdpBootstrap { DatagramChannel channel = DatagramChannel.open(); channel.configureBlocking(false); if (port > 0) { - channel.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port)); + InetSocketAddress inetSocketAddress = host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port); + channel.socket().bind(inetSocketAddress); + if(host == null){ + config.setHost(inetSocketAddress.getHostString()); + } + } else { + config.setHost( ""); } if (status == Status.STATUS_RUNNING) { @@ -125,6 +141,9 @@ public class UdpBootstrap { if (status == Status.STATUS_INIT) { initThreadServer(); } + + System.out.println("smart-socket server started on port " + config.getPort() + ",threadNum:" + config.getThreadNum()); + System.out.println("smart-socket server config is " + config); return udpChannel; } @@ -263,6 +282,18 @@ public class UdpBootstrap { return this; } + + /** + * 是否启用控制台Banner打印 + * + * @param bannerEnabled true:启用,false:禁用 + * @return 当前AioQuickServer对象 + */ + public final UdpBootstrap setBannerEnabled(boolean bannerEnabled) { + config.setBannerEnabled(bannerEnabled); + return this; + } + enum Status { /** * 状态:初始 -- Gitee From 23d4349b8fb5774e69a0747101a4f46ccd7ed55f Mon Sep 17 00:00:00 2001 From: lzj Date: Wed, 11 Nov 2020 09:45:31 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E6=96=B0=E5=A2=9Eudp=20=E7=A4=BA=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- example/pom.xml | 6 ++++++ example/simple_server/pom.xml | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/example/pom.xml b/example/pom.xml index dc970388..2f0666e6 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -7,6 +7,7 @@ 1.0.0 rpc + udp benchmark spring c1000k @@ -39,6 +40,11 @@ aio-core 1.5.1 + + org.smartboot.socket + aio-core + 1.5.1 + org.apache.commons commons-lang3 diff --git a/example/simple_server/pom.xml b/example/simple_server/pom.xml index 14de43d3..d44c1785 100644 --- a/example/simple_server/pom.xml +++ b/example/simple_server/pom.xml @@ -24,7 +24,7 @@ org.smartboot.socket aio-core - 1.3.23 + 1.5.1 com.alibaba -- Gitee From 1dc18aca0a73542cdf58fe55de7ce566fd2309fb Mon Sep 17 00:00:00 2001 From: lzj Date: Wed, 11 Nov 2020 16:23:55 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=A7=84=E8=8C=83?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/smartboot/socket/transport/UdpBootstrap.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aio-pro/src/main/java/org/smartboot/socket/transport/UdpBootstrap.java b/aio-pro/src/main/java/org/smartboot/socket/transport/UdpBootstrap.java index 5ad81e56..4c59772a 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/transport/UdpBootstrap.java +++ b/aio-pro/src/main/java/org/smartboot/socket/transport/UdpBootstrap.java @@ -100,7 +100,7 @@ public class UdpBootstrap { * @param port 指定绑定端口号,为0则随机指定 */ public UdpChannel open(String host, int port) throws IOException { - if(host != null){ + if (host != null) { config.setHost(host); } config.setPort(port); @@ -123,11 +123,11 @@ public class UdpBootstrap { if (port > 0) { InetSocketAddress inetSocketAddress = host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port); channel.socket().bind(inetSocketAddress); - if(host == null){ + if (host == null) { config.setHost(inetSocketAddress.getHostString()); } } else { - config.setHost( ""); + config.setHost(""); } if (status == Status.STATUS_RUNNING) { @@ -222,7 +222,7 @@ public class UdpBootstrap { UdpAioSession aioSession = channel.createAndCacheSession(remote); NetMonitor netMonitor = config.getMonitor(); - if(netMonitor != null){ + if (netMonitor != null) { netMonitor.beforeRead(aioSession); netMonitor.afterRead(aioSession, buffer.remaining()); } -- Gitee From 357201b0fa1a09bfe42b1a282840e2607ac75ba2 Mon Sep 17 00:00:00 2001 From: lzj Date: Thu, 12 Nov 2020 14:20:25 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=8C=E5=90=AF?= =?UTF-8?q?=E5=8A=A8=E5=B9=BF=E5=91=8A=E4=BF=A1=E6=81=AF=EF=BC=8C=E6=AF=8F?= =?UTF-8?q?=E6=AC=A1open=E6=97=B6=E9=83=BD=E4=BC=9A=E6=98=BE=E7=A4=BA?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/smartboot/socket/transport/UdpBootstrap.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/aio-pro/src/main/java/org/smartboot/socket/transport/UdpBootstrap.java b/aio-pro/src/main/java/org/smartboot/socket/transport/UdpBootstrap.java index 4c59772a..2299a345 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/transport/UdpBootstrap.java +++ b/aio-pro/src/main/java/org/smartboot/socket/transport/UdpBootstrap.java @@ -105,11 +105,6 @@ public class UdpBootstrap { } config.setPort(port); - // 增加广告说明 - if (config.isBannerEnabled()) { - System.out.println(IoServerConfig.BANNER + "\r\n :: smart-socket ::\t(" + IoServerConfig.VERSION + ")"); - } - if (selector == null) { synchronized (this) { if (selector == null) { @@ -151,6 +146,12 @@ public class UdpBootstrap { if (status != Status.STATUS_INIT) { return; } + + // 增加广告说明 + if (config.isBannerEnabled()) { + System.out.println(IoServerConfig.BANNER + "\r\n :: smart-socket ::\t(" + IoServerConfig.VERSION + ")"); + } + this.status = Status.STATUS_RUNNING; int uid = UdpBootstrap.UID++; -- Gitee