diff --git a/worker/pom.xml b/worker/pom.xml index aec68d8d220ad683bf75cd5c926c3b237d7f31f2..1284540a90223eb2ba23dbdae58edc194083f985 100644 --- a/worker/pom.xml +++ b/worker/pom.xml @@ -19,6 +19,18 @@ + + io.protostuff + protostuff-core + 1.7.2 + + + + io.protostuff + protostuff-runtime + 1.7.2 + + com.jd.platform.hotkey common @@ -26,13 +38,18 @@ compile - org.jctools jctools-core 2.1.2 + + com.jd.ump + jannotation + 4.0.5 + + org.springframework.boot spring-boot-starter-web diff --git a/worker/src/main/java/com/jd/platform/hotkey/worker/keydispatcher/DispatcherConfig.java b/worker/src/main/java/com/jd/platform/hotkey/worker/keydispatcher/DispatcherConfig.java index 5866cf686837a1e4d4182dfe7243e26d84fd2523..510a85401d0f24d121564b132f64b71cda7734e1 100644 --- a/worker/src/main/java/com/jd/platform/hotkey/worker/keydispatcher/DispatcherConfig.java +++ b/worker/src/main/java/com/jd/platform/hotkey/worker/keydispatcher/DispatcherConfig.java @@ -36,6 +36,7 @@ public class DispatcherConfig { */ public static BlockingQueue QUEUE = new LinkedBlockingQueue<>(2000000); public static Map> MAPQUEUE = new ConcurrentHashMap<>(); + public static Map CONSUMERMAP = new ConcurrentHashMap<>(); static { // @@ -60,8 +61,9 @@ public class DispatcherConfig { for (int i = 0; i < nowCount; i++) { KeyConsumer keyConsumer = new KeyConsumer(); keyConsumer.setKeyListener(iKeyListener); - keyConsumer.setQueue(MAPQUEUE.get(i+"")); + keyConsumer.setQueue(MAPQUEUE.get(i + "")); consumerList.add(keyConsumer); + CONSUMERMAP.put(MAPQUEUE.get(i + ""), keyConsumer); threadPoolExecutor.submit(keyConsumer::beginConsume); } diff --git a/worker/src/main/java/com/jd/platform/hotkey/worker/keydispatcher/KeyConsumer.java b/worker/src/main/java/com/jd/platform/hotkey/worker/keydispatcher/KeyConsumer.java index 408189bba55ebab5504e9c35dfd682015f19af79..4e5c94d8e66bdd4499c7190356f481640a6870cb 100644 --- a/worker/src/main/java/com/jd/platform/hotkey/worker/keydispatcher/KeyConsumer.java +++ b/worker/src/main/java/com/jd/platform/hotkey/worker/keydispatcher/KeyConsumer.java @@ -7,6 +7,8 @@ import com.jd.platform.hotkey.worker.keylistener.KeyEventOriginal; import java.util.Queue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import static com.jd.platform.hotkey.worker.keydispatcher.DispatcherConfig.QUEUE; import static com.jd.platform.hotkey.worker.tool.InitConstant.totalDealCount; @@ -23,6 +25,9 @@ public class KeyConsumer { private Queue queue; + ReentrantLock lock = new ReentrantLock(); + Condition emptyCondition = lock.newCondition(); + public void setKeyListener(IKeyListener iKeyListener) { this.iKeyListener = iKeyListener; @@ -40,7 +45,11 @@ public class KeyConsumer { while (true) { HotKeyModel model = queue.poll(); if (model == null) { - continue; + try { + emptyCondition.await(); + } catch (Exception e) { + } + } if (model.isRemove()) { iKeyListener.removeKey(model, KeyEventOriginal.CLIENT); diff --git a/worker/src/main/java/com/jd/platform/hotkey/worker/keydispatcher/KeyProducer.java b/worker/src/main/java/com/jd/platform/hotkey/worker/keydispatcher/KeyProducer.java index 88df4fdc4124a66fdc8da280ae8c00960bae0412..cf50caa10940595cdec7175b966f5673ce35dfb9 100644 --- a/worker/src/main/java/com/jd/platform/hotkey/worker/keydispatcher/KeyProducer.java +++ b/worker/src/main/java/com/jd/platform/hotkey/worker/keydispatcher/KeyProducer.java @@ -11,6 +11,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; +import static com.jd.platform.hotkey.worker.keydispatcher.DispatcherConfig.CONSUMERMAP; import static com.jd.platform.hotkey.worker.keydispatcher.DispatcherConfig.MAPQUEUE; import static com.jd.platform.hotkey.worker.keydispatcher.DispatcherConfig.QUEUE; import static com.jd.platform.hotkey.worker.tool.InitConstant.expireTotalCount; @@ -46,11 +47,14 @@ public class KeyProducer { if (map.containsKey(threadId)) { String index = map.get(threadId); MAPQUEUE.get(index).add(model); - + //通知 + CONSUMERMAP.get(MAPQUEUE.get(index)).emptyCondition.signal(); } else { int index = atomicInteger.getAndIncrement(); map.put(threadId, index + ""); MAPQUEUE.get(index).add(model); + //通知 + CONSUMERMAP.get(MAPQUEUE.get(index)).emptyCondition.signal(); }