diff --git a/aio-pro/src/main/java/org/smartboot/socket/transport/UdpBootstrap.java b/aio-pro/src/main/java/org/smartboot/socket/transport/UdpBootstrap.java index 2fb4a9ed3e2d9fae5484ebbd25cf36ef1bad19ac..2299a345621a2c67ec68ce1d243e16171fec9d8e 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/transport/UdpBootstrap.java +++ b/aio-pro/src/main/java/org/smartboot/socket/transport/UdpBootstrap.java @@ -100,6 +100,11 @@ public class UdpBootstrap { * @param port 指定绑定端口号,为0则随机指定 */ public UdpChannel open(String host, int port) throws IOException { + if (host != null) { + config.setHost(host); + } + config.setPort(port); + if (selector == null) { synchronized (this) { if (selector == null) { @@ -111,7 +116,13 @@ public class UdpBootstrap { DatagramChannel channel = DatagramChannel.open(); channel.configureBlocking(false); if (port > 0) { - channel.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port)); + InetSocketAddress inetSocketAddress = host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port); + channel.socket().bind(inetSocketAddress); + if (host == null) { + config.setHost(inetSocketAddress.getHostString()); + } + } else { + config.setHost(""); } if (status == Status.STATUS_RUNNING) { @@ -125,6 +136,9 @@ public class UdpBootstrap { if (status == Status.STATUS_INIT) { initThreadServer(); } + + System.out.println("smart-socket server started on port " + config.getPort() + ",threadNum:" + config.getThreadNum()); + System.out.println("smart-socket server config is " + config); return udpChannel; } @@ -132,6 +146,12 @@ public class UdpBootstrap { if (status != Status.STATUS_INIT) { return; } + + // 增加广告说明 + if (config.isBannerEnabled()) { + System.out.println(IoServerConfig.BANNER + "\r\n :: smart-socket ::\t(" + IoServerConfig.VERSION + ")"); + } + this.status = Status.STATUS_RUNNING; int uid = UdpBootstrap.UID++; @@ -203,7 +223,7 @@ public class UdpBootstrap { UdpAioSession aioSession = channel.createAndCacheSession(remote); NetMonitor netMonitor = config.getMonitor(); - if(netMonitor != null){ + if (netMonitor != null) { netMonitor.beforeRead(aioSession); netMonitor.afterRead(aioSession, buffer.remaining()); } @@ -263,6 +283,18 @@ public class UdpBootstrap { return this; } + + /** + * 是否启用控制台Banner打印 + * + * @param bannerEnabled true:启用,false:禁用 + * @return 当前AioQuickServer对象 + */ + public final UdpBootstrap setBannerEnabled(boolean bannerEnabled) { + config.setBannerEnabled(bannerEnabled); + return this; + } + enum Status { /** * 状态:初始 diff --git a/example/pom.xml b/example/pom.xml index dc9703889097b768468beacb3e8a3dc367060f3c..2f0666e64e6409d96b1f68038ddfb29c32796975 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -7,6 +7,7 @@ 1.0.0 rpc + udp benchmark spring c1000k @@ -39,6 +40,11 @@ aio-core 1.5.1 + + org.smartboot.socket + aio-core + 1.5.1 + org.apache.commons commons-lang3 diff --git a/example/simple_server/pom.xml b/example/simple_server/pom.xml index 14de43d3b4056a6bb5a78bc114dcb4fe5c1bd3dd..d44c1785cc0221b8d62d4e90fa85dce64751c2eb 100644 --- a/example/simple_server/pom.xml +++ b/example/simple_server/pom.xml @@ -24,7 +24,7 @@ org.smartboot.socket aio-core - 1.3.23 + 1.5.1 com.alibaba diff --git a/example/udp/pom.xml b/example/udp/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..fdd77a2f1ba3b902e429323544e0e6f21e6813bc --- /dev/null +++ b/example/udp/pom.xml @@ -0,0 +1,41 @@ + + 4.0.0 + + + example + org.smartboot.socket + 1.0.0 + + + udp + + demo + http://maven.apache.org + + + UTF-8 + + + + + junit + junit + 3.8.1 + test + + + org.smartboot.socket + aio-core + + + org.smartboot.socket + aio-pro + + + com.alibaba + fastjson + 1.2.54 + + + diff --git a/example/udp/src/main/java/com/gaoyiping/demo/App.java b/example/udp/src/main/java/com/gaoyiping/demo/App.java new file mode 100644 index 0000000000000000000000000000000000000000..fec24ac1b985a1bdfa0b0408d1fc7719ce74a98a --- /dev/null +++ b/example/udp/src/main/java/com/gaoyiping/demo/App.java @@ -0,0 +1,20 @@ +package com.gaoyiping.demo; + +import java.io.IOException; + +import org.smartboot.socket.transport.AioQuickServer; +import org.smartboot.socket.transport.UdpBootstrap; + +public class App { + public static void main( String[] args ) throws IOException { + int socketPort = 10086; +// AioQuickServer server = new AioQuickServer(socketPort, new DemoProtocol(), new DemoService()); + UdpBootstrap server = new UdpBootstrap<>(new DemoProtocol(), new DemoService()); + server.setReadBufferSize(1024); + server.open(socketPort); + +// server.setBannerEnabled(false); // 关掉万恶的宣传广告 (笑~~) + //server.setThreadNum(100); +// server.start(); + } +} diff --git a/example/udp/src/main/java/com/gaoyiping/demo/AppClient.java b/example/udp/src/main/java/com/gaoyiping/demo/AppClient.java new file mode 100644 index 0000000000000000000000000000000000000000..198afddd717bad29f1fc10828c09fc5abb7eba6f --- /dev/null +++ b/example/udp/src/main/java/com/gaoyiping/demo/AppClient.java @@ -0,0 +1,49 @@ +package com.gaoyiping.demo; + +import org.smartboot.socket.transport.AioSession; +import org.smartboot.socket.transport.UdpBootstrap; +import org.smartboot.socket.transport.UdpChannel; +import org.smartboot.socket.transport.WriteBuffer; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +public class AppClient { + public static void main( String[] args ) throws IOException { + + final UdpBootstrap server = new UdpBootstrap(new DemoProtocol(), new DemoClient()); + + int socketPort = 10086; +// AioQuickServer server = new AioQuickServer(socketPort, new DemoProtocol(), new DemoService()); +// UdpBootstrap server = new UdpBootstrap<>(new DemoProtocol(), new DemoService()); + server.setReadBufferSize(1024); +// server.open(socketPort); + + int i = 1; + final SocketAddress remote = new InetSocketAddress("localhost", socketPort); + while (i-- > 0) { + new Thread(() -> { + try { + int count = 100; + UdpChannel channel = server.open(); + AioSession aioSession = channel.connect(remote); + WriteBuffer writeBuffer = aioSession.writeBuffer(); + while (count-- > 0) { + byte[] msg = ("HelloWorld" + count).getBytes(); + writeBuffer.writeInt(msg.length); + writeBuffer.write(msg); + writeBuffer.flush(); + } + System.out.println("发送完毕"); + } catch (Exception e) { + e.printStackTrace(); + } + }).start(); + } + +// server.setBannerEnabled(false); // 关掉万恶的宣传广告 (笑~~) + //server.setThreadNum(100); +// server.start(); + } +} diff --git a/example/udp/src/main/java/com/gaoyiping/demo/DemoClient.java b/example/udp/src/main/java/com/gaoyiping/demo/DemoClient.java new file mode 100644 index 0000000000000000000000000000000000000000..e600b35b5483cb56105b440ed3956d24a32db68a --- /dev/null +++ b/example/udp/src/main/java/com/gaoyiping/demo/DemoClient.java @@ -0,0 +1,62 @@ +package com.gaoyiping.demo; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import org.smartboot.socket.MessageProcessor; +import org.smartboot.socket.StateMachineEnum; +import org.smartboot.socket.transport.AioSession; + +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class DemoClient implements MessageProcessor { +// private HashMap clients = new HashMap(); +// private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(12); + + public void process(AioSession session, byte[] msg) { +// JSONObject jsonObject = JSON.parseObject(msg, JSONObject.class); + System.out.println(new String(msg)); + } + + public void stateEvent(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable) { + // when connection state changed. + switch (stateMachineEnum) { + case NEW_SESSION: + System.out.println("StateMachineEnum.NEW_SESSION"); + break; + case INPUT_SHUTDOWN: + System.out.println("StateMachineEnum.INPUT_SHUTDOWN"); + break; + case PROCESS_EXCEPTION: + System.out.println("StateMachineEnum.PROCESS_EXCEPTION"); + break; + case DECODE_EXCEPTION: + System.out.println("StateMachineEnum.DECODE_EXCEPTION"); + break; + case INPUT_EXCEPTION: + System.out.println("StateMachineEnum.INPUT_EXCEPTION"); + break; + case OUTPUT_EXCEPTION: + System.out.println("StateMachineEnum.OUTPUT_EXCEPTION"); + break; + case SESSION_CLOSING: + System.out.println("StateMachineEnum.SESSION_CLOSING"); + break; + case SESSION_CLOSED: + System.out.println("StateMachineEnum.SESSION_CLOSED"); + break; +// case FLOW_LIMIT: +// System.out.println("StateMachineEnum.FLOW_LIMIT"); +// break; +// case RELEASE_FLOW_LIMIT: +// System.out.println("StateMachineEnum.RELEASE_FLOW_LIMIT"); +// break; + default: + System.out.println("StateMachineEnum.default"); + } + } + +} diff --git a/example/udp/src/main/java/com/gaoyiping/demo/DemoProtocol.java b/example/udp/src/main/java/com/gaoyiping/demo/DemoProtocol.java new file mode 100644 index 0000000000000000000000000000000000000000..2324c1ecf9f372b985602efd779712c0e696b6be --- /dev/null +++ b/example/udp/src/main/java/com/gaoyiping/demo/DemoProtocol.java @@ -0,0 +1,29 @@ +package com.gaoyiping.demo; + +import java.nio.ByteBuffer; + +import org.smartboot.socket.Protocol; +import org.smartboot.socket.transport.AioSession; + +public class DemoProtocol implements Protocol { + + public byte[] decode(ByteBuffer readBuffer, AioSession session) { + if (readBuffer.remaining() > 0) { + byte[] data = new byte[readBuffer.remaining()]; + readBuffer.get(data); + return data; + // type 1,2,3 message, see: + // https://smartboot.gitee.io/docs/smart-socket/second/3-type-one.html + // https://smartboot.gitee.io/docs/smart-socket/second/4-type-two.html + // https://smartboot.gitee.io/docs/smart-socket/second/5-type-three.html + } + return null; + } + + public ByteBuffer encode(byte[] msg, AioSession session) { + ByteBuffer buffer = ByteBuffer.allocate(msg.length); + buffer.put(msg); + buffer.flip(); + return buffer; + } +} diff --git a/example/udp/src/main/java/com/gaoyiping/demo/DemoService.java b/example/udp/src/main/java/com/gaoyiping/demo/DemoService.java new file mode 100644 index 0000000000000000000000000000000000000000..e61e0aaa5881450e21da047146dca4a94d664ffc --- /dev/null +++ b/example/udp/src/main/java/com/gaoyiping/demo/DemoService.java @@ -0,0 +1,90 @@ +package com.gaoyiping.demo; + +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.smartboot.socket.MessageProcessor; +import org.smartboot.socket.StateMachineEnum; +import org.smartboot.socket.transport.AioSession; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; + +public class DemoService implements MessageProcessor, Runnable { + private HashMap clients = new HashMap(); + private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(12); + + public DemoService() { + executorService.scheduleAtFixedRate(this, 2, 2, TimeUnit.SECONDS); + } + + public void run() { + // send data every 2 second... + if (this.clients.isEmpty()) return; + for (AioSession session: this.clients.values()) { + try { +// session.write("Hey! Smart-Socket it's work...".getBytes()); + session.writeBuffer().write("Hey! Smart-Socket it's work...".getBytes()); + session.writeBuffer().flush(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public void process(AioSession session, byte[] msg) { + //JSONObject jsonObject = JSON.parseObject(msg, JSONObject.class); + System.out.println(new String(msg)); + // SomeCode... + try { + // Response +// session.write("{\"result\": \"OK\"}".getBytes()); + session.writeBuffer().write("{\"result\": \"OK\"}".getBytes()); + session.writeBuffer().flush(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void stateEvent(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable) { + // when connection state changed. + switch (stateMachineEnum) { + case NEW_SESSION: + System.out.println("StateMachineEnum.NEW_SESSION"); + break; + case INPUT_SHUTDOWN: + System.out.println("StateMachineEnum.INPUT_SHUTDOWN"); + break; + case PROCESS_EXCEPTION: + System.out.println("StateMachineEnum.PROCESS_EXCEPTION"); + break; + case DECODE_EXCEPTION: + System.out.println("StateMachineEnum.DECODE_EXCEPTION"); + break; + case INPUT_EXCEPTION: + System.out.println("StateMachineEnum.INPUT_EXCEPTION"); + break; + case OUTPUT_EXCEPTION: + System.out.println("StateMachineEnum.OUTPUT_EXCEPTION"); + break; + case SESSION_CLOSING: + System.out.println("StateMachineEnum.SESSION_CLOSING"); + break; + case SESSION_CLOSED: + System.out.println("StateMachineEnum.SESSION_CLOSED"); + break; +// case FLOW_LIMIT: +// System.out.println("StateMachineEnum.FLOW_LIMIT"); +// break; +// case RELEASE_FLOW_LIMIT: +// System.out.println("StateMachineEnum.RELEASE_FLOW_LIMIT"); +// break; + default: + System.out.println("StateMachineEnum.default"); + } + } + +} diff --git a/example/udp/src/test/java/com/gaoyiping/demo/AppTest.java b/example/udp/src/test/java/com/gaoyiping/demo/AppTest.java new file mode 100644 index 0000000000000000000000000000000000000000..ceb9838de795c815df33effe5af2ccf017f664f5 --- /dev/null +++ b/example/udp/src/test/java/com/gaoyiping/demo/AppTest.java @@ -0,0 +1,38 @@ +package com.gaoyiping.demo; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +}