diff --git a/sample/pom.xml b/sample/pom.xml index ecd620828d27cfde689f64776bd6da73e647d7be..128c9690befe2ef4f6909e7f50b0fe8832440d12 100644 --- a/sample/pom.xml +++ b/sample/pom.xml @@ -35,10 +35,10 @@ spring-boot-starter-web - - org.springframework.boot - spring-boot-starter-data-redis - + + + + org.springframework.boot diff --git a/sample/src/main/java/com/jd/platform/sample/Cache.java b/sample/src/main/java/com/jd/platform/sample/Cache.java index cfe48e28649097cfa658f9fee9693a832c00aba4..9ae2c29e183dbc038289fce1b00c32a45cbf0c08 100644 --- a/sample/src/main/java/com/jd/platform/sample/Cache.java +++ b/sample/src/main/java/com/jd/platform/sample/Cache.java @@ -1,119 +1,41 @@ package com.jd.platform.sample; -import com.jd.platform.hotkey.client.callback.JdHotKeyStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; -import javax.annotation.Resource; -import java.util.concurrent.CompletableFuture; - /** * @author wuweifeng wrote on 2020-02-21 * @version 1.0 */ @Component public class Cache { - @Resource - private RedisTemplate redisTemplate; - - - public String getFromRedis(String key) { - return redisTemplate.opsForValue().get(key); - } - - //最佳实践: - // - //1 判断用户是否是刷子 - // - // if (JdHotKeyStore.isHotKey(“pin__” + thePin)) { - // //限流他,do your job - // } - //2 判断商品id是否是热点 - // - // - // - // Object skuInfo = JdHotKeyStore.getValue("skuId__" + skuId); - // if(skuInfo == null) { - // - // JdHotKeyStore.smartSet("skuId__" + skuId, theSkuInfo); - // } else { - // - // //使用缓存好的value即可 - // - // } - // - // 或者这样: - // - // - // - // if (JdHotKeyStore.isHotKey(key)) { - // //注意是get,不是getValue。getValue会获取并上报,get是纯粹的本地获取 - // - // Object skuInfo = JdHotKeyStore.get("skuId__" + skuId); - // if(skuInfo == null) { - // - // JdHotKeyStore.smartSet("skuId__" + skuId, theSkuInfo); - // } else { - // - // //使用缓存好的value即可 - // - // } - // - // } - - public String get(String key) { - Object object = JdHotKeyStore.getValue(key); - //如果已经缓存过了 - if (object != null) { - System.out.println("is hot key"); - return object.toString(); - } else { - String value = getFromRedis(key); - JdHotKeyStore.smartSet(key, value); - return value; - } - } - - public void set(String key, String value) { - redisTemplate.opsForValue().set(key, value); - } - - public void remove(String key) { - JdHotKeyStore.remove(key); - //do your job - } - - - private Logger logger = LoggerFactory.getLogger(getClass()); - - -// @PostConstruct - public void test() { - - CompletableFuture.runAsync(() -> { - int i = 0; - while (true) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - logger.info("beat"); -// Object object = JdHotKeyStore.getValue("a"); -// if (object != null) { -// System.err.println("is hot key " + object); -// } else { -// System.err.println("set value"); -// JdHotKeyStore.smartSet("a", "a"); -// } - if (JdHotKeyStore.isHotKey("a")) { - logger.error("isHot"); - } - } - }); - - } +// @Resource +// private RedisTemplate redisTemplate; +// +// +// public String getFromRedis(String key) { +// return redisTemplate.opsForValue().get(key); +// } + + +// public String get(String key) { +// Object object = JdHotKeyStore.getValue(key); +// //如果已经缓存过了 +// if (object != null) { +// System.out.println("is hot key"); +// return object.toString(); +// } else { +// String value = getFromRedis(key); +// JdHotKeyStore.smartSet(key, value); +// return value; +// } +// } +// +// public void set(String key, String value) { +// redisTemplate.opsForValue().set(key, value); +// } +// +// public void remove(String key) { +// JdHotKeyStore.remove(key); +// } } diff --git a/sample/src/main/java/com/jd/platform/sample/Starter.java b/sample/src/main/java/com/jd/platform/sample/Starter.java index 15a4233dccf688aba3ad29bc0525d1887b2e70ac..d63e3a0e1c459b5d5f953120c7d5dc6354f044fb 100644 --- a/sample/src/main/java/com/jd/platform/sample/Starter.java +++ b/sample/src/main/java/com/jd/platform/sample/Starter.java @@ -1,10 +1,14 @@ package com.jd.platform.sample; +import com.ibm.etcd.api.KeyValue; import com.jd.platform.hotkey.client.ClientStarter; +import com.jd.platform.hotkey.common.configcenter.IConfigCenter; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.util.List; /** * @author wuweifeng wrote on 2020-01-14 @@ -17,11 +21,21 @@ public class Starter { @Value("${spring.application.name}") private String appName; + @Resource + private IConfigCenter iConfigCenter; + + + @PostConstruct public void init() { ClientStarter.Builder builder = new ClientStarter.Builder(); ClientStarter starter = builder.setAppName(appName).setEtcdServer(etcd).build(); starter.startPipeline(); + + List list = iConfigCenter.getPrefix("/jd/workers/sample/host"); + for (KeyValue keyValue : list) { + System.out.println(keyValue.getKey() + keyValue.getValue().toStringUtf8()); + } } } diff --git a/sample/src/main/java/com/jd/platform/sample/config/EtcdConfig.java b/sample/src/main/java/com/jd/platform/sample/config/EtcdConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..91feffee34a0dce13aa90fff6b75e7f6f8af4908 --- /dev/null +++ b/sample/src/main/java/com/jd/platform/sample/config/EtcdConfig.java @@ -0,0 +1,26 @@ +package com.jd.platform.sample.config; + +import com.jd.platform.hotkey.common.configcenter.IConfigCenter; +import com.jd.platform.hotkey.common.configcenter.etcd.JdEtcdBuilder; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author wuweifeng + * @version 1.0 + * @date 2020-07-27 + */ +@Configuration +public class EtcdConfig { + + @Value("${etcd.server}") + private String etcd; + + + @Bean + public IConfigCenter client() { + //连接多个时,逗号分隔 + return JdEtcdBuilder.build(etcd); + } +} diff --git a/sample/src/main/java/com/jd/platform/sample/config/LocalCache.java b/sample/src/main/java/com/jd/platform/sample/config/LocalCache.java deleted file mode 100644 index 043a9ea4288790cd9d3425218d2f4b465b069876..0000000000000000000000000000000000000000 --- a/sample/src/main/java/com/jd/platform/sample/config/LocalCache.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.jd.platform.sample.config; - -/** - * @author wuweifeng wrote on 2020-02-21 - * @version 1.0 - */ -public class LocalCache { - - - -} diff --git a/sample/src/main/java/com/jd/platform/sample/config/RedisConfig.java b/sample/src/main/java/com/jd/platform/sample/config/RedisConfig.java index b79a8ca1f8c66d336e64482c25a8851609f9d742..e318e001394c1252e5e71284108aeca3d404a04a 100644 --- a/sample/src/main/java/com/jd/platform/sample/config/RedisConfig.java +++ b/sample/src/main/java/com/jd/platform/sample/config/RedisConfig.java @@ -1,21 +1,18 @@ package com.jd.platform.sample.config; -import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.core.StringRedisTemplate; /** * @author wuweifeng wrote on 2017/10/27. */ @Configuration public class RedisConfig { - - @Bean(name = {"redisTemplate", "stringRedisTemplate"}) - public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) { - StringRedisTemplate redisTemplate = new StringRedisTemplate(); - redisTemplate.setConnectionFactory(factory); - return redisTemplate; - } +// +// @Bean(name = {"redisTemplate", "stringRedisTemplate"}) +// public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) { +// StringRedisTemplate redisTemplate = new StringRedisTemplate(); +// redisTemplate.setConnectionFactory(factory); +// return redisTemplate; +// } } diff --git a/sample/src/main/java/com/jd/platform/sample/controller/TestController.java b/sample/src/main/java/com/jd/platform/sample/controller/TestController.java index 230dca00fa518f13c3690af020fb640d80b7bb0c..bf643fb15318e1e8801050ea56e80fdc4d080535 100644 --- a/sample/src/main/java/com/jd/platform/sample/controller/TestController.java +++ b/sample/src/main/java/com/jd/platform/sample/controller/TestController.java @@ -9,8 +9,6 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; -import java.util.ArrayList; -import java.util.List; /** * @author wuweifeng wrote on 2020-02-21 @@ -28,58 +26,58 @@ public class TestController { /** * 往redis里添加20个key */ - @RequestMapping("addKey") - public Object add(Integer count) { - if (count == null) { - count = 20; - } - for (int i = 0; i < count; i++) { - cache.set("key" + i, "我是一个用来做测试的value:" + i); - } - return "success"; - } +// @RequestMapping("addKey") +// public Object add(Integer count) { +// if (count == null) { +// count = 20; +// } +// for (int i = 0; i < count; i++) { +// cache.set("key" + i, "我是一个用来做测试的value:" + i); +// } +// return "success"; +// } /** * 从redis查询key */ - @RequestMapping("find") - public Object findNormal(Integer count) { - if (count == null) { - count = 20; - } - List values = new ArrayList<>(count); - for (int i = 0; i < count; i++) { - values.add(cache.getFromRedis("key" + i)); - } - return values; - } +// @RequestMapping("find") +// public Object findNormal(Integer count) { +// if (count == null) { +// count = 20; +// } +// List values = new ArrayList<>(count); +// for (int i = 0; i < count; i++) { +// values.add(cache.getFromRedis("key" + i)); +// } +// return values; +// } /** * 使用热key查询,从redis查询key */ - @RequestMapping("findHot") - public Object findWithHotKey(Integer count) { - if (count == null) { - count = 20; - } - List values = new ArrayList<>(count); - for (int i = 0; i < count; i++) { - values.add(cache.get("key" + i)); - } - return values; - } - - - @RequestMapping("hot") - public Object hot(Integer count) { - cache.get("key" + count); - - return 1; - } +// @RequestMapping("findHot") +// public Object findWithHotKey(Integer count) { +// if (count == null) { +// count = 20; +// } +// List values = new ArrayList<>(count); +// for (int i = 0; i < count; i++) { +// values.add(cache.get("key" + i)); +// } +// return values; +// } +// +// +// @RequestMapping("hot") +// public Object hot(Integer count) { +// cache.get("key" + count); +// +// return 1; +// } @RequestMapping("") public Object a() { - if (JdHotKeyStore.isHotKey("a")) { + if (JdHotKeyStore.isHotKey("pin_tianyalei")) { logger.error("isHot"); } else { logger.error("noHot"); diff --git a/sample/src/main/resources/application.yml b/sample/src/main/resources/application.yml index 879608b65dc1460cda960cdf03305c2861448737..41fc686c89203651a719a44d41d278da0f9af4d2 100644 --- a/sample/src/main/resources/application.yml +++ b/sample/src/main/resources/application.yml @@ -1,13 +1,15 @@ #etcd的地址,如有多个用逗号分隔 etcd: - server: ${etcdServer:https://127.0.0.1:2379} +# server: ${etcdServer:https://127.0.0.1:2379} +# server: http://10.170.161.91:2379 + server: http://open-etcd.jd.com:2000 spring: application: name: sample ###############################---redis---############################## - redis: - host: ${REDIS_HOST:127.0.0.1} - port: ${REDIS_PORT:6379} - password: ${REDIS_PASSWORD:} +# redis: +# host: ${REDIS_HOST:127.0.0.1} +# port: ${REDIS_PORT:6379} +# password: ${REDIS_PASSWORD:} server: port: 9999 \ No newline at end of file diff --git a/worker/pom.xml b/worker/pom.xml index 0d64df64d214f019181e772c8222d0ebbcef50be..5f234d9048657fefd24996fcded45ce50697e200 100644 --- a/worker/pom.xml +++ b/worker/pom.xml @@ -19,6 +19,18 @@ + + io.protostuff + protostuff-core + 1.7.2 + + + + io.protostuff + protostuff-runtime + 1.7.2 + + com.jd.platform.hotkey common diff --git a/worker/src/main/java/com/jd/platform/hotkey/worker/netty/server/NodesServerHandler.java b/worker/src/main/java/com/jd/platform/hotkey/worker/netty/server/NodesServerHandler.java index 36cbfafaf2590e35beb9105dad2fefb23eb0348c..64f2a2b5acd7594381127edf73f2e800e31d6301 100755 --- a/worker/src/main/java/com/jd/platform/hotkey/worker/netty/server/NodesServerHandler.java +++ b/worker/src/main/java/com/jd/platform/hotkey/worker/netty/server/NodesServerHandler.java @@ -7,6 +7,8 @@ import com.jd.platform.hotkey.worker.netty.client.IClientChangeListener; import com.jd.platform.hotkey.worker.netty.filter.INettyMsgFilter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -28,6 +30,8 @@ public class NodesServerHandler extends SimpleChannelInboundHandler { */ private List messageFilters = new ArrayList<>(); + private Logger logger = LoggerFactory.getLogger(getClass()); + @Override protected void channelRead0(ChannelHandlerContext ctx, String message) { if (StringUtils.isEmpty(message)) { @@ -42,6 +46,11 @@ public class NodesServerHandler extends SimpleChannelInboundHandler { } } + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + logger.error("some thing is error , " + cause.getMessage()); + } + @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); diff --git a/worker/src/main/java/com/jd/platform/hotkey/worker/starters/EtcdStarter.java b/worker/src/main/java/com/jd/platform/hotkey/worker/starters/EtcdStarter.java index 895d6a27a96d4e48d6b35a9028ceb8b3676b718c..9ce9cc9db97f33190de8ecacf0ed7e06a53ec642 100644 --- a/worker/src/main/java/com/jd/platform/hotkey/worker/starters/EtcdStarter.java +++ b/worker/src/main/java/com/jd/platform/hotkey/worker/starters/EtcdStarter.java @@ -309,15 +309,17 @@ public class EtcdStarter { } private String buildKey() { - if (StrUtil.isNotEmpty(localAddress)) { - return localAddress; - } String hostName = IpUtils.getHostName(); return ConfigConstant.workersPath + workerPath + "/" + hostName; } private String buildValue() { - String ip = IpUtils.getIp(); + String ip; + if (StrUtil.isNotEmpty(localAddress)) { + ip = localAddress; + } else { + ip = IpUtils.getIp(); + } return ip + MAO + port; } diff --git a/worker/src/main/java/com/jd/platform/hotkey/worker/tool/ProtostuffUtils.java b/worker/src/main/java/com/jd/platform/hotkey/worker/tool/ProtostuffUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..0cb88c5eb341ef2273074e425e019b32582b0731 --- /dev/null +++ b/worker/src/main/java/com/jd/platform/hotkey/worker/tool/ProtostuffUtils.java @@ -0,0 +1,77 @@ +package com.jd.platform.hotkey.worker.tool; + +import io.protostuff.LinkedBuffer; +import io.protostuff.ProtostuffIOUtil; +import io.protostuff.Schema; +import io.protostuff.runtime.RuntimeSchema; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + * @author 周志刚 + * @date 2019/6/18 + **/ +public class ProtostuffUtils { + /** + * 避免每次序列化都重新申请Buffer空间 + */ + private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); + /** + * 缓存Schema + */ + private static Map, Schema> schemaCache = new ConcurrentHashMap<>(); + + /** + * 序列化方法,把指定对象序列化成字节数组 + * + * @param obj + * @param + * @return + */ + @SuppressWarnings("unchecked") + public static byte[] serialize(T obj) { + Class clazz = (Class) obj.getClass(); + Schema schema = getSchema(clazz); + byte[] data; + try { + data = ProtostuffIOUtil.toByteArray(obj, schema, buffer); + } finally { + buffer.clear(); + } + + return data; + } + + /** + * 反序列化方法,将字节数组反序列化成指定Class类型 + * + * @param data + * @param clazz + * @param + * @return + */ + public static T deserialize(byte[] data, Class clazz) { + Schema schema = getSchema(clazz); + T obj = schema.newMessage(); + ProtostuffIOUtil.mergeFrom(data, obj, schema); + return obj; + } + + @SuppressWarnings("unchecked") + private static Schema getSchema(Class clazz) { + Schema schema = (Schema) schemaCache.get(clazz); + if (Objects.isNull(schema)) { + //这个schema通过RuntimeSchema进行懒创建并缓存 + //所以可以一直调用RuntimeSchema.getSchema(),这个方法是线程安全的 + schema = RuntimeSchema.getSchema(clazz); + if (Objects.nonNull(schema)) { + schemaCache.put(clazz, schema); + } + } + + return schema; + } +} \ No newline at end of file diff --git a/worker/src/test/java/Test.java b/worker/src/test/java/Test.java new file mode 100644 index 0000000000000000000000000000000000000000..92a993c18e863ce59b68cb3d19046c0406cf497e --- /dev/null +++ b/worker/src/test/java/Test.java @@ -0,0 +1,38 @@ +import com.jd.platform.hotkey.common.model.HotKeyModel; +import com.jd.platform.hotkey.common.model.HotKeyMsg; +import com.jd.platform.hotkey.common.tool.FastJsonUtils; +import com.jd.platform.hotkey.worker.tool.ProtostuffUtils; + +/** + * @author wuweifeng + * @version 1.0 + * @date 2020-07-28 + */ +public class Test { + public static void main(String[] args) { + HotKeyMsg hotKeyMsg = new HotKeyMsg(); + hotKeyMsg.setAppName("cartsoa"); + HotKeyModel hotKeyModel = new HotKeyModel(); + hotKeyModel.setCount(1); + hotKeyModel.setKey("pin_xx"); + hotKeyModel.setAppName("cartsoa"); + hotKeyMsg.setBody(FastJsonUtils.convertObjectToJSON(hotKeyModel)); + + byte[] serialize = ProtostuffUtils.serialize(hotKeyMsg); + String msg = FastJsonUtils.convertObjectToJSON(hotKeyMsg); + + long time1 = System.currentTimeMillis(); + for (int i = 0; i < 300000; i++) { + HotKeyMsg hhh = ProtostuffUtils.deserialize(serialize, HotKeyMsg.class); + } + System.out.println(System.currentTimeMillis() - time1); + + long time = System.currentTimeMillis(); + for (int i = 0; i < 300000; i++) { + HotKeyMsg hhh = FastJsonUtils.toBean(msg, HotKeyMsg.class); + } + System.out.println(System.currentTimeMillis() - time); + + + } +}