diff --git a/nredis-proxy-commons/src/main/java/com/opensource/netty/redis/proxy/commons/constants/RedisConstants.java b/nredis-proxy-commons/src/main/java/com/opensource/netty/redis/proxy/commons/constants/RedisConstants.java index 507cc963faf5a7d04c41082e5a8d4fe39c4a7911..e55dbcc9f4b698ea46d6baf599fa631f225b4d8b 100644 --- a/nredis-proxy-commons/src/main/java/com/opensource/netty/redis/proxy/commons/constants/RedisConstants.java +++ b/nredis-proxy-commons/src/main/java/com/opensource/netty/redis/proxy/commons/constants/RedisConstants.java @@ -45,6 +45,10 @@ public class RedisConstants { public static final String INFO="info"; + public static final String PING="ping"; + + public static final String PONG="pong"; + public static final String ZOOKEEPER_REGISTRY_NAMESPACE = "/nredis-proxy"; @@ -111,6 +115,8 @@ public class RedisConstants { public static final char DOLLAR_BYTE = '$'; public static final char ASTERISK_BYTE = '*'; + + public static final char PING_BYTE = 'P'; public static final char COLON_BYTE = ':'; public static final char OK_BYTE = '+'; public static final char ERROR_BYTE = '-'; diff --git a/nredis-proxy-core/src/.DS_Store b/nredis-proxy-core/src/.DS_Store index f82fe7d786559c4faca97dad18564278e4701480..430f602867a4f8a5133171266d8d730a58cd527e 100644 Binary files a/nredis-proxy-core/src/.DS_Store and b/nredis-proxy-core/src/.DS_Store differ diff --git a/nredis-proxy-core/src/main/java/com/opensource/netty/redis/proxy/core/client/impl/AbstractPoolClient.java b/nredis-proxy-core/src/main/java/com/opensource/netty/redis/proxy/core/client/impl/AbstractPoolClient.java index 7b869abe82aa9589df07dfbbfd86c47705af101e..03fa67d33b46e685631675e288d69f9440330120 100644 --- a/nredis-proxy-core/src/main/java/com/opensource/netty/redis/proxy/core/client/impl/AbstractPoolClient.java +++ b/nredis-proxy-core/src/main/java/com/opensource/netty/redis/proxy/core/client/impl/AbstractPoolClient.java @@ -5,12 +5,12 @@ package com.opensource.netty.redis.proxy.core.client.impl; import io.netty.channel.ChannelHandlerContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + import com.opensource.netty.redis.proxy.commons.exception.RedisException; import com.opensource.netty.redis.proxy.core.client.Client; import com.opensource.netty.redis.proxy.core.command.impl.RedisCommand; import com.opensource.netty.redis.proxy.core.connection.IConnection; +import com.opensource.netty.redis.proxy.core.log.impl.LoggerUtils; import com.opensource.netty.redis.proxy.core.pool.utils.LBRedisProxyChannelPoolUtils; import com.opensource.netty.redis.proxy.pool.LBRedisProxyPoolEntry; import com.opensource.netty.redis.proxy.pool.LBRedisProxyPooledObjectFactory; @@ -25,7 +25,6 @@ public abstract class AbstractPoolClient implements Client{ protected LBRedisProxyBasicPool pool; protected LBRedisProxyPoolConfig ffanRedisProxyPoolConfig; protected LBRedisProxyPooledObjectFactory factory; - private Logger logger = LoggerFactory.getLogger(AbstractPoolClient.class); /** * @@ -40,7 +39,7 @@ public abstract class AbstractPoolClient implements Client{ factory = createChannelFactory(); pool = LBRedisProxyChannelPoolUtils.createPool(ffanRedisProxyPoolConfig, factory); }catch(Exception e){ - logger.error("initPool fail,reason:"+e.getCause()+",message:"+e.getMessage(), e); + LoggerUtils.error("initPool fail,reason:"+e.getCause()+",message:"+e.getMessage(), e); } } @@ -59,7 +58,7 @@ public abstract class AbstractPoolClient implements Client{ } String errorMsg = this.getClass().getSimpleName() + " borrowObject Error"; - logger.error(errorMsg); + LoggerUtils.error(errorMsg); throw new RedisException(errorMsg); } @@ -71,7 +70,7 @@ public abstract class AbstractPoolClient implements Client{ try { pool.returnEntry(entry); } catch (Exception ie) { - logger.error(this.getClass().getSimpleName() + " return client Error" , ie); + LoggerUtils.error(this.getClass().getSimpleName() + " return client Error" , ie); } } diff --git a/nredis-proxy-core/src/main/java/com/opensource/netty/redis/proxy/core/protocol/RedisRequestDecoder.java b/nredis-proxy-core/src/main/java/com/opensource/netty/redis/proxy/core/protocol/RedisRequestDecoder.java index 42792b5e1de7bfa8ebdc5803df83705d29d929ea..0ee5ad8fdbec609bce56b9c4ed181bdda1532440 100644 --- a/nredis-proxy-core/src/main/java/com/opensource/netty/redis/proxy/core/protocol/RedisRequestDecoder.java +++ b/nredis-proxy-core/src/main/java/com/opensource/netty/redis/proxy/core/protocol/RedisRequestDecoder.java @@ -54,32 +54,43 @@ public class RedisRequestDecoder extends ReplayingDecoder { requestCommand = new RedisCommand(); checkpoint(RequestState.READ_ARGC); } + }else if(ch==RedisConstants.PING_BYTE){//redis-benchmark + buffer.resetReaderIndex(); + requestCommand = new RedisCommand(); + requestCommand.setArgCount(readInt(buffer)); + List args = new ArrayList<>(requestCommand.getArgCount()); + args.add(RedisConstants.PING.getBytes()); + requestCommand.setArgs(args); + + checkpoint(RequestState.READ_ARGC); }else{ throw new Exception("READ_INIT Unexpected character,ch:"+String.valueOf(ch)); } } case READ_ARGC: { - if(requestCommand!=null){ + if(requestCommand!=null&&requestCommand.getArgCount()==0){ requestCommand.setArgCount(readInt(buffer)); - checkpoint(RequestState.READ_ARG); } + checkpoint(RequestState.READ_ARG); } case READ_ARG: { - List args = new ArrayList<>(requestCommand.getArgCount()); - while (args.size() < requestCommand.getArgCount()) { - char ch = (char) buffer.readByte(); - if (ch == '$') { - int length = readInt(buffer); - byte[] argByte = new byte[length]; - buffer.readBytes(argByte); - buffer.skipBytes(2);//skip \r\n - //LoggerUtils.info("String:"+new String(argByte)); - args.add(argByte); - }else{ - throw new Exception("READ_ARG Unexpected character,ch:"+String.valueOf(ch)); - } - } - requestCommand.setArgs(args); + if(requestCommand.getArgs()==null||requestCommand.getArgs().size()==0){ + List args = new ArrayList<>(requestCommand.getArgCount()); + while (args.size() < requestCommand.getArgCount()) { + char ch = (char) buffer.readByte(); + if (ch == '$') { + int length = readInt(buffer); + byte[] argByte = new byte[length]; + buffer.readBytes(argByte); + buffer.skipBytes(2);//skip \r\n + //LoggerUtils.info("String:"+new String(argByte)); + args.add(argByte); + }else{ + throw new Exception("READ_ARG Unexpected character,ch:"+String.valueOf(ch)); + } + } + requestCommand.setArgs(args); + } checkpoint(RequestState.READ_END); } case READ_END: { @@ -102,11 +113,14 @@ public class RedisRequestDecoder extends ReplayingDecoder { ch = (char) buffer.readByte();//\r读取 } buffer.readByte();//\n读取 - try{ - int result= Integer.parseInt(sb.toString()); + int result= 1; + if(!sb.toString().toLowerCase().equals(RedisConstants.PING)){ + result= Integer.parseInt(sb.toString()); + } return result; }catch(Exception e){//网络闭包引起 + throw new Exception("readInt Unexpected character,result:"+sb.toString()+",ch:"+String.valueOf(ch)); } } @@ -114,7 +128,7 @@ public class RedisRequestDecoder extends ReplayingDecoder { private void skipChar(ByteBuf buffer) { for (;;) { char ch = (char) buffer.readByte(); - if (ch == RedisConstants.ASTERISK_BYTE) { + if (ch == RedisConstants.ASTERISK_BYTE||ch==RedisConstants.PING_BYTE) { buffer.readerIndex(buffer.readerIndex() - 1); break; } diff --git a/nredis-proxy-net/src/main/java/com/opensource/netty/redis/proxy/net/client/LBRedisConnection.java b/nredis-proxy-net/src/main/java/com/opensource/netty/redis/proxy/net/client/LBRedisConnection.java index e13b594db8a6f987cb56837732c2069bc6560958..cfe2cda65dcefa4e1ef5b14a178af7078741ad5c 100644 --- a/nredis-proxy-net/src/main/java/com/opensource/netty/redis/proxy/net/client/LBRedisConnection.java +++ b/nredis-proxy-net/src/main/java/com/opensource/netty/redis/proxy/net/client/LBRedisConnection.java @@ -94,13 +94,8 @@ public class LBRedisConnection implements IConnection{ open(); } this.frontCtx=frontCtx; - backChannel.eventLoop().execute(new Runnable() { - - @Override - public void run() { - backChannel.writeAndFlush(request, backChannel.voidPromise()); - } - }); + backChannel.writeAndFlush(request); + } @Override @@ -160,6 +155,7 @@ public class LBRedisConnection implements IConnection{ frontCtx.close(); } state = ChannelState.CLOSE; + } } catch (Exception e) { diff --git a/nredis-proxy-net/src/main/java/com/opensource/netty/redis/proxy/net/server/support/LBRedisServerHandler.java b/nredis-proxy-net/src/main/java/com/opensource/netty/redis/proxy/net/server/support/LBRedisServerHandler.java index 8a6d73da162f0e0e8680718684ff362f9e7bdc82..362588816f56452133edb0c0ba1f2e64dcff83f5 100644 --- a/nredis-proxy-net/src/main/java/com/opensource/netty/redis/proxy/net/server/support/LBRedisServerHandler.java +++ b/nredis-proxy-net/src/main/java/com/opensource/netty/redis/proxy/net/server/support/LBRedisServerHandler.java @@ -21,7 +21,10 @@ import com.opensource.netty.redis.proxy.core.config.LBRedisServerMasterCluster; import com.opensource.netty.redis.proxy.core.config.support.LBRedisServerBean; import com.opensource.netty.redis.proxy.core.config.support.LBRedisServerClusterBean; import com.opensource.netty.redis.proxy.core.enums.RedisCommandEnums; +import com.opensource.netty.redis.proxy.core.reply.IRedisReply; import com.opensource.netty.redis.proxy.core.reply.impl.ErrorRedisReply; +import com.opensource.netty.redis.proxy.core.reply.impl.StatusRedisReply; + import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -58,7 +61,15 @@ public class LBRedisServerHandler extends SimpleChannelInboundHandler1&&!command.equals(RedisConstants.KEYS)){//第一个是命令,第二个是key + if(command.equals("INCR")){ + System.out.println("command:"+command); + } + System.out.println("command:"+command); + if(request!=null&&command.toLowerCase().equals(RedisConstants.PING)){ + IRedisReply redisReply=new StatusRedisReply(); + ((StatusRedisReply) redisReply).setValue(RedisConstants.PONG.getBytes()); + ctx.writeAndFlush(redisReply); + }else if(request!=null&&request.getArgs().size()>1&&!command.equals(RedisConstants.KEYS)){//第一个是命令,第二个是key RedisCommandEnums commandEnums=getRedisCommandEnums(command); @@ -166,13 +177,18 @@ public class LBRedisServerHandler extends SimpleChannelInboundHandler org.slf4j slf4j-api - 1.5.8 + 1.7.2