From 4ae894b001e560af950698f5c5633550b1280c95 Mon Sep 17 00:00:00 2001 From: liuwy <1421132346@qq.com> Date: Fri, 16 Jun 2023 10:55:44 +0800 Subject: [PATCH] =?UTF-8?q?#=202.0.2.a=20=E5=BC=80=E5=8F=91=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 补充 pusher 的宽度,提供更多的系统事件。 - 记录处理失败事件。 - KeepHandler 处理失败事件。 - PersistHandler 处理失败事件。 - 逻辑侧处理器缓冲数据积压事件。 - 记录侧处理器缓冲数据积压事件。 --- .../fdr/impl/handler/PushHandlerImpl.java | 25 +++++++++ .../com/dwarfeng/fdr/impl/handler/Pusher.java | 35 ++++++++++++ .../fdr/impl/handler/pusher/DrainPusher.java | 20 +++++++ .../fdr/impl/handler/pusher/LogPusher.java | 35 ++++++++++++ .../fdr/impl/handler/pusher/MultiPusher.java | 55 +++++++++++++++++++ .../handler/pusher/NativeKafkaPusher.java | 45 +++++++++++++++ .../src/main/resources/fdr/push.properties | 10 ++++ .../fdr/stack/handler/PushHandler.java | 35 ++++++++++++ 8 files changed, 260 insertions(+) diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/PushHandlerImpl.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/PushHandlerImpl.java index e2584123..f2d0092b 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/PushHandlerImpl.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/PushHandlerImpl.java @@ -102,4 +102,29 @@ public class PushHandlerImpl implements PushHandler { public void mapReset() throws HandlerException { pusher.mapReset(); } + + @Override + public void recordOperationFailedEvent() throws HandlerException { + pusher.recordOperationFailedEvent(); + } + + @Override + public void keepHandlerOperationFailedEvent() throws HandlerException { + pusher.keepHandlerOperationFailedEvent(); + } + + @Override + public void persistHandlerOperationFailedEvent() throws HandlerException { + pusher.persistHandlerOperationFailedEvent(); + } + + @Override + public void logicDataOverstockEvent() throws HandlerException { + pusher.logicDataOverstockEvent(); + } + + @Override + public void recordDataOverstockEvent() throws HandlerException { + pusher.recordDataOverstockEvent(); + } } diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/Pusher.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/Pusher.java index 9f2f523f..f8b71931 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/Pusher.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/Pusher.java @@ -132,4 +132,39 @@ public interface Pusher { * @throws HandlerException 处理器异常。 */ void mapReset() throws HandlerException; + + /** + * 记录处理失败事件 + * + * @throws HandlerException 处理器异常。 + */ + void recordOperationFailedEvent() throws HandlerException; + + /** + * keepHandler 处理失败事件. + * + * @throws HandlerException 处理器异常。 + */ + void keepHandlerOperationFailedEvent() throws HandlerException; + + /** + * persistHandler 处理失败事件. + * + * @throws HandlerException 处理器异常。 + */ + void persistHandlerOperationFailedEvent() throws HandlerException; + + /** + * 逻辑侧处理器缓冲数据积压事件. + * + * @throws HandlerException 处理器异常。 + */ + void logicDataOverstockEvent() throws HandlerException; + + /** + * 记录侧处理器缓冲数据积压事件. + * + * @throws HandlerException 处理器异常。 + */ + void recordDataOverstockEvent() throws HandlerException; } diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/DrainPusher.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/DrainPusher.java index 5e1e75f6..f50efb3a 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/DrainPusher.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/DrainPusher.java @@ -78,6 +78,26 @@ public class DrainPusher extends AbstractPusher { public void mapReset() { } + @Override + public void recordOperationFailedEvent() { + } + + @Override + public void keepHandlerOperationFailedEvent() { + } + + @Override + public void persistHandlerOperationFailedEvent() { + } + + @Override + public void logicDataOverstockEvent() { + } + + @Override + public void recordDataOverstockEvent() { + } + @Override public String toString() { return "DrainPusher{" + diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/LogPusher.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/LogPusher.java index 0da976d3..29189739 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/LogPusher.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/LogPusher.java @@ -140,6 +140,41 @@ public class LogPusher extends AbstractPusher { logData(title, message); } + @Override + public void recordOperationFailedEvent() throws HandlerException { + String title = "记录处理失败事件:"; + String message = StringUtils.EMPTY; + logData(title, message); + } + + @Override + public void keepHandlerOperationFailedEvent() throws HandlerException { + String title = "keepHandler 处理失败事件:"; + String message = StringUtils.EMPTY; + logData(title, message); + } + + @Override + public void persistHandlerOperationFailedEvent() throws HandlerException { + String title = "persistHandler 处理失败事件:"; + String message = StringUtils.EMPTY; + logData(title, message); + } + + @Override + public void logicDataOverstockEvent() throws HandlerException { + String title = "逻辑侧处理器缓冲数据积压事件:"; + String message = StringUtils.EMPTY; + logData(title, message); + } + + @Override + public void recordDataOverstockEvent() throws HandlerException { + String title = "记录侧处理器缓冲数据积压事件:"; + String message = StringUtils.EMPTY; + logData(title, message); + } + private void logData(String title, String message) throws HandlerException { String logLevel = this.logLevel.toUpperCase(); switch (logLevel) { diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/MultiPusher.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/MultiPusher.java index 17016a7f..ac578432 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/MultiPusher.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/MultiPusher.java @@ -201,6 +201,61 @@ public class MultiPusher extends AbstractPusher { } } + @Override + public void recordOperationFailedEvent() { + for (Pusher delegate : delegates) { + try { + delegate.recordOperationFailedEvent(); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void keepHandlerOperationFailedEvent() { + for (Pusher delegate : delegates) { + try { + delegate.keepHandlerOperationFailedEvent(); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void persistHandlerOperationFailedEvent() { + for (Pusher delegate : delegates) { + try { + delegate.persistHandlerOperationFailedEvent(); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void logicDataOverstockEvent() { + for (Pusher delegate : delegates) { + try { + delegate.logicDataOverstockEvent(); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void recordDataOverstockEvent() { + for (Pusher delegate : delegates) { + try { + delegate.recordDataOverstockEvent(); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + @Override public String toString() { return "MultiPusher{" + diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/NativeKafkaPusher.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/NativeKafkaPusher.java index 68ab6d74..50023f3f 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/NativeKafkaPusher.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/NativeKafkaPusher.java @@ -57,6 +57,16 @@ public class NativeKafkaPusher extends AbstractPusher { private String recordResetTopic; @Value("${pusher.native.kafka.topic.map_reset}") private String mapResetTopic; + @Value("${pusher.native.kafka.topic.record_operation_failed_event}") + private String recordOperationFailedEventTopic; + @Value("${pusher.native.kafka.topic.keepHandler_operation_failed_event}") + private String keepHandlerOperationFailedEventTopic; + @Value("${pusher.native.kafka.topic.persistHandler_operation_failed_event}") + private String persistHandlerOperationFailedEventTopic; + @Value("${pusher.native.kafka.topic.logic_data_overstock_event}") + private String logicDataOverstockEventTopic; + @Value("${pusher.native.kafka.topic.record_data_overstock_event}") + private String recordDataOverstockEventTopic; public NativeKafkaPusher( @Qualifier("nativeKafkaPusher.kafkaTemplate") KafkaTemplate kafkaTemplate @@ -155,6 +165,36 @@ public class NativeKafkaPusher extends AbstractPusher { kafkaTemplate.send(mapResetTopic, StringUtils.EMPTY); } + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void recordOperationFailedEvent() { + kafkaTemplate.send(recordOperationFailedEventTopic, StringUtils.EMPTY); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void keepHandlerOperationFailedEvent() { + kafkaTemplate.send(keepHandlerOperationFailedEventTopic, StringUtils.EMPTY); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void persistHandlerOperationFailedEvent() { + kafkaTemplate.send(persistHandlerOperationFailedEventTopic, StringUtils.EMPTY); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void logicDataOverstockEvent() { + kafkaTemplate.send(logicDataOverstockEventTopic, StringUtils.EMPTY); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void recordDataOverstockEvent() { + kafkaTemplate.send(recordDataOverstockEventTopic, StringUtils.EMPTY); + } + @Override public String toString() { return "NativeKafkaPusher{" + @@ -167,6 +207,11 @@ public class NativeKafkaPusher extends AbstractPusher { ", triggeredRecordedTopic='" + triggeredRecordedTopic + '\'' + ", recordResetTopic='" + recordResetTopic + '\'' + ", mapResetTopic='" + mapResetTopic + '\'' + + ", recordOperationFailedEventTopic='" + recordOperationFailedEventTopic + '\'' + + ", keepHandlerOperationFailedEventTopic='" + keepHandlerOperationFailedEventTopic + '\'' + + ", persistHandlerOperationFailedEventTopic='" + persistHandlerOperationFailedEventTopic + '\'' + + ", logicDataOverstockEventTopic='" + logicDataOverstockEventTopic + '\'' + + ", recordDataOverstockEventTopic='" + recordDataOverstockEventTopic + '\'' + '}'; } diff --git a/fdr-node/src/main/resources/fdr/push.properties b/fdr-node/src/main/resources/fdr/push.properties index e8a10b7b..a3c98c91 100644 --- a/fdr-node/src/main/resources/fdr/push.properties +++ b/fdr-node/src/main/resources/fdr/push.properties @@ -56,6 +56,16 @@ pusher.native.kafka.topic.triggered_recorded=fdr.pusher.triggered_recorded pusher.native.kafka.topic.record_reset=fdr.pusher.record_reset # \u6620\u5C04\u529F\u80FD\u91CD\u7F6E\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 pusher.native.kafka.topic.map_reset=fdr.pusher.map_reset +# \u8BB0\u5F55\u5904\u7406\u5931\u8D25\u4E8B\u4EF6\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +pusher.native.kafka.topic.record_operation_failed_event=fdr.pusher.record_operation_failed_event +# KeepHandler \u5904\u7406\u5931\u8D25\u4E8B\u4EF6\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +pusher.native.kafka.topic.keepHandler_operation_failed_event=fdr.pusher.keepHandler_operation_failed_event +# persistHandler \u5904\u7406\u5931\u8D25\u4E8B\u4EF6\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +pusher.native.kafka.topic.persistHandler_operation_failed_event=fdr.pusher.persistHandler_operation_failed_event +# \u53D1\u751F\u903B\u8F91\u4FA7\u5904\u7406\u5668\u7F13\u51B2\u6570\u636E\u79EF\u538B\u4E8B\u4EF6\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +pusher.native.kafka.topic.logic_data_overstock_event=fdr.pusher.logic_data_overstock_event +# \u53D1\u751F\u8BB0\u5F55\u4FA7\u5904\u7406\u5668\u7F13\u51B2\u6570\u636E\u79EF\u538B\u4E8B\u4EF6\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +pusher.native.kafka.topic.record_data_overstock_event=fdr.pusher.record_data_overstock_event # ################################################### # log # diff --git a/fdr-stack/src/main/java/com/dwarfeng/fdr/stack/handler/PushHandler.java b/fdr-stack/src/main/java/com/dwarfeng/fdr/stack/handler/PushHandler.java index 62b56d26..e59eef82 100644 --- a/fdr-stack/src/main/java/com/dwarfeng/fdr/stack/handler/PushHandler.java +++ b/fdr-stack/src/main/java/com/dwarfeng/fdr/stack/handler/PushHandler.java @@ -125,4 +125,39 @@ public interface PushHandler extends Handler { * @throws HandlerException 处理器异常。 */ void mapReset() throws HandlerException; + + /** + * 记录处理失败事件. + * + * @throws HandlerException 处理器异常。 + */ + void recordOperationFailedEvent() throws HandlerException; + + /** + * keepHandler 处理失败事件. + * + * @throws HandlerException 处理器异常。 + */ + void keepHandlerOperationFailedEvent() throws HandlerException; + + /** + * persistHandler 处理失败事件. + * + * @throws HandlerException 处理器异常。 + */ + void persistHandlerOperationFailedEvent() throws HandlerException; + + /** + * 逻辑侧处理器缓冲数据积压事件. + * + * @throws HandlerException 处理器异常。 + */ + void logicDataOverstockEvent() throws HandlerException; + + /** + * 记录侧处理器缓冲数据积压事件. + * + * @throws HandlerException 处理器异常。 + */ + void recordDataOverstockEvent() throws HandlerException; } -- Gitee