diff --git a/CHANGELOG.md b/CHANGELOG.md index 6eb1b579a13d9ca5f0582fade73099443cbab7b9..635fd2909dd49c1cb02fbe3b4eeb179419d61d67 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,8 @@ #### 功能构建 -- (无) +- Wiki 更新。 + - 优化 `Contents.md` 中的内容。 #### Bug修复 diff --git a/docs/wiki/zh_CN/ConfDirectory.md b/docs/wiki/zh_CN/ConfDirectory.md index e411be115032284774e61e881dfc99279d3f2eaa..a01a9d5d5e83565b3ccbbeb67e34601d4d0d01b1 100644 --- a/docs/wiki/zh_CN/ConfDirectory.md +++ b/docs/wiki/zh_CN/ConfDirectory.md @@ -507,7 +507,7 @@ launcher.start_reset_delay=30000 # 目前该项目支持的推送器类型有: # drain: 简单的丢弃掉所有消息的推送器。 # multi: 同时将消息推送给所有代理的多重推送器。 -# native.kafka: 使用原生数据的基于Kafka消息队列的推送器。 +# kafka.native: 使用原生数据的基于Kafka消息队列的推送器。 # log: 将消息输出到日志中的推送器。 # # 对于一个具体的项目,很可能只用一个推送器。此时如果希望程序加载时只加载一个推送器,可以通过编辑 @@ -524,40 +524,40 @@ pusher.type=drain # multi # ################################################### # 代理的推送器,推送器之间以逗号分隔。 -pusher.multi.delegate_types=native.kafka +pusher.multi.delegate_types=kafka.native # ################################################### -# native.kafka # +# kafka.native # ################################################### # broker集群。 -pusher.native.kafka.bootstrap_servers=your ip here like ip1:9092,ip2:9092,ip3:9092 +pusher.kafka.native.bootstrap_servers=your ip here like ip1:9092,ip2:9092,ip3:9092 # 连接属性。 -pusher.native.kafka.acks=all +pusher.kafka.native.acks=all # 发送失败重试次数。 -pusher.native.kafka.retries=3 -pusher.native.kafka.linger=10 +pusher.kafka.native.retries=3 +pusher.kafka.native.linger=10 # 的批处理缓冲区大小。 -pusher.native.kafka.buffer_memory=40960 +pusher.kafka.native.buffer_memory=40960 # 批处理条数:当多个记录被发送到同一个分区时,生产者会尝试将记录合并到更少的请求中。这有助于客户端和服务器的性能。 -pusher.native.kafka.batch_size=4096 +pusher.kafka.native.batch_size=4096 # Kafka事务的前缀。 -pusher.native.kafka.transaction_prefix=fdr.pusher. +pusher.kafka.native.transaction_prefix=fdr.pusher. 
# 一般数据更新时向 Kafka 发送的主题。 -pusher.native.kafka.topic.normal_updated=fdr.pusher.normal_updated +pusher.kafka.native.topic.normal_updated=fdr.pusher.normal_updated # 一般数据记录时向 Kafka 发送的主题。 -pusher.native.kafka.topic.normal_recorded=fdr.pusher.normal_recorded +pusher.kafka.native.topic.normal_recorded=fdr.pusher.normal_recorded # 被过滤数据更新时向 Kafka 发送的主题。 -pusher.native.kafka.topic.filtered_updated=fdr.pusher.filtered_updated +pusher.kafka.native.topic.filtered_updated=fdr.pusher.filtered_updated # 被过滤数据记录时向 Kafka 发送的主题。 -pusher.native.kafka.topic.filtered_recorded=fdr.pusher.filtered_recorded +pusher.kafka.native.topic.filtered_recorded=fdr.pusher.filtered_recorded # 被触发数据更新时向 Kafka 发送的主题。 -pusher.native.kafka.topic.triggered_updated=fdr.pusher.triggered_updated +pusher.kafka.native.topic.triggered_updated=fdr.pusher.triggered_updated # 被触发数据记录时向 Kafka 发送的主题。 -pusher.native.kafka.topic.triggered_recorded=fdr.pusher.triggered_recorded +pusher.kafka.native.topic.triggered_recorded=fdr.pusher.triggered_recorded # 记录功能重置时向 Kafka 发送的主题。 -pusher.native.kafka.topic.record_reset=fdr.pusher.record_reset +pusher.kafka.native.topic.record_reset=fdr.pusher.record_reset # 映射功能重置时向 Kafka 发送的主题。 -pusher.native.kafka.topic.map_reset=fdr.pusher.map_reset +pusher.kafka.native.topic.map_reset=fdr.pusher.map_reset # ################################################### # log # diff --git a/docs/wiki/zh_CN/Contents.md b/docs/wiki/zh_CN/Contents.md index cec790e5565187705ae4a6db789477ab77111fb0..c97b11086c250b75d3f190afd12dad6b2f798127 100644 --- a/docs/wiki/zh_CN/Contents.md +++ b/docs/wiki/zh_CN/Contents.md @@ -43,7 +43,6 @@ - [Source](./Source.md) - 数据源,详细说明了本项目的数据源机制。 - [PresetSourceImplements.md](./PresetSourceImplements.md) - 预设数据源实现,详细说明了本项目内置的所有数据源。 - - [Washer](./Washer.md) - 清洗器,详细说明了本项目的清洗器机制。 ## 维护与调试 diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/ConsumeHandlerImpl.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/ConsumeHandlerImpl.java index 
859fbd6be25d45a8e839955020a0ac76bb5e50d1..cbe493bb82cd3554db87e113ca6bec10071a07ef 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/ConsumeHandlerImpl.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/ConsumeHandlerImpl.java @@ -94,6 +94,12 @@ public class ConsumeHandlerImpl implements ConsumeHandler { if (ratio >= warnThreshold) { String message = "消费者为 {} 的记录侧的待消费元素占用缓存比例为 {},超过报警值 {},请检查"; LOGGER.warn(message, consumer.getClass().getSimpleName(), ratio, warnThreshold); + + try { + consumer.consumeBufferOverstock(consumeBuffer.bufferedSize(), consumeBuffer.getBufferSize(), ratio, warnThreshold); + } catch (Exception e) { + LOGGER.warn(e.toString()); + } } }, Constants.SCHEDULER_CHECK_INTERVAL); diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/Consumer.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/Consumer.java index 2868654c654103137ef3e0b7df1f367fe628ab10..75552da2d94cf0f0b52c52f92c8008086aee382d 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/Consumer.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/Consumer.java @@ -20,4 +20,15 @@ public interface Consumer { * @throws HandlerException 处理器异常。 */ void consume(List datas) throws HandlerException; + + /** + * 消费者缓冲数据积压。 + * + * @param bufferSize 待消费元素数量 + * @param allBufferSize 缓存元素数量 + * @param ratio 待消费元素数量占缓存元素数量比例 + * @param warnThreshold 报警值 + * @throws HandlerException 处理器异常。 + */ + void consumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException; } diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/PushHandlerImpl.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/PushHandlerImpl.java index e2584123d3328a6378026f22d369919d5c62969c..4c0b10d22a44c774b429a379535aaa7e449cbc5e 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/PushHandlerImpl.java +++ 
b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/PushHandlerImpl.java @@ -2,6 +2,7 @@ package com.dwarfeng.fdr.impl.handler; import com.dwarfeng.fdr.stack.bean.dto.FilteredData; import com.dwarfeng.fdr.stack.bean.dto.NormalData; +import com.dwarfeng.fdr.stack.bean.dto.RecordInfo; import com.dwarfeng.fdr.stack.bean.dto.TriggeredData; import com.dwarfeng.fdr.stack.handler.PushHandler; import com.dwarfeng.subgrade.stack.exception.HandlerException; @@ -102,4 +103,104 @@ public class PushHandlerImpl implements PushHandler { public void mapReset() throws HandlerException { pusher.mapReset(); } + + @Override + public void recordFailed(RecordInfo recordInfo) throws HandlerException { + pusher.recordFailed(recordInfo); + } + + @Override + public void keepNormalFailed(NormalData normalData) throws HandlerException { + pusher.keepNormalFailed(normalData); + } + + @Override + public void keepNormalFailed(List normalDatas) throws HandlerException { + pusher.keepNormalFailed(normalDatas); + } + + @Override + public void persistNormalFailed(NormalData normalData) throws HandlerException { + pusher.persistNormalFailed(normalData); + } + + @Override + public void persistNormalFailed(List normalDatas) throws HandlerException { + pusher.persistNormalFailed(normalDatas); + } + + @Override + public void keepFilteredFailed(FilteredData filteredData) throws HandlerException { + pusher.keepFilteredFailed(filteredData); + } + + @Override + public void keepFilteredFailed(List filteredDatas) throws HandlerException { + pusher.keepFilteredFailed(filteredDatas); + } + + @Override + public void persistFilteredFailed(FilteredData filteredData) throws HandlerException { + pusher.persistFilteredFailed(filteredData); + } + + @Override + public void persistFilteredFailed(List filteredDatas) throws HandlerException { + pusher.persistFilteredFailed(filteredDatas); + } + + @Override + public void keepTriggeredFailed(TriggeredData triggeredData) throws HandlerException { + 
pusher.keepTriggeredFailed(triggeredData); + } + + @Override + public void keepTriggeredFailed(List triggeredDatas) throws HandlerException { + pusher.keepTriggeredFailed(triggeredDatas); + } + + @Override + public void persistTriggeredFailed(TriggeredData triggeredData) throws HandlerException { + pusher.persistTriggeredFailed(triggeredData); + } + + @Override + public void persistTriggeredFailed(List triggeredDatas) throws HandlerException { + pusher.persistTriggeredFailed(triggeredDatas); + } + + @Override + public void recordBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + pusher.recordBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); + } + + @Override + public void keepNormalConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + pusher.keepNormalConsumeBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); + } + + @Override + public void keepFilteredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + pusher.keepFilteredConsumeBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); + } + + @Override + public void keepTriggeredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + pusher.keepTriggeredConsumeBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); + } + + @Override + public void persistNormalConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + pusher.persistNormalConsumeBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); + } + + @Override + public void persistFilteredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + 
pusher.persistFilteredConsumeBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); + } + + @Override + public void persistTriggeredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + pusher.persistTriggeredConsumeBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); + } } diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/Pusher.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/Pusher.java index 9f2f523ff08d9462f423b9920775adbdbe29cfd4..1eefe1566c38e06e70065b97cbde7ac4d456cd66 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/Pusher.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/Pusher.java @@ -2,6 +2,7 @@ package com.dwarfeng.fdr.impl.handler; import com.dwarfeng.fdr.stack.bean.dto.FilteredData; import com.dwarfeng.fdr.stack.bean.dto.NormalData; +import com.dwarfeng.fdr.stack.bean.dto.RecordInfo; import com.dwarfeng.fdr.stack.bean.dto.TriggeredData; import com.dwarfeng.subgrade.stack.exception.HandlerException; @@ -132,4 +133,156 @@ public interface Pusher { * @throws HandlerException 处理器异常。 */ void mapReset() throws HandlerException; + + /** + * 处理失败的广播操作. 
+ * + * @throws HandlerException 处理器异常。 + */ + void recordFailed(RecordInfo recordInfo) throws HandlerException; + + /** + * 一般数据更新失败的广播操作。 + * + * @param normalData 一般数据记录 + * @throws HandlerException 处理器异常。 + */ + void keepNormalFailed(NormalData normalData) throws HandlerException; + + /** + * 一般数据更新失败的广播操作。 + * + * @param normalDatas 一般数据记录组成的列表。 + * @throws HandlerException 处理器异常。 + */ + void keepNormalFailed(List normalDatas) throws HandlerException; + + /** + * 一般数据记录失败的广播操作。 + * + * @param normalData 一般数据记录 + * @throws HandlerException 处理器异常。 + */ + void persistNormalFailed(NormalData normalData) throws HandlerException; + + /** + * 一般数据记录失败的广播操作。 + * + * @param normalDatas 一般数据记录组成的列表。 + * @throws HandlerException 处理器异常。 + */ + void persistNormalFailed(List normalDatas) throws HandlerException; + + /** + * 被过滤数据更新失败的广播操作。 + * + * @param filteredData 一般数据记录 + * @throws HandlerException 处理器异常。 + */ + void keepFilteredFailed(FilteredData filteredData) throws HandlerException; + + /** + * 被过滤数据更新失败的广播操作。 + * + * @param filteredDatas 一般数据记录组成的列表。 + * @throws HandlerException 处理器异常。 + */ + void keepFilteredFailed(List filteredDatas) throws HandlerException; + + /** + * 被过滤数据记录失败的广播操作。 + * + * @param filteredData 一般数据记录 + * @throws HandlerException 处理器异常。 + */ + void persistFilteredFailed(FilteredData filteredData) throws HandlerException; + + /** + * 被过滤数据记录失败的广播操作。 + * + * @param filteredDatas 一般数据记录组成的列表。 + * @throws HandlerException 处理器异常。 + */ + void persistFilteredFailed(List filteredDatas) throws HandlerException; + + /** + * 被触发数据更新失败的广播操作。 + * + * @param triggeredData 一般数据记录 + * @throws HandlerException 处理器异常。 + */ + void keepTriggeredFailed(TriggeredData triggeredData) throws HandlerException; + + /** + * 被触发数据更新失败的广播操作。 + * + * @param triggeredDatas 一般数据记录组成的列表。 + * @throws HandlerException 处理器异常。 + */ + void keepTriggeredFailed(List triggeredDatas) throws HandlerException; + + /** + * 被触发数据记录失败的广播操作。 + * + * @param triggeredData 一般数据记录 + * @throws 
HandlerException 处理器异常。 + */ + void persistTriggeredFailed(TriggeredData triggeredData) throws HandlerException; + + /** + * 被触发数据记录失败的广播操作。 + * + * @param triggeredDatas 一般数据记录组成的列表。 + * @throws HandlerException 处理器异常。 + */ + void persistTriggeredFailed(List triggeredDatas) throws HandlerException; + + /** + * 逻辑侧处理器缓冲数据积压的广播操作。 + * + * @throws HandlerException 处理器异常。 + */ + void recordBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException; + + /** + * 一般数据更新时记录侧处理器缓冲数据积压的广播操作。 + * + * @throws HandlerException 处理器异常。 + */ + void keepNormalConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException; + + /** + * 被过滤数据更新时记录侧处理器缓冲数据积压的广播操作。 + * + * @throws HandlerException 处理器异常。 + */ + void keepFilteredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException; + + /** + * 被触发数据更新时记录侧处理器缓冲数据积压的广播操作。 + * + * @throws HandlerException 处理器异常。 + */ + void keepTriggeredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException; + + /** + * 一般数据记录时记录侧处理器缓冲数据积压的广播操作。 + * + * @throws HandlerException 处理器异常。 + */ + void persistNormalConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException; + + /** + * 被过滤数据记录时记录侧处理器缓冲数据积压的广播操作。 + * + * @throws HandlerException 处理器异常。 + */ + void persistFilteredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException; + + /** + * 被触发数据记录时记录侧处理器缓冲数据积压的广播操作。 + * + * @throws HandlerException 处理器异常。 + */ + void persistTriggeredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException; } diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/RecordProcessor.java 
b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/RecordProcessor.java index d632671b71e1b66d2f5045206cc3f5d4b5944525..6c0725f75dfdaeb9bcff2223748702de75a102ce 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/RecordProcessor.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/RecordProcessor.java @@ -14,6 +14,7 @@ import com.dwarfeng.subgrade.stack.bean.key.LongIdKey; import com.dwarfeng.subgrade.stack.exception.HandlerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -38,6 +39,8 @@ class RecordProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(RecordProcessor.class); + private final PushHandler pushHandler; + private final ThreadPoolTaskExecutor executor; private final ThreadPoolTaskScheduler scheduler; @@ -60,12 +63,14 @@ class RecordProcessor { ThreadPoolTaskExecutor executor, ThreadPoolTaskScheduler scheduler, Consumer consumer, - ConsumeBuffer consumeBuffer + ConsumeBuffer consumeBuffer, + PushHandler pushHandler ) { this.executor = executor; this.scheduler = scheduler; this.consumer = consumer; this.consumeBuffer = consumeBuffer; + this.pushHandler = pushHandler; } public void start() { @@ -86,6 +91,12 @@ class RecordProcessor { if (ratio >= warnThreshold) { String message = "逻辑侧的待消费元素占用缓存比例为 {},超过报警值 {},请检查"; LOGGER.warn(message, ratio, warnThreshold); + + try { + pushHandler.recordBufferOverstock(consumeBuffer.bufferedSize(), consumeBuffer.getBufferSize(), ratio, warnThreshold); + } catch (HandlerException e) { + LOGGER.warn(e.toString()); + } } }, Constants.SCHEDULER_CHECK_INTERVAL); @@ -287,20 +298,24 @@ class RecordProcessor { private final ConsumeHandler triggeredKeepConsumeHandler; private final ConsumeHandler 
triggeredPersistConsumeHandler; + private final PushHandler pusherHandler; + public Consumer( RecordLocalCacheHandler recordLocalCacheHandler, @Qualifier("normalKeepConsumeHandler") - ConsumeHandler normalKeepConsumeHandler, + ConsumeHandler normalKeepConsumeHandler, @Qualifier("normalPersistConsumeHandler") - ConsumeHandler normalPersistConsumeHandler, + ConsumeHandler normalPersistConsumeHandler, @Qualifier("filteredKeepConsumeHandler") - ConsumeHandler filteredKeepConsumeHandler, + ConsumeHandler filteredKeepConsumeHandler, @Qualifier("filteredPersistConsumeHandler") - ConsumeHandler filteredPersistConsumeHandler, + ConsumeHandler filteredPersistConsumeHandler, @Qualifier("triggeredKeepConsumeHandler") - ConsumeHandler triggeredKeepConsumeHandler, + ConsumeHandler triggeredKeepConsumeHandler, @Qualifier("triggeredPersistConsumeHandler") - ConsumeHandler triggeredPersistConsumeHandler + ConsumeHandler triggeredPersistConsumeHandler, + @Autowired + PushHandler pusherHandler ) { this.recordLocalCacheHandler = recordLocalCacheHandler; this.normalKeepConsumeHandler = normalKeepConsumeHandler; @@ -309,121 +324,134 @@ class RecordProcessor { this.filteredPersistConsumeHandler = filteredPersistConsumeHandler; this.triggeredKeepConsumeHandler = triggeredKeepConsumeHandler; this.triggeredPersistConsumeHandler = triggeredPersistConsumeHandler; + this.pusherHandler = pusherHandler; } public void consume(RecordInfo recordInfo) throws HandlerException { + + HandlerException consumeThrowException = null; + try { - // 记录日志,准备工作。 - LOGGER.debug("记录数据信息: " + recordInfo); - LongIdKey pointKey = recordInfo.getPointKey(); - - // 获取 RecordContext。 - RecordLocalCacheHandler.RecordContext recordContext = recordLocalCacheHandler.get(pointKey); - if (Objects.isNull(recordContext)) { - throw new PointNotExistsException(pointKey); - } - Point point = recordContext.getPoint(); - Map preFilterWasherMap = recordContext.getPreFilterWasherMap(); - Map filterMap = recordContext.getFilterMap(); - 
Map postFilterWasherMap = recordContext.getPostFilterWasherMap(); - Map triggerMap = recordContext.getTriggerMap(); - - // 遍历所有的过滤前清洗器,清洗数据。 - for (Map.Entry entry : preFilterWasherMap.entrySet()) { - Washer washer = entry.getValue(); - - Object rawValue = recordInfo.getValue(); - LOGGER.debug("数据信息经过过滤前清洗, 原始数据点信息: " + rawValue); - Object washedValue = washer.wash(rawValue); - LOGGER.debug("数据信息经过过滤前清洗, 清洗数据点信息: " + washedValue); - - recordInfo.setValue(washedValue); - } + doConsume(recordInfo); + } catch (HandlerException e) { + consumeThrowException = e; + } catch (Exception e) { + consumeThrowException = new HandlerException(e); + } - // 遍历所有的过滤器,任意一个过滤器未通过时,根据数据点配置保持或持久被过滤数据,随后终止。 - for (Map.Entry entry : filterMap.entrySet()) { - Object value = recordInfo.getValue(); - Date happenedDate = recordInfo.getHappenedDate(); + if (Objects.nonNull(consumeThrowException)) { + pusherHandler.recordFailed(recordInfo); + throw consumeThrowException; + } + } - LongIdKey filterKey = entry.getKey(); - Filter filter = entry.getValue(); + private void doConsume(RecordInfo recordInfo) throws HandlerException { + // 记录日志,准备工作。 + LOGGER.debug("记录数据信息: " + recordInfo); + LongIdKey pointKey = recordInfo.getPointKey(); - Filter.TestInfo testInfo = new Filter.TestInfo(pointKey, value, happenedDate); - Filter.TestResult testResult = filter.test(testInfo); + // 获取 RecordContext。 + RecordLocalCacheHandler.RecordContext recordContext = recordLocalCacheHandler.get(pointKey); + if (Objects.isNull(recordContext)) { + throw new PointNotExistsException(pointKey); + } + Point point = recordContext.getPoint(); + Map preFilterWasherMap = recordContext.getPreFilterWasherMap(); + Map filterMap = recordContext.getFilterMap(); + Map postFilterWasherMap = recordContext.getPostFilterWasherMap(); + Map triggerMap = recordContext.getTriggerMap(); + + // 遍历所有的过滤前清洗器,清洗数据。 + for (Map.Entry entry : preFilterWasherMap.entrySet()) { + Washer washer = entry.getValue(); + + Object rawValue = 
recordInfo.getValue(); + LOGGER.debug("数据信息经过过滤前清洗, 原始数据点信息: " + rawValue); + Object washedValue = washer.wash(rawValue); + LOGGER.debug("数据信息经过过滤前清洗, 清洗数据点信息: " + washedValue); + + recordInfo.setValue(washedValue); + } - if (testResult.isFiltered()) { - FilteredData filteredRecord = new FilteredData( - pointKey, filterKey, value, testResult.getMessage(), happenedDate - ); - LOGGER.debug("数据信息未通过过滤, 过滤数据点信息: " + filteredRecord); + // 遍历所有的过滤器,任意一个过滤器未通过时,根据数据点配置保持或持久被过滤数据,随后终止。 + for (Map.Entry entry : filterMap.entrySet()) { + Object value = recordInfo.getValue(); + Date happenedDate = recordInfo.getHappenedDate(); - if (point.isFilteredKeepEnabled()) { - filteredKeepConsumeHandler.accept(filteredRecord); - } - if (point.isFilteredPersistEnabled()) { - filteredPersistConsumeHandler.accept(filteredRecord); - } - return; + LongIdKey filterKey = entry.getKey(); + Filter filter = entry.getValue(); + + Filter.TestInfo testInfo = new Filter.TestInfo(pointKey, value, happenedDate); + Filter.TestResult testResult = filter.test(testInfo); + + if (testResult.isFiltered()) { + FilteredData filteredRecord = new FilteredData( + pointKey, filterKey, value, testResult.getMessage(), happenedDate + ); + LOGGER.debug("数据信息未通过过滤, 过滤数据点信息: " + filteredRecord); + + if (point.isFilteredKeepEnabled()) { + filteredKeepConsumeHandler.accept(filteredRecord); + } + if (point.isFilteredPersistEnabled()) { + filteredPersistConsumeHandler.accept(filteredRecord); } + return; } + } - // 遍历所有的过滤后清洗器,清洗数据。 - for (Map.Entry entry : postFilterWasherMap.entrySet()) { - Washer washer = entry.getValue(); + // 遍历所有的过滤后清洗器,清洗数据。 + for (Map.Entry entry : postFilterWasherMap.entrySet()) { + Washer washer = entry.getValue(); - Object rawValue = recordInfo.getValue(); - LOGGER.debug("数据信息经过过滤后清洗, 原始数据点信息: " + rawValue); - Object washedValue = washer.wash(rawValue); - LOGGER.debug("数据信息经过过滤后清洗, 清洗数据点信息: " + washedValue); + Object rawValue = recordInfo.getValue(); + LOGGER.debug("数据信息经过过滤后清洗, 原始数据点信息: " + 
rawValue); + Object washedValue = washer.wash(rawValue); + LOGGER.debug("数据信息经过过滤后清洗, 清洗数据点信息: " + washedValue); - recordInfo.setValue(washedValue); - } + recordInfo.setValue(washedValue); + } - // 遍历所有的触发器,任意一个触发器触发时,根据数据点配置保持或持久触发数据。 - for (Map.Entry entry : triggerMap.entrySet()) { - Object value = recordInfo.getValue(); - Date happenedDate = recordInfo.getHappenedDate(); + // 遍历所有的触发器,任意一个触发器触发时,根据数据点配置保持或持久触发数据。 + for (Map.Entry entry : triggerMap.entrySet()) { + Object value = recordInfo.getValue(); + Date happenedDate = recordInfo.getHappenedDate(); - LongIdKey triggerKey = entry.getKey(); - Trigger trigger = entry.getValue(); + LongIdKey triggerKey = entry.getKey(); + Trigger trigger = entry.getValue(); - Trigger.TestInfo testInfo = new Trigger.TestInfo(pointKey, value, happenedDate); - Trigger.TestResult testResult = trigger.test(testInfo); + Trigger.TestInfo testInfo = new Trigger.TestInfo(pointKey, value, happenedDate); + Trigger.TestResult testResult = trigger.test(testInfo); - if (testResult.isTriggered()) { - TriggeredData triggeredRecord = new TriggeredData( - pointKey, triggerKey, value, testResult.getMessage(), happenedDate - ); - LOGGER.debug("数据信息满足触发条件, 触发数据点信息: " + triggeredRecord); + if (testResult.isTriggered()) { + TriggeredData triggeredRecord = new TriggeredData( + pointKey, triggerKey, value, testResult.getMessage(), happenedDate + ); + LOGGER.debug("数据信息满足触发条件, 触发数据点信息: " + triggeredRecord); - if (point.isTriggeredKeepEnabled()) { - triggeredKeepConsumeHandler.accept(triggeredRecord); - } - if (point.isTriggeredPersistEnabled()) { - triggeredPersistConsumeHandler.accept(triggeredRecord); - } + if (point.isTriggeredKeepEnabled()) { + triggeredKeepConsumeHandler.accept(triggeredRecord); + } + if (point.isTriggeredPersistEnabled()) { + triggeredPersistConsumeHandler.accept(triggeredRecord); } } + } - // 生成一般数据,根据数据点配置保持或持久一般数据。 - { - Object value = recordInfo.getValue(); - Date happenedDate = recordInfo.getHappenedDate(); + // 
生成一般数据,根据数据点配置保持或持久一般数据。 + { + Object value = recordInfo.getValue(); + Date happenedDate = recordInfo.getHappenedDate(); - NormalData normalRecord = new NormalData(pointKey, value, happenedDate); - LOGGER.debug("记录一般数据: " + normalRecord); + NormalData normalRecord = new NormalData(pointKey, value, happenedDate); + LOGGER.debug("记录一般数据: " + normalRecord); - if (point.isNormalKeepEnabled()) { - normalKeepConsumeHandler.accept(normalRecord); - } - if (point.isNormalPersistEnabled()) { - normalPersistConsumeHandler.accept(normalRecord); - } + if (point.isNormalKeepEnabled()) { + normalKeepConsumeHandler.accept(normalRecord); + } + if (point.isNormalPersistEnabled()) { + normalPersistConsumeHandler.accept(normalRecord); } - } catch (HandlerException e) { - throw e; - } catch (Exception e) { - throw new HandlerException(e); } } } diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/FilteredKeepConsumer.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/FilteredKeepConsumer.java index c61fdba73b5ff398223b0a29c5aedc0934fa1b00..e47334cb4d5d6a6f699ada6e362327e41d010750 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/FilteredKeepConsumer.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/FilteredKeepConsumer.java @@ -21,17 +21,32 @@ public class FilteredKeepConsumer extends KeepConsumer { @BehaviorAnalyse @Override - public void consume(@SkipRecord List records) throws HandlerException { - super.consume(records); + public void consume(@SkipRecord List filteredDatas) throws HandlerException { + super.consume(filteredDatas); } @Override - protected void doPush(List records) throws HandlerException { - pushHandler.filteredUpdated(records); + public void consumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + pushHandler.keepFilteredConsumeBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); } @Override - 
protected void doPush(FilteredData record) throws HandlerException { - pushHandler.filteredUpdated(record); + protected void doSuccessPush(List filteredDatas) throws HandlerException { + pushHandler.filteredUpdated(filteredDatas); + } + + @Override + protected void doSuccessPush(FilteredData filteredData) throws HandlerException { + pushHandler.filteredUpdated(filteredData); + } + + @Override + protected void doFailedPush(List filteredDatas) throws HandlerException { + pushHandler.keepFilteredFailed(filteredDatas); + } + + @Override + protected void doFailedPush(FilteredData filteredData) throws HandlerException { + pushHandler.keepFilteredFailed(filteredData); } } diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/FilteredPersistConsumer.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/FilteredPersistConsumer.java index fe3d4b22dbfbe752e6b967dbdc096c0e19c70383..8ce966a5bbb73c98312553ee58603e7d7788c597 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/FilteredPersistConsumer.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/FilteredPersistConsumer.java @@ -17,17 +17,32 @@ public class FilteredPersistConsumer extends PersistConsumer { @BehaviorAnalyse @Override - public void consume(@SkipRecord List records) throws HandlerException { - super.consume(records); + public void consume(@SkipRecord List filteredDatas) throws HandlerException { + super.consume(filteredDatas); } @Override - protected void doPush(List records) throws HandlerException { - pushHandler.filteredRecorded(records); + public void consumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + pushHandler.persistFilteredConsumeBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); } @Override - protected void doPush(FilteredData record) throws HandlerException { - pushHandler.filteredRecorded(record); + protected void 
doSuccessPush(List filteredDatas) throws HandlerException { + pushHandler.filteredRecorded(filteredDatas); + } + + @Override + protected void doSuccessPush(FilteredData filteredData) throws HandlerException { + pushHandler.filteredRecorded(filteredData); + } + + @Override + protected void doFailedPush(List filteredDatas) throws HandlerException { + pushHandler.persistFilteredFailed(filteredDatas); + } + + @Override + protected void doFailedPush(FilteredData filteredData) throws HandlerException { + pushHandler.persistFilteredFailed(filteredData); } } diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/KeepConsumer.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/KeepConsumer.java index d157eeddcc555b1ded4b18d0c0acd992e82b3970..2b05f0094ea84adeef72833c007426cc6e43c6c6 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/KeepConsumer.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/KeepConsumer.java @@ -157,7 +157,7 @@ public abstract class KeepConsumer implements Consumer { private void pushRecords(List records) { // 优先尝试批量推送数据,如果批量推送失败,则尝试逐条推送数据。 try { - doPush(records); + doSuccessPush(records); } catch (Exception e) { LOGGER.error("数据推送失败, 试图使用不同的策略进行推送: 逐条推送", e); } @@ -168,7 +168,7 @@ public abstract class KeepConsumer implements Consumer { // 遍历 records 中的所有数据记录,逐条推送数据。 for (R record : records) { try { - doPush(record); + doSuccessPush(record); } catch (Exception e) { LOGGER.error("数据推送失败, 放弃对该数据的推送: " + record, e); failedList.add(record); @@ -179,6 +179,11 @@ public abstract class KeepConsumer implements Consumer { if (!failedList.isEmpty()) { LOGGER.error("推送数据时发生异常, 最多 " + failedList.size() + " 个数据信息丢失"); failedList.forEach(record -> LOGGER.debug(Objects.toString(record))); + try { + doFailedPush(failedList); + } catch (HandlerException e) { + LOGGER.error("推送失败数据时发生异常, 异常信息如下: ", e); + } } } @@ -188,7 +193,7 @@ public abstract class KeepConsumer implements Consumer { * @param records
数据记录组成的列表。 * @throws HandlerException 处理器异常。 */ - protected abstract void doPush(List records) throws HandlerException; + protected abstract void doSuccessPush(List records) throws HandlerException; /** * 推送数据。 @@ -196,7 +201,23 @@ public abstract class KeepConsumer implements Consumer { * @param record 数据记录。 * @throws HandlerException 处理器异常。 */ - protected abstract void doPush(R record) throws HandlerException; + protected abstract void doSuccessPush(R record) throws HandlerException; + + /** + * 推送无法成功推送的数据。 + * + * @param records 数据记录组成的列表。 + * @throws HandlerException 处理器异常。 + */ + protected abstract void doFailedPush(List records) throws HandlerException; + + /** + * 推送无法成功推送的数据。 + * + * @param record 数据记录。 + * @throws HandlerException 处理器异常。 + */ + protected abstract void doFailedPush(R record) throws HandlerException; @Override public String toString() { diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/NormalKeepConsumer.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/NormalKeepConsumer.java index b8dc43f603bff49c33b2de863531eb45557f2349..9c95f516b71ab7a7a923176dc7b9b449204a55d7 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/NormalKeepConsumer.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/NormalKeepConsumer.java @@ -21,17 +21,32 @@ public class NormalKeepConsumer extends KeepConsumer { @BehaviorAnalyse @Override - public void consume(@SkipRecord List records) throws HandlerException { - super.consume(records); + public void consume(@SkipRecord List normalDatas) throws HandlerException { + super.consume(normalDatas); } @Override - protected void doPush(List records) throws HandlerException { - pushHandler.normalUpdated(records); + public void consumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + pushHandler.keepNormalConsumeBufferOverstock(bufferSize, allBufferSize, ratio, 
warnThreshold); } @Override - protected void doPush(NormalData record) throws HandlerException { - pushHandler.normalUpdated(record); + protected void doSuccessPush(List normalDatas) throws HandlerException { + pushHandler.normalUpdated(normalDatas); + } + + @Override + protected void doSuccessPush(NormalData normalData) throws HandlerException { + pushHandler.normalUpdated(normalData); + } + + @Override + protected void doFailedPush(List normalDatas) throws HandlerException { + pushHandler.keepNormalFailed(normalDatas); + } + + @Override + protected void doFailedPush(NormalData normalData) throws HandlerException { + pushHandler.keepNormalFailed(normalData); } } diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/NormalPersistConsumer.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/NormalPersistConsumer.java index 95dc0143935f0ef8b4997f4dfa26067e0168ba0f..6f4942af01c0213392254ac2194968fbe89ca258 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/NormalPersistConsumer.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/NormalPersistConsumer.java @@ -17,17 +17,32 @@ public class NormalPersistConsumer extends PersistConsumer { @BehaviorAnalyse @Override - public void consume(@SkipRecord List records) throws HandlerException { - super.consume(records); + public void consume(@SkipRecord List normalDatas) throws HandlerException { + super.consume(normalDatas); } @Override - protected void doPush(List records) throws HandlerException { - pushHandler.normalRecorded(records); + public void consumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + pushHandler.persistNormalConsumeBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); } @Override - protected void doPush(NormalData record) throws HandlerException { - pushHandler.normalRecorded(record); + protected void doSuccessPush(List normalDatas) throws 
HandlerException { + pushHandler.normalRecorded(normalDatas); + } + + @Override + protected void doSuccessPush(NormalData normalData) throws HandlerException { + pushHandler.normalRecorded(normalData); + } + + @Override + protected void doFailedPush(List normalDatas) throws HandlerException { + pushHandler.persistNormalFailed(normalDatas); + } + + @Override + protected void doFailedPush(NormalData normalData) throws HandlerException { + pushHandler.persistNormalFailed(normalData); } } diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/PersistConsumer.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/PersistConsumer.java index 233adfa1d1ed9a84561310b6af827284ea078abb..786a60226dffc61734a5d4836629372d6ed9ab7f 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/PersistConsumer.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/PersistConsumer.java @@ -87,7 +87,7 @@ public abstract class PersistConsumer implements Consumer { private void pushRecords(List records) { // 优先尝试批量推送数据,如果批量推送失败,则尝试逐条推送数据。 try { - doPush(records); + doSuccessPush(records); } catch (Exception e) { LOGGER.error("数据推送失败, 试图使用不同的策略进行推送: 逐条推送", e); } @@ -98,7 +98,7 @@ public abstract class PersistConsumer implements Consumer { // 遍历 records 中的所有数据记录,逐条推送数据。 for (D record : records) { try { - doPush(record); + doSuccessPush(record); } catch (Exception e) { LOGGER.error("数据推送失败, 放弃对该数据的推送: " + record, e); failedList.add(record); @@ -108,6 +108,11 @@ public abstract class PersistConsumer implements Consumer { // 如果有推送失败的数据记录,则记录日志。 if (!failedList.isEmpty()) { LOGGER.error("推送数据时发生异常, 最多 " + failedList.size() + " 个数据信息丢失"); + try { + doFailedPush(failedList); + } catch (HandlerException e) { + LOGGER.error("推送失败数据时发生异常, 异常信息如下: ", e); + } failedList.forEach(record -> LOGGER.debug(Objects.toString(record))); } } @@ -118,7 +123,7 @@ public abstract class PersistConsumer implements Consumer { * @param records 数据记录组成的列表。 * @throws
HandlerException 处理器异常。 */ - protected abstract void doPush(List records) throws HandlerException; + protected abstract void doSuccessPush(List records) throws HandlerException; /** * 推送数据。 @@ -126,7 +131,23 @@ public abstract class PersistConsumer implements Consumer { * @param record 数据记录。 * @throws HandlerException 处理器异常。 */ - protected abstract void doPush(D record) throws HandlerException; + protected abstract void doSuccessPush(D record) throws HandlerException; + + /** + * 推送无法成功推送的数据。 + * + * @param records 数据记录组成的列表。 + * @throws HandlerException 处理器异常。 + */ + protected abstract void doFailedPush(List records) throws HandlerException; + + /** + * 推送无法成功推送的数据。 + * + * @param record 数据记录。 + * @throws HandlerException 处理器异常。 + */ + protected abstract void doFailedPush(D record) throws HandlerException; @Override public String toString() { diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/TriggeredKeepConsumer.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/TriggeredKeepConsumer.java index 48f711280b5efe31f079cfee71235e2f78ea39c0..f17dfa1f550d18ccf59ed4fecb4426a849d0a3c2 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/TriggeredKeepConsumer.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/TriggeredKeepConsumer.java @@ -21,17 +21,32 @@ public class TriggeredKeepConsumer extends KeepConsumer { @BehaviorAnalyse @Override - public void consume(@SkipRecord List records) throws HandlerException { - super.consume(records); + public void consume(@SkipRecord List triggeredDatas) throws HandlerException { + super.consume(triggeredDatas); } @Override - protected void doPush(List records) throws HandlerException { - pushHandler.triggeredUpdated(records); + public void consumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + pushHandler.keepTriggeredConsumeBufferOverstock(bufferSize, allBufferSize, ratio, 
warnThreshold); } @Override - protected void doPush(TriggeredData record) throws HandlerException { - pushHandler.triggeredUpdated(record); + protected void doSuccessPush(List triggeredDatas) throws HandlerException { + pushHandler.triggeredUpdated(triggeredDatas); + } + + @Override + protected void doSuccessPush(TriggeredData triggeredData) throws HandlerException { + pushHandler.triggeredUpdated(triggeredData); + } + + @Override + protected void doFailedPush(List triggeredDatas) throws HandlerException { + pushHandler.keepTriggeredFailed(triggeredDatas); + } + + @Override + protected void doFailedPush(TriggeredData triggeredData) throws HandlerException { + pushHandler.keepTriggeredFailed(triggeredData); } } diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/TriggeredPersistConsumer.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/TriggeredPersistConsumer.java index 8540377e6d7ef7e8947d0b9fe8968dda146a3524..59ae177a86eb5a0d1d64fcc6c67f1b228eae1282 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/TriggeredPersistConsumer.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/consumer/TriggeredPersistConsumer.java @@ -17,17 +17,32 @@ public class TriggeredPersistConsumer extends PersistConsumer { @BehaviorAnalyse @Override - public void consume(@SkipRecord List records) throws HandlerException { - super.consume(records); + public void consume(@SkipRecord List triggeredDatas) throws HandlerException { + super.consume(triggeredDatas); } @Override - protected void doPush(List records) throws HandlerException { - pushHandler.triggeredRecorded(records); + public void consumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + pushHandler.persistTriggeredConsumeBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); } @Override - protected void doPush(TriggeredData record) throws HandlerException { - 
pushHandler.triggeredRecorded(record); + protected void doSuccessPush(List triggeredDatas) throws HandlerException { + pushHandler.triggeredRecorded(triggeredDatas); + } + + @Override + protected void doSuccessPush(TriggeredData triggeredData) throws HandlerException { + pushHandler.triggeredRecorded(triggeredData); + } + + @Override + protected void doFailedPush(List triggeredDatas) throws HandlerException { + pushHandler.persistTriggeredFailed(triggeredDatas); + } + + @Override + protected void doFailedPush(TriggeredData triggeredData) throws HandlerException { + pushHandler.persistTriggeredFailed(triggeredData); } } diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/DrainPusher.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/DrainPusher.java index 5e1e75f6d146ee90c5708db0a40a1ed9e5a96208..b48531a3c755532bfcacbe993484b86ef1f1ebb8 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/DrainPusher.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/DrainPusher.java @@ -2,6 +2,7 @@ package com.dwarfeng.fdr.impl.handler.pusher; import com.dwarfeng.fdr.stack.bean.dto.FilteredData; import com.dwarfeng.fdr.stack.bean.dto.NormalData; +import com.dwarfeng.fdr.stack.bean.dto.RecordInfo; import com.dwarfeng.fdr.stack.bean.dto.TriggeredData; import org.springframework.stereotype.Component; @@ -78,10 +79,88 @@ public class DrainPusher extends AbstractPusher { public void mapReset() { } + @Override + public void recordFailed(RecordInfo recordInfo) { + } + + @Override + public void keepNormalFailed(NormalData normalData) { + } + + @Override + public void keepNormalFailed(List normalDatas) { + } + + @Override + public void persistNormalFailed(NormalData normalData) { + } + + @Override + public void persistNormalFailed(List normalDatas) { + } + + @Override + public void keepFilteredFailed(FilteredData filteredData) { + } + + @Override + public void keepFilteredFailed(List filteredDatas) { + } + + 
@Override + public void persistFilteredFailed(FilteredData filteredData) { + } + + @Override + public void persistFilteredFailed(List filteredDatas) { + } + + @Override + public void keepTriggeredFailed(TriggeredData triggeredData) { + } + + @Override + public void keepTriggeredFailed(List triggeredDatas) { + } + + @Override + public void persistTriggeredFailed(TriggeredData triggeredData) { + } + + @Override + public void persistTriggeredFailed(List triggeredDatas) { + } + + @Override + public void recordBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + } + + @Override + public void keepNormalConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + } + + @Override + public void keepFilteredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + } + + @Override + public void keepTriggeredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + } + + @Override + public void persistNormalConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + } + + @Override + public void persistFilteredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + } + + @Override + public void persistTriggeredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + } + @Override public String toString() { - return "DrainPusher{" + - "pusherType='" + pusherType + '\'' + - '}'; + return "DrainPusher{" + "pusherType='" + pusherType + '\'' + '}'; } } diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/LogPusher.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/LogPusher.java index 0da976d325223765533982dd8f9235a6595b2c47..62199921859abe2d6d83d1da2019ebe91ddf54be 100644 --- 
a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/LogPusher.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/LogPusher.java @@ -6,6 +6,7 @@ import com.dwarfeng.fdr.sdk.bean.dto.FastJsonNormalData; import com.dwarfeng.fdr.sdk.bean.dto.FastJsonTriggeredData; import com.dwarfeng.fdr.stack.bean.dto.FilteredData; import com.dwarfeng.fdr.stack.bean.dto.NormalData; +import com.dwarfeng.fdr.stack.bean.dto.RecordInfo; import com.dwarfeng.fdr.stack.bean.dto.TriggeredData; import com.dwarfeng.subgrade.stack.exception.HandlerException; import org.apache.commons.lang3.StringUtils; @@ -140,6 +141,146 @@ public class LogPusher extends AbstractPusher { logData(title, message); } + @Override + public void recordFailed(RecordInfo recordInfo) throws HandlerException { + String title = "记录失败:"; + String message = recordInfo.toString(); + logData(title, message); + } + + @Override + public void keepNormalFailed(NormalData normalData) throws HandlerException { + String title = "无法更新的一般数据:"; + String message = normalData.toString(); + logData(title, message); + } + + @Override + public void keepNormalFailed(List normalDatas) throws HandlerException { + for (NormalData normalData : normalDatas) { + keepNormalFailed(normalData); + } + } + + @Override + public void persistNormalFailed(NormalData normalData) throws HandlerException { + String title = "无法记录的一般数据:"; + String message = normalData.toString(); + logData(title, message); + } + + @Override + public void persistNormalFailed(List normalDatas) throws HandlerException { + for (NormalData normalData : normalDatas) { + persistNormalFailed(normalData); + } + } + + @Override + public void keepFilteredFailed(FilteredData filteredData) throws HandlerException { + String title = "无法更新的被过滤数据:"; + String message = filteredData.toString(); + logData(title, message); + } + + @Override + public void keepFilteredFailed(List filteredDatas) throws HandlerException { + for (FilteredData filteredData : filteredDatas) { 
+ keepFilteredFailed(filteredData); + } + } + + @Override + public void persistFilteredFailed(FilteredData filteredData) throws HandlerException { + String title = "无法记录的被过滤数据:"; + String message = filteredData.toString(); + logData(title, message); + } + + @Override + public void persistFilteredFailed(List filteredDatas) throws HandlerException { + for (FilteredData filteredData : filteredDatas) { + persistFilteredFailed(filteredData); + } + } + + @Override + public void keepTriggeredFailed(TriggeredData triggeredData) throws HandlerException { + String title = "无法更新的被触发数据:"; + String message = triggeredData.toString(); + logData(title, message); + } + + @Override + public void keepTriggeredFailed(List triggeredDatas) throws HandlerException { + for (TriggeredData triggeredData : triggeredDatas) { + keepTriggeredFailed(triggeredData); + } + } + + @Override + public void persistTriggeredFailed(TriggeredData triggeredData) throws HandlerException { + String title = "无法记录的被触发数据:"; + String message = triggeredData.toString(); + logData(title, message); + } + + @Override + public void persistTriggeredFailed(List triggeredDatas) throws HandlerException { + for (TriggeredData triggeredData : triggeredDatas) { + persistTriggeredFailed(triggeredData); + } + } + + @Override + public void recordBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + String title = "逻辑侧处理器缓冲数据积压:"; + String message = "逻辑侧的待消费元素数量为 " + bufferSize + ",缓存元素数量为 " + allBufferSize + ",比例为 " + ratio + ",超过报警值 " + warnThreshold + ",请检查"; + logData(title, message); + } + + @Override + public void keepNormalConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + String title = "记录侧处理器缓冲数据积压(一般数据更新):"; + String message = "记录侧的待消费元素数量为 " + bufferSize + ",缓存元素数量为 " + allBufferSize + ",比例为 " + ratio + ",超过报警值 " + warnThreshold + ",请检查"; + logData(title, message); + } + + 
@Override + public void keepFilteredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + String title = "记录侧处理器缓冲数据积压(被过滤数据更新):"; + String message = "记录侧的待消费元素数量为 " + bufferSize + ",缓存元素数量为 " + allBufferSize + ",比例为 " + ratio + ",超过报警值 " + warnThreshold + ",请检查"; + logData(title, message); + } + + @Override + public void keepTriggeredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + String title = "记录侧处理器缓冲数据积压(被触发数据更新):"; + String message = "记录侧的待消费元素数量为 " + bufferSize + ",缓存元素数量为 " + allBufferSize + ",比例为 " + ratio + ",超过报警值 " + warnThreshold + ",请检查"; + logData(title, message); + } + + @Override + public void persistNormalConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + String title = "记录侧处理器缓冲数据积压(一般数据记录):"; + String message = "记录侧的待消费元素数量为 " + bufferSize + ",缓存元素数量为 " + allBufferSize + ",比例为 " + ratio + ",超过报警值 " + warnThreshold + ",请检查"; + logData(title, message); + } + + @Override + public void persistFilteredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + String title = "记录侧处理器缓冲数据积压(被过滤数据记录):"; + String message = "记录侧的待消费元素数量为 " + bufferSize + ",缓存元素数量为 " + allBufferSize + ",比例为 " + ratio + ",超过报警值 " + warnThreshold + ",请检查"; + logData(title, message); + } + + @Override + public void persistTriggeredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException { + String title = "记录侧处理器缓冲数据积压(被触发数据记录):"; + String message = "记录侧的待消费元素数量为 " + bufferSize + ",缓存元素数量为 " + allBufferSize + ",比例为 " + ratio + ",超过报警值 " + warnThreshold + ",请检查"; + logData(title, message); + } + private void logData(String title, String message) throws HandlerException { String logLevel = this.logLevel.toUpperCase(); 
switch (logLevel) { diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/MultiPusher.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/MultiPusher.java index 17016a7f94e03f558fc4215ae4e5912987011b85..52b2461aba962672932049bda3a6a71609c6c9fc 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/MultiPusher.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/MultiPusher.java @@ -3,6 +3,7 @@ package com.dwarfeng.fdr.impl.handler.pusher; import com.dwarfeng.fdr.impl.handler.Pusher; import com.dwarfeng.fdr.stack.bean.dto.FilteredData; import com.dwarfeng.fdr.stack.bean.dto.NormalData; +import com.dwarfeng.fdr.stack.bean.dto.RecordInfo; import com.dwarfeng.fdr.stack.bean.dto.TriggeredData; import com.dwarfeng.subgrade.stack.exception.HandlerException; import org.slf4j.Logger; @@ -42,8 +43,7 @@ public class MultiPusher extends AbstractPusher { StringTokenizer st = new StringTokenizer(delegateTypes, ","); while (st.hasMoreTokens()) { String delegateType = st.nextToken(); - delegates.add(pushers.stream().filter(p -> p.supportType(delegateType)).findAny() - .orElseThrow(() -> new HandlerException("未知的 pusher 类型: " + delegateType))); + delegates.add(pushers.stream().filter(p -> p.supportType(delegateType)).findAny().orElseThrow(() -> new HandlerException("未知的 pusher 类型: " + delegateType))); } } @@ -201,11 +201,228 @@ public class MultiPusher extends AbstractPusher { } } + @Override + public void recordFailed(RecordInfo recordInfo) { + for (Pusher delegate : delegates) { + try { + delegate.recordFailed(recordInfo); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void keepNormalFailed(NormalData normalData) { + for (Pusher delegate : delegates) { + try { + delegate.keepNormalFailed(normalData); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void keepNormalFailed(List normalDatas) { + for (Pusher 
delegate : delegates) { + try { + delegate.keepNormalFailed(normalDatas); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void persistNormalFailed(NormalData normalData) { + for (Pusher delegate : delegates) { + try { + delegate.persistNormalFailed(normalData); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void persistNormalFailed(List normalDatas) { + for (Pusher delegate : delegates) { + try { + delegate.persistNormalFailed(normalDatas); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void keepFilteredFailed(FilteredData filteredData) { + for (Pusher delegate : delegates) { + try { + delegate.keepFilteredFailed(filteredData); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void keepFilteredFailed(List filteredDatas) { + for (Pusher delegate : delegates) { + try { + delegate.keepFilteredFailed(filteredDatas); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void persistFilteredFailed(FilteredData filteredData) { + for (Pusher delegate : delegates) { + try { + delegate.persistFilteredFailed(filteredData); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void persistFilteredFailed(List filteredDatas) { + for (Pusher delegate : delegates) { + try { + delegate.persistFilteredFailed(filteredDatas); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void keepTriggeredFailed(TriggeredData triggeredData) { + for (Pusher delegate : delegates) { + try { + delegate.keepTriggeredFailed(triggeredData); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void keepTriggeredFailed(List triggeredDatas) { + for (Pusher delegate : delegates) { + try { + 
delegate.keepTriggeredFailed(triggeredDatas); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void persistTriggeredFailed(TriggeredData triggeredData) { + for (Pusher delegate : delegates) { + try { + delegate.persistTriggeredFailed(triggeredData); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void persistTriggeredFailed(List triggeredDatas) { + for (Pusher delegate : delegates) { + try { + delegate.persistTriggeredFailed(triggeredDatas); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void recordBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + for (Pusher delegate : delegates) { + try { + delegate.recordBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void keepNormalConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + for (Pusher delegate : delegates) { + try { + delegate.keepNormalConsumeBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void keepFilteredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + for (Pusher delegate : delegates) { + try { + delegate.keepFilteredConsumeBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void keepTriggeredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + for (Pusher delegate : delegates) { + try { + delegate.keepTriggeredConsumeBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); + } catch 
(Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void persistNormalConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + for (Pusher delegate : delegates) { + try { + delegate.persistNormalConsumeBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void persistFilteredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + for (Pusher delegate : delegates) { + try { + delegate.persistFilteredConsumeBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + + @Override + public void persistTriggeredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + for (Pusher delegate : delegates) { + try { + delegate.persistTriggeredConsumeBufferOverstock(bufferSize, allBufferSize, ratio, warnThreshold); + } catch (Exception e) { + LOGGER.warn("代理推送器推送数据失败,异常信息如下: ", e); + } + } + } + @Override public String toString() { - return "MultiPusher{" + - "delegateTypes='" + delegateTypes + '\'' + - ", pusherType='" + pusherType + '\'' + - '}'; + return "MultiPusher{" + "delegateTypes='" + delegateTypes + '\'' + ", pusherType='" + pusherType + '\'' + '}'; } } diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/NativeKafkaPusher.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/NativeKafkaPusher.java index 68ab6d7444dc6d971c9170841b9d27952e4bbe23..1f5fdebcf63232effb6310a206fbedd992d7748a 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/NativeKafkaPusher.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/pusher/NativeKafkaPusher.java @@ -4,9 +4,11 @@ import com.alibaba.fastjson.JSON; import 
com.alibaba.fastjson.serializer.SerializerFeature; import com.dwarfeng.fdr.sdk.bean.dto.FastJsonFilteredData; import com.dwarfeng.fdr.sdk.bean.dto.FastJsonNormalData; +import com.dwarfeng.fdr.sdk.bean.dto.FastJsonRecordInfo; import com.dwarfeng.fdr.sdk.bean.dto.FastJsonTriggeredData; import com.dwarfeng.fdr.stack.bean.dto.FilteredData; import com.dwarfeng.fdr.stack.bean.dto.NormalData; +import com.dwarfeng.fdr.stack.bean.dto.RecordInfo; import com.dwarfeng.fdr.stack.bean.dto.TriggeredData; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.producer.ProducerConfig; @@ -37,26 +39,55 @@ import java.util.Map; @Component public class NativeKafkaPusher extends AbstractPusher { - public static final String PUSHER_TYPE = "native.kafka"; + public static final String PUSHER_TYPE = "kafka.native"; private final KafkaTemplate kafkaTemplate; - @Value("${pusher.native.kafka.topic.normal_updated}") + @Value("${pusher.kafka.native.topic.normal_updated}") private String normalUpdatedTopic; - @Value("${pusher.native.kafka.topic.normal_recorded}") + @Value("${pusher.kafka.native.topic.normal_recorded}") private String normalRecordedTopic; - @Value("${pusher.native.kafka.topic.filtered_updated}") + @Value("${pusher.kafka.native.topic.filtered_updated}") private String filteredUpdatedTopic; - @Value("${pusher.native.kafka.topic.filtered_recorded}") + @Value("${pusher.kafka.native.topic.filtered_recorded}") private String filteredRecordedTopic; - @Value("${pusher.native.kafka.topic.triggered_updated}") + @Value("${pusher.kafka.native.topic.triggered_updated}") private String triggeredUpdatedTopic; - @Value("${pusher.native.kafka.topic.triggered_recorded}") + @Value("${pusher.kafka.native.topic.triggered_recorded}") private String triggeredRecordedTopic; - @Value("${pusher.native.kafka.topic.record_reset}") + @Value("${pusher.kafka.native.topic.record_reset}") private String recordResetTopic; - @Value("${pusher.native.kafka.topic.map_reset}") + 
@Value("${pusher.kafka.native.topic.map_reset}") private String mapResetTopic; + @Value("${pusher.kafka.native.topic.record_Failed}") + private String recordFailedTopic; + @Value("${pusher.kafka.native.topic.keep_normal_Failed}") + private String keepNormalFailedTopic; + @Value("${pusher.kafka.native.topic.persist_normal_Failed}") + private String persistNormalFailedTopic; + @Value("${pusher.kafka.native.topic.keep_filtered_Failed}") + private String keepFilteredFailedTopic; + @Value("${pusher.kafka.native.topic.persist_filtered_Failed}") + private String persistFilteredFailedTopic; + @Value("${pusher.kafka.native.topic.keep_triggered_Failed}") + private String keepTriggeredFailedTopic; + @Value("${pusher.kafka.native.topic.persist_triggered_Failed}") + private String persistTriggeredFailedTopic; + @Value("${pusher.kafka.native.topic.record_buffer_overstock}") + private String recordBufferOverstockTopic; + @Value("${pusher.kafka.native.topic.keepNormal_consume_buffer_overstock}") + private String keepNormalConsumeBufferOverstockTopic; + @Value("${pusher.kafka.native.topic.keepFiltered_consume_buffer_overstock}") + private String keepFilteredConsumeBufferOverstockTopic; + @Value("${pusher.kafka.native.topic.keepTriggered_consume_buffer_overstock}") + private String keepTriggeredConsumeBufferOverstockTopic; + @Value("${pusher.kafka.native.topic.persistNormal_consume_buffer_overstock}") + private String persistNormalConsumeBufferOverstockTopic; + @Value("${pusher.kafka.native.topic.persistFiltered_consume_buffer_overstock}") + private String persistFilteredConsumeBufferOverstockTopic; + @Value("${pusher.kafka.native.topic.persistTriggered_consume_buffer_overstock}") + private String persistTriggeredConsumeBufferOverstockTopic; + public NativeKafkaPusher( @Qualifier("nativeKafkaPusher.kafkaTemplate") KafkaTemplate kafkaTemplate @@ -114,7 +145,7 @@ public class NativeKafkaPusher extends AbstractPusher { @Transactional(transactionManager = 
"nativeKafkaPusher.kafkaTransactionManager") @Override public void filteredRecorded(List filteredRecords) { - + filteredRecords.forEach(this::filteredRecorded); } @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") @@ -155,6 +186,149 @@ public class NativeKafkaPusher extends AbstractPusher { kafkaTemplate.send(mapResetTopic, StringUtils.EMPTY); } + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void recordFailed(RecordInfo recordInfo) { + String message = JSON.toJSONString(FastJsonRecordInfo.of(recordInfo), SerializerFeature.WriteClassName); + kafkaTemplate.send(recordFailedTopic, message); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void keepNormalFailed(NormalData normalData) { + String message = JSON.toJSONString(FastJsonNormalData.of(normalData), SerializerFeature.WriteClassName); + kafkaTemplate.send(keepNormalFailedTopic, message); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void keepNormalFailed(List normalDatas) { + normalDatas.forEach(this::keepNormalFailed); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void persistNormalFailed(NormalData normalData) { + String message = JSON.toJSONString(FastJsonNormalData.of(normalData), SerializerFeature.WriteClassName); + kafkaTemplate.send(persistNormalFailedTopic, message); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void persistNormalFailed(List normalDatas) { + normalDatas.forEach(this::persistNormalFailed); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void keepFilteredFailed(FilteredData filteredData) { + String message = JSON.toJSONString(FastJsonFilteredData.of(filteredData), 
SerializerFeature.WriteClassName); + kafkaTemplate.send(keepFilteredFailedTopic, message); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void keepFilteredFailed(List filteredDatas) { + filteredDatas.forEach(this::keepFilteredFailed); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void persistFilteredFailed(FilteredData filteredData) { + String message = JSON.toJSONString(FastJsonFilteredData.of(filteredData), SerializerFeature.WriteClassName); + kafkaTemplate.send(persistFilteredFailedTopic, message); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void persistFilteredFailed(List filteredDatas) { + filteredDatas.forEach(this::persistFilteredFailed); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void keepTriggeredFailed(TriggeredData triggeredData) { + String message = JSON.toJSONString(FastJsonTriggeredData.of(triggeredData), SerializerFeature.WriteClassName); + kafkaTemplate.send(keepTriggeredFailedTopic, message); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void keepTriggeredFailed(List triggeredDatas) { + triggeredDatas.forEach(this::keepTriggeredFailed); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void persistTriggeredFailed(TriggeredData triggeredData) { + String message = JSON.toJSONString(FastJsonTriggeredData.of(triggeredData), SerializerFeature.WriteClassName); + kafkaTemplate.send(persistTriggeredFailedTopic, message); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void persistTriggeredFailed(List triggeredDatas) { + triggeredDatas.forEach(this::persistTriggeredFailed); + } + + @Transactional(transactionManager = 
"nativeKafkaPusher.kafkaTransactionManager") + @Override + public void recordBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + String message = JSON.toJSONString(getOverstockData(bufferSize, allBufferSize, ratio, warnThreshold), SerializerFeature.WriteClassName); + kafkaTemplate.send(recordBufferOverstockTopic, message); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void keepNormalConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + String message = JSON.toJSONString(getOverstockData(bufferSize, allBufferSize, ratio, warnThreshold), SerializerFeature.WriteClassName); + kafkaTemplate.send(keepNormalConsumeBufferOverstockTopic, message); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void keepFilteredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + String message = JSON.toJSONString(getOverstockData(bufferSize, allBufferSize, ratio, warnThreshold), SerializerFeature.WriteClassName); + kafkaTemplate.send(keepFilteredConsumeBufferOverstockTopic, message); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void keepTriggeredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + String message = JSON.toJSONString(getOverstockData(bufferSize, allBufferSize, ratio, warnThreshold), SerializerFeature.WriteClassName); + kafkaTemplate.send(keepTriggeredConsumeBufferOverstockTopic, message); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void persistNormalConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + String message = JSON.toJSONString(getOverstockData(bufferSize, allBufferSize, 
ratio, warnThreshold), SerializerFeature.WriteClassName); + kafkaTemplate.send(persistNormalConsumeBufferOverstockTopic, message); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void persistFilteredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + String message = JSON.toJSONString(getOverstockData(bufferSize, allBufferSize, ratio, warnThreshold), SerializerFeature.WriteClassName); + kafkaTemplate.send(persistFilteredConsumeBufferOverstockTopic, message); + } + + @Transactional(transactionManager = "nativeKafkaPusher.kafkaTransactionManager") + @Override + public void persistTriggeredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + String message = JSON.toJSONString(getOverstockData(bufferSize, allBufferSize, ratio, warnThreshold), SerializerFeature.WriteClassName); + kafkaTemplate.send(persistTriggeredConsumeBufferOverstockTopic, message); + } + + private HashMap getOverstockData(double bufferSize, double allBufferSize, double ratio, double warnThreshold) { + HashMap map = new HashMap<>(); + map.put("bufferSize", bufferSize); + map.put("allBufferSize", allBufferSize); + map.put("ratio", ratio); + map.put("warnThreshold", warnThreshold); + return map; + } + @Override public String toString() { return "NativeKafkaPusher{" + @@ -167,6 +341,20 @@ public class NativeKafkaPusher extends AbstractPusher { ", triggeredRecordedTopic='" + triggeredRecordedTopic + '\'' + ", recordResetTopic='" + recordResetTopic + '\'' + ", mapResetTopic='" + mapResetTopic + '\'' + + ", recordFailedTopic='" + recordFailedTopic + '\'' + + ", keepNormalFailedTopic='" + keepNormalFailedTopic + '\'' + + ", persistNormalFailedTopic='" + persistNormalFailedTopic + '\'' + + ", keepFilteredFailedTopic='" + keepFilteredFailedTopic + '\'' + + ", persistFilteredFailedTopic='" + persistFilteredFailedTopic + '\'' + + ", 
keepTriggeredFailedTopic='" + keepTriggeredFailedTopic + '\'' + + ", persistTriggeredFailedTopic='" + persistTriggeredFailedTopic + '\'' + + ", recordBufferOverstockTopic='" + recordBufferOverstockTopic + '\'' + + ", keepNormalConsumeBufferOverstockTopic='" + keepNormalConsumeBufferOverstockTopic + '\'' + + ", keepFilteredConsumeBufferOverstockTopic='" + keepFilteredConsumeBufferOverstockTopic + '\'' + + ", keepTriggeredConsumeBufferOverstockTopic='" + keepTriggeredConsumeBufferOverstockTopic + '\'' + + ", persistNormalConsumeBufferOverstockTopic='" + persistNormalConsumeBufferOverstockTopic + '\'' + + ", persistFilteredConsumeBufferOverstockTopic='" + persistFilteredConsumeBufferOverstockTopic + '\'' + + ", persistTriggeredConsumeBufferOverstockTopic='" + persistTriggeredConsumeBufferOverstockTopic + '\'' + '}'; } @@ -175,19 +363,19 @@ public class NativeKafkaPusher extends AbstractPusher { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPusherConfiguration.class); - @Value("${pusher.native.kafka.bootstrap_servers}") + @Value("${pusher.kafka.native.bootstrap_servers}") private String producerBootstrapServers; - @Value("${pusher.native.kafka.retries}") + @Value("${pusher.kafka.native.retries}") private int retries; - @Value("${pusher.native.kafka.linger}") + @Value("${pusher.kafka.native.linger}") private long linger; - @Value("${pusher.native.kafka.buffer_memory}") + @Value("${pusher.kafka.native.buffer_memory}") private long bufferMemory; - @Value("${pusher.native.kafka.batch_size}") + @Value("${pusher.kafka.native.batch_size}") private int batchSize; - @Value("${pusher.native.kafka.acks}") + @Value("${pusher.kafka.native.acks}") private String acks; - @Value("${pusher.native.kafka.transaction_prefix}") + @Value("${pusher.kafka.native.transaction_prefix}") private String transactionPrefix; @Bean("nativeKafkaPusher.producerProperties") diff --git a/fdr-impl/src/test/resources/fdr/push.properties b/fdr-impl/src/test/resources/fdr/push.properties 
index e8a10b7bc1de7e63ea5076ede7d980f02389c048..26c738481243bcff087050802fcf6e7d98274890 100644 --- a/fdr-impl/src/test/resources/fdr/push.properties +++ b/fdr-impl/src/test/resources/fdr/push.properties @@ -5,7 +5,7 @@ # \u76EE\u524D\u8BE5\u9879\u76EE\u652F\u6301\u7684\u63A8\u9001\u5668\u7C7B\u578B\u6709: # drain: \u7B80\u5355\u7684\u4E22\u5F03\u6389\u6240\u6709\u6D88\u606F\u7684\u63A8\u9001\u5668\u3002 # multi: \u540C\u65F6\u5C06\u6D88\u606F\u63A8\u9001\u7ED9\u6240\u6709\u4EE3\u7406\u7684\u591A\u91CD\u63A8\u9001\u5668\u3002 -# native.kafka: \u4F7F\u7528\u539F\u751F\u6570\u636E\u7684\u57FA\u4E8EKafka\u6D88\u606F\u961F\u5217\u7684\u63A8\u9001\u5668\u3002 +# kafka.native: \u4F7F\u7528\u539F\u751F\u6570\u636E\u7684\u57FA\u4E8EKafka\u6D88\u606F\u961F\u5217\u7684\u63A8\u9001\u5668\u3002 # log: \u5C06\u6D88\u606F\u8F93\u51FA\u5230\u65E5\u5FD7\u4E2D\u7684\u63A8\u9001\u5668\u3002 # # \u5BF9\u4E8E\u4E00\u4E2A\u5177\u4F53\u7684\u9879\u76EE\uFF0C\u5F88\u53EF\u80FD\u53EA\u7528\u4E00\u4E2A\u63A8\u9001\u5668\u3002\u6B64\u65F6\u5982\u679C\u5E0C\u671B\u7A0B\u5E8F\u52A0\u8F7D\u65F6\u53EA\u52A0\u8F7D\u4E00\u4E2A\u63A8\u9001\u5668\uFF0C\u53EF\u4EE5\u901A\u8FC7\u7F16\u8F91 @@ -22,40 +22,40 @@ pusher.type=drain # multi # ################################################### # \u4EE3\u7406\u7684\u63A8\u9001\u5668\uFF0C\u63A8\u9001\u5668\u4E4B\u95F4\u4EE5\u9017\u53F7\u5206\u9694\u3002 -pusher.multi.delegate_types=native.kafka +pusher.multi.delegate_types=kafka.native # ################################################### -# native.kafka # +# kafka.native # ################################################### # broker\u96C6\u7FA4\u3002 -pusher.native.kafka.bootstrap_servers=your ip here like ip1:9092,ip2:9092,ip3:9092 +pusher.kafka.native.bootstrap_servers=your ip here like ip1:9092,ip2:9092,ip3:9092 # \u8FDE\u63A5\u5C5E\u6027\u3002 -pusher.native.kafka.acks=all +pusher.kafka.native.acks=all # \u53D1\u9001\u5931\u8D25\u91CD\u8BD5\u6B21\u6570\u3002 -pusher.native.kafka.retries=3 
-pusher.native.kafka.linger=10 +pusher.kafka.native.retries=3 +pusher.kafka.native.linger=10 # \u7684\u6279\u5904\u7406\u7F13\u51B2\u533A\u5927\u5C0F\u3002 -pusher.native.kafka.buffer_memory=40960 +pusher.kafka.native.buffer_memory=40960 # \u6279\u5904\u7406\u6761\u6570\uFF1A\u5F53\u591A\u4E2A\u8BB0\u5F55\u88AB\u53D1\u9001\u5230\u540C\u4E00\u4E2A\u5206\u533A\u65F6\uFF0C\u751F\u4EA7\u8005\u4F1A\u5C1D\u8BD5\u5C06\u8BB0\u5F55\u5408\u5E76\u5230\u66F4\u5C11\u7684\u8BF7\u6C42\u4E2D\u3002\u8FD9\u6709\u52A9\u4E8E\u5BA2\u6237\u7AEF\u548C\u670D\u52A1\u5668\u7684\u6027\u80FD\u3002 -pusher.native.kafka.batch_size=4096 +pusher.kafka.native.batch_size=4096 # Kafka\u4E8B\u52A1\u7684\u524D\u7F00\u3002 -pusher.native.kafka.transaction_prefix=fdr.pusher. +pusher.kafka.native.transaction_prefix=fdr.pusher. # \u4E00\u822C\u6570\u636E\u66F4\u65B0\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 -pusher.native.kafka.topic.normal_updated=fdr.pusher.normal_updated +pusher.kafka.native.topic.normal_updated=fdr.pusher.normal_updated # \u4E00\u822C\u6570\u636E\u8BB0\u5F55\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 -pusher.native.kafka.topic.normal_recorded=fdr.pusher.normal_recorded +pusher.kafka.native.topic.normal_recorded=fdr.pusher.normal_recorded # \u88AB\u8FC7\u6EE4\u6570\u636E\u66F4\u65B0\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 -pusher.native.kafka.topic.filtered_updated=fdr.pusher.filtered_updated +pusher.kafka.native.topic.filtered_updated=fdr.pusher.filtered_updated # \u88AB\u8FC7\u6EE4\u6570\u636E\u8BB0\u5F55\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 -pusher.native.kafka.topic.filtered_recorded=fdr.pusher.filtered_recorded +pusher.kafka.native.topic.filtered_recorded=fdr.pusher.filtered_recorded # \u88AB\u89E6\u53D1\u6570\u636E\u66F4\u65B0\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 -pusher.native.kafka.topic.triggered_updated=fdr.pusher.triggered_updated 
+pusher.kafka.native.topic.triggered_updated=fdr.pusher.triggered_updated # \u88AB\u89E6\u53D1\u6570\u636E\u8BB0\u5F55\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 -pusher.native.kafka.topic.triggered_recorded=fdr.pusher.triggered_recorded +pusher.kafka.native.topic.triggered_recorded=fdr.pusher.triggered_recorded # \u8BB0\u5F55\u529F\u80FD\u91CD\u7F6E\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 -pusher.native.kafka.topic.record_reset=fdr.pusher.record_reset +pusher.kafka.native.topic.record_reset=fdr.pusher.record_reset # \u6620\u5C04\u529F\u80FD\u91CD\u7F6E\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 -pusher.native.kafka.topic.map_reset=fdr.pusher.map_reset +pusher.kafka.native.topic.map_reset=fdr.pusher.map_reset # ################################################### # log # diff --git a/fdr-node/src/main/resources/fdr/push.properties b/fdr-node/src/main/resources/fdr/push.properties index e8a10b7bc1de7e63ea5076ede7d980f02389c048..5293be06d09affdb9af848a266aa7446dc390e27 100644 --- a/fdr-node/src/main/resources/fdr/push.properties +++ b/fdr-node/src/main/resources/fdr/push.properties @@ -5,7 +5,7 @@ # \u76EE\u524D\u8BE5\u9879\u76EE\u652F\u6301\u7684\u63A8\u9001\u5668\u7C7B\u578B\u6709: # drain: \u7B80\u5355\u7684\u4E22\u5F03\u6389\u6240\u6709\u6D88\u606F\u7684\u63A8\u9001\u5668\u3002 # multi: \u540C\u65F6\u5C06\u6D88\u606F\u63A8\u9001\u7ED9\u6240\u6709\u4EE3\u7406\u7684\u591A\u91CD\u63A8\u9001\u5668\u3002 -# native.kafka: \u4F7F\u7528\u539F\u751F\u6570\u636E\u7684\u57FA\u4E8EKafka\u6D88\u606F\u961F\u5217\u7684\u63A8\u9001\u5668\u3002 +# kafka.native: \u4F7F\u7528\u539F\u751F\u6570\u636E\u7684\u57FA\u4E8EKafka\u6D88\u606F\u961F\u5217\u7684\u63A8\u9001\u5668\u3002 # log: \u5C06\u6D88\u606F\u8F93\u51FA\u5230\u65E5\u5FD7\u4E2D\u7684\u63A8\u9001\u5668\u3002 # # 
\u5BF9\u4E8E\u4E00\u4E2A\u5177\u4F53\u7684\u9879\u76EE\uFF0C\u5F88\u53EF\u80FD\u53EA\u7528\u4E00\u4E2A\u63A8\u9001\u5668\u3002\u6B64\u65F6\u5982\u679C\u5E0C\u671B\u7A0B\u5E8F\u52A0\u8F7D\u65F6\u53EA\u52A0\u8F7D\u4E00\u4E2A\u63A8\u9001\u5668\uFF0C\u53EF\u4EE5\u901A\u8FC7\u7F16\u8F91 @@ -22,40 +22,68 @@ pusher.type=drain # multi # ################################################### # \u4EE3\u7406\u7684\u63A8\u9001\u5668\uFF0C\u63A8\u9001\u5668\u4E4B\u95F4\u4EE5\u9017\u53F7\u5206\u9694\u3002 -pusher.multi.delegate_types=native.kafka +pusher.multi.delegate_types=kafka.native # ################################################### -# native.kafka # +# kafka.native # ################################################### # broker\u96C6\u7FA4\u3002 -pusher.native.kafka.bootstrap_servers=your ip here like ip1:9092,ip2:9092,ip3:9092 +pusher.kafka.native.bootstrap_servers=your ip here like ip1:9092,ip2:9092,ip3:9092 # \u8FDE\u63A5\u5C5E\u6027\u3002 -pusher.native.kafka.acks=all +pusher.kafka.native.acks=all # \u53D1\u9001\u5931\u8D25\u91CD\u8BD5\u6B21\u6570\u3002 -pusher.native.kafka.retries=3 -pusher.native.kafka.linger=10 +pusher.kafka.native.retries=3 +pusher.kafka.native.linger=10 # \u7684\u6279\u5904\u7406\u7F13\u51B2\u533A\u5927\u5C0F\u3002 -pusher.native.kafka.buffer_memory=40960 +pusher.kafka.native.buffer_memory=40960 # \u6279\u5904\u7406\u6761\u6570\uFF1A\u5F53\u591A\u4E2A\u8BB0\u5F55\u88AB\u53D1\u9001\u5230\u540C\u4E00\u4E2A\u5206\u533A\u65F6\uFF0C\u751F\u4EA7\u8005\u4F1A\u5C1D\u8BD5\u5C06\u8BB0\u5F55\u5408\u5E76\u5230\u66F4\u5C11\u7684\u8BF7\u6C42\u4E2D\u3002\u8FD9\u6709\u52A9\u4E8E\u5BA2\u6237\u7AEF\u548C\u670D\u52A1\u5668\u7684\u6027\u80FD\u3002 -pusher.native.kafka.batch_size=4096 +pusher.kafka.native.batch_size=4096 # Kafka\u4E8B\u52A1\u7684\u524D\u7F00\u3002 -pusher.native.kafka.transaction_prefix=fdr.pusher. +pusher.kafka.native.transaction_prefix=fdr.pusher. 
# \u4E00\u822C\u6570\u636E\u66F4\u65B0\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 -pusher.native.kafka.topic.normal_updated=fdr.pusher.normal_updated +pusher.kafka.native.topic.normal_updated=fdr.pusher.normal_updated # \u4E00\u822C\u6570\u636E\u8BB0\u5F55\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 -pusher.native.kafka.topic.normal_recorded=fdr.pusher.normal_recorded +pusher.kafka.native.topic.normal_recorded=fdr.pusher.normal_recorded # \u88AB\u8FC7\u6EE4\u6570\u636E\u66F4\u65B0\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 -pusher.native.kafka.topic.filtered_updated=fdr.pusher.filtered_updated +pusher.kafka.native.topic.filtered_updated=fdr.pusher.filtered_updated # \u88AB\u8FC7\u6EE4\u6570\u636E\u8BB0\u5F55\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 -pusher.native.kafka.topic.filtered_recorded=fdr.pusher.filtered_recorded +pusher.kafka.native.topic.filtered_recorded=fdr.pusher.filtered_recorded # \u88AB\u89E6\u53D1\u6570\u636E\u66F4\u65B0\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 -pusher.native.kafka.topic.triggered_updated=fdr.pusher.triggered_updated +pusher.kafka.native.topic.triggered_updated=fdr.pusher.triggered_updated # \u88AB\u89E6\u53D1\u6570\u636E\u8BB0\u5F55\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 -pusher.native.kafka.topic.triggered_recorded=fdr.pusher.triggered_recorded +pusher.kafka.native.topic.triggered_recorded=fdr.pusher.triggered_recorded # \u8BB0\u5F55\u529F\u80FD\u91CD\u7F6E\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 -pusher.native.kafka.topic.record_reset=fdr.pusher.record_reset +pusher.kafka.native.topic.record_reset=fdr.pusher.record_reset # \u6620\u5C04\u529F\u80FD\u91CD\u7F6E\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 -pusher.native.kafka.topic.map_reset=fdr.pusher.map_reset +pusher.kafka.native.topic.map_reset=fdr.pusher.map_reset +# \u8BB0\u5F55\u5931\u8D25\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 
+pusher.kafka.native.topic.record_Failed=fdr.pusher.record_Failed +# \u4E00\u822C\u6570\u636E\u66F4\u65B0\u5931\u8D25\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +pusher.kafka.native.topic.keep_normal_Failed=fdr.pusher.keep_normal_Failed +# \u4E00\u822C\u6570\u636E\u8BB0\u5F55\u5931\u8D25\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +pusher.kafka.native.topic.persist_normal_Failed=fdr.pusher.persist_normal_Failed +# \u88AB\u8FC7\u6EE4\u6570\u636E\u66F4\u65B0\u5931\u8D25\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +pusher.kafka.native.topic.keep_filtered_Failed=fdr.pusher.keep_filtered_Failed +# \u88AB\u8FC7\u6EE4\u6570\u636E\u8BB0\u5F55\u5931\u8D25\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +pusher.kafka.native.topic.persist_filtered_Failed=fdr.pusher.persist_filtered_Failed +# \u88AB\u89E6\u53D1\u6570\u636E\u66F4\u65B0\u5931\u8D25\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +pusher.kafka.native.topic.keep_triggered_Failed=fdr.pusher.keep_triggered_Failed +# \u88AB\u89E6\u53D1\u6570\u636E\u8BB0\u5F55\u5931\u8D25\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +pusher.kafka.native.topic.persist_triggered_Failed=fdr.pusher.persist_triggered_Failed +# \u903B\u8F91\u4FA7\u5904\u7406\u5668\u7F13\u51B2\u6570\u636E\u79EF\u538B\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +pusher.kafka.native.topic.record_buffer_overstock=fdr.pusher.record_buffer_overstock +# \u8BB0\u5F55\u4FA7\u5904\u7406\u5668\u7F13\u51B2\u6570\u636E\u79EF\u538B\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898(\u4E00\u822C\u6570\u636E\u66F4\u65B0)\u3002 +pusher.kafka.native.topic.keepNormal_consume_buffer_overstock=fdr.pusher.keepNormal_consume_buffer_overstock +# \u8BB0\u5F55\u4FA7\u5904\u7406\u5668\u7F13\u51B2\u6570\u636E\u79EF\u538B\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898(\u88AB\u8FC7\u6EE4\u6570\u636E\u66F4\u65B0)\u3002 
+pusher.kafka.native.topic.keepFiltered_consume_buffer_overstock=fdr.pusher.keepFiltered_consume_buffer_overstock +# \u8BB0\u5F55\u4FA7\u5904\u7406\u5668\u7F13\u51B2\u6570\u636E\u79EF\u538B\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898(\u88AB\u89E6\u53D1\u6570\u636E\u66F4\u65B0)\u3002 +pusher.kafka.native.topic.keepTriggered_consume_buffer_overstock=fdr.pusher.keepTriggered_consume_buffer_overstock +# \u8BB0\u5F55\u4FA7\u5904\u7406\u5668\u7F13\u51B2\u6570\u636E\u79EF\u538B\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898(\u4E00\u822C\u6570\u636E\u8BB0\u5F55)\u3002 +pusher.kafka.native.topic.persistNormal_consume_buffer_overstock=fdr.pusher.persistNormal_consume_buffer_overstock +# \u8BB0\u5F55\u4FA7\u5904\u7406\u5668\u7F13\u51B2\u6570\u636E\u79EF\u538B\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898(\u88AB\u8FC7\u6EE4\u6570\u636E\u8BB0\u5F55)\u3002 +pusher.kafka.native.topic.persistFiltered_consume_buffer_overstock=fdr.pusher.persistFiltered_consume_buffer_overstock +# \u8BB0\u5F55\u4FA7\u5904\u7406\u5668\u7F13\u51B2\u6570\u636E\u79EF\u538B\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898(\u88AB\u89E6\u53D1\u6570\u636E\u8BB0\u5F55)\u3002 +pusher.kafka.native.topic.persistTriggered_consume_buffer_overstock=fdr.pusher.persistTriggered_consume_buffer_overstock # ################################################### # log # diff --git a/fdr-stack/src/main/java/com/dwarfeng/fdr/stack/handler/PushHandler.java b/fdr-stack/src/main/java/com/dwarfeng/fdr/stack/handler/PushHandler.java index 62b56d2618b10932f515a344a88ee2fc711a3298..b629c11c67ba6b2323f464783ac3aa6acca61ff4 100644 --- a/fdr-stack/src/main/java/com/dwarfeng/fdr/stack/handler/PushHandler.java +++ b/fdr-stack/src/main/java/com/dwarfeng/fdr/stack/handler/PushHandler.java @@ -2,6 +2,7 @@ package com.dwarfeng.fdr.stack.handler; import com.dwarfeng.fdr.stack.bean.dto.FilteredData; import com.dwarfeng.fdr.stack.bean.dto.NormalData; +import com.dwarfeng.fdr.stack.bean.dto.RecordInfo; import 
com.dwarfeng.fdr.stack.bean.dto.TriggeredData; import com.dwarfeng.subgrade.stack.exception.HandlerException; import com.dwarfeng.subgrade.stack.handler.Handler; @@ -19,98 +20,98 @@ public interface PushHandler extends Handler { /** * 一般数据更新时执行的广播操作。 * - * @param normalRecord 一般数据记录。 + * @param normalData 一般数据记录。 * @throws HandlerException 处理器异常。 */ - void normalUpdated(NormalData normalRecord) throws HandlerException; + void normalUpdated(NormalData normalData) throws HandlerException; /** * 一般数据更新时执行的广播操作。 * - * @param normalRecords 一般数据记录组成的列表。 + * @param normalDatas 一般数据记录组成的列表。 * @throws HandlerException 处理器异常。 */ - void normalUpdated(List normalRecords) throws HandlerException; + void normalUpdated(List normalDatas) throws HandlerException; /** * 一般数据记录时执行的广播操作。 * - * @param normalRecord 一般数据记录。 + * @param normalData 一般数据记录。 * @throws HandlerException 处理器异常。 */ - void normalRecorded(NormalData normalRecord) throws HandlerException; + void normalRecorded(NormalData normalData) throws HandlerException; /** * 一般数据记录时执行的广播操作。 * - * @param normalRecords 一般数据记录组成的列表。 + * @param normalDatas 一般数据记录组成的列表。 * @throws HandlerException 处理器异常。 */ - void normalRecorded(List normalRecords) throws HandlerException; + void normalRecorded(List normalDatas) throws HandlerException; /** * 被过滤数据更新时执行的广播操作。 * - * @param filteredRecord 被过滤数据记录。 + * @param filteredData 被过滤数据记录。 * @throws HandlerException 处理器异常。 */ - void filteredUpdated(FilteredData filteredRecord) throws HandlerException; + void filteredUpdated(FilteredData filteredData) throws HandlerException; /** * 被过滤数据更新时执行的广播操作。 * - * @param filteredRecords 被过滤数据记录组成的列表。 + * @param filteredDatas 被过滤数据记录组成的列表。 * @throws HandlerException 处理器异常。 */ - void filteredUpdated(List filteredRecords) throws HandlerException; + void filteredUpdated(List filteredDatas) throws HandlerException; /** * 被过滤数据记录时执行的广播操作。 * - * @param filteredRecord 被过滤数据记录。 + * @param filteredData 被过滤数据记录。 * @throws HandlerException 处理器异常。 */ - void 
filteredRecorded(FilteredData filteredRecord) throws HandlerException; + void filteredRecorded(FilteredData filteredData) throws HandlerException; /** * 被过滤数据记录时执行的广播操作。 * - * @param filteredRecords 被过滤数据记录组成的列表。 + * @param filteredDatas 被过滤数据记录组成的列表。 * @throws HandlerException 处理器异常。 */ - void filteredRecorded(List filteredRecords) throws HandlerException; + void filteredRecorded(List filteredDatas) throws HandlerException; /** * 被触发数据更新时执行的广播操作。 * - * @param triggeredRecord 被触发数据记录。 + * @param triggeredData 被触发数据记录。 * @throws HandlerException 处理器异常。 */ - void triggeredUpdated(TriggeredData triggeredRecord) throws HandlerException; + void triggeredUpdated(TriggeredData triggeredData) throws HandlerException; /** * 被触发数据更新时执行的广播操作。 * - * @param triggeredRecords 被触发数据记录组成的列表。 + * @param triggeredDatas 被触发数据记录组成的列表。 * @throws HandlerException 处理器异常。 */ - void triggeredUpdated(List triggeredRecords) throws HandlerException; + void triggeredUpdated(List triggeredDatas) throws HandlerException; /** * 被触发数据记录时执行的广播操作。 * - * @param triggeredRecord 被触发数据记录。 + * @param triggeredData 被触发数据记录。 * @throws HandlerException 处理器异常。 */ - void triggeredRecorded(TriggeredData triggeredRecord) throws HandlerException; + void triggeredRecorded(TriggeredData triggeredData) throws HandlerException; /** * 被触发数据记录时执行的广播操作。 * - * @param triggeredRecords 被触发数据记录组成的列表。 + * @param triggeredDatas 被触发数据记录组成的列表。 * @throws HandlerException 处理器异常。 */ - void triggeredRecorded(List triggeredRecords) throws HandlerException; + void triggeredRecorded(List triggeredDatas) throws HandlerException; /** * 记录功能重置时执行的广播操作。 @@ -125,4 +126,156 @@ public interface PushHandler extends Handler { * @throws HandlerException 处理器异常。 */ void mapReset() throws HandlerException; + + /** + * 处理失败的广播操作. 
+ * + * @throws HandlerException 处理器异常。 + */ + void recordFailed(RecordInfo recordInfo) throws HandlerException; + + /** + * 一般数据更新失败的广播操作。 + * + * @param normalData 一般数据记录。 + * @throws HandlerException 处理器异常。 + */ + void keepNormalFailed(NormalData normalData) throws HandlerException; + + /** + * 一般数据更新失败的广播操作。 + * + * @param normalDatas 一般数据记录组成的列表。 + * @throws HandlerException 处理器异常。 + */ + void keepNormalFailed(List normalDatas) throws HandlerException; + + /** + * 一般数据记录失败的广播操作。 + * + * @param normalData 一般数据记录。 + * @throws HandlerException 处理器异常。 + */ + void persistNormalFailed(NormalData normalData) throws HandlerException; + + /** + * 一般数据记录失败的广播操作。 + * + * @param normalDatas 一般数据记录组成的列表。 + * @throws HandlerException 处理器异常。 + */ + void persistNormalFailed(List normalDatas) throws HandlerException; + + /** + * 被过滤数据更新失败的广播操作。 + * + * @param filteredData 被过滤数据记录。 + * @throws HandlerException 处理器异常。 + */ + void keepFilteredFailed(FilteredData filteredData) throws HandlerException; + + /** + * 被过滤数据更新失败的广播操作。 + * + * @param filteredDatas 被过滤数据记录组成的列表。 + * @throws HandlerException 处理器异常。 + */ + void keepFilteredFailed(List filteredDatas) throws HandlerException; + + /** + * 被过滤数据记录失败的广播操作。 + * + * @param filteredData 被过滤数据记录。 + * @throws HandlerException 处理器异常。 + */ + void persistFilteredFailed(FilteredData filteredData) throws HandlerException; + + /** + * 被过滤数据记录失败的广播操作。 + * + * @param filteredDatas 被过滤数据记录组成的列表。 + * @throws HandlerException 处理器异常。 + */ + void persistFilteredFailed(List filteredDatas) throws HandlerException; + + /** + * 被触发数据更新失败的广播操作。 + * + * @param triggeredData 被触发数据记录。 + * @throws HandlerException 处理器异常。 + */ + void keepTriggeredFailed(TriggeredData triggeredData) throws HandlerException; + + /** + * 被触发数据更新失败的广播操作。 + * + * @param triggeredDatas 被触发数据记录组成的列表。 + * @throws HandlerException 处理器异常。 + */ + void keepTriggeredFailed(List triggeredDatas) throws HandlerException; + + /** + * 被触发数据记录失败的广播操作。 + * + * @param triggeredData 被触发数据记录。 + * @throws 
HandlerException 处理器异常。 + */ + void persistTriggeredFailed(TriggeredData triggeredData) throws HandlerException; + + /** + * 被触发数据记录失败的广播操作。 + * + * @param triggeredDatas 一般数据记录组成的列表。 + * @throws HandlerException 处理器异常。 + */ + void persistTriggeredFailed(List triggeredDatas) throws HandlerException; + + /** + * 逻辑侧处理器缓冲数据积压的广播操作。 + * + * @throws HandlerException 处理器异常。 + */ + void recordBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException; + + /** + * 一般数据更新时记录侧处理器缓冲数据积压的广播操作。 + * + * @throws HandlerException 处理器异常。 + */ + void keepNormalConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException; + + /** + * 被过滤数据更新时记录侧处理器缓冲数据积压的广播操作。 + * + * @throws HandlerException 处理器异常。 + */ + void keepFilteredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException; + + /** + * 被触发数据更新时记录侧处理器缓冲数据积压的广播操作。 + * + * @throws HandlerException 处理器异常。 + */ + void keepTriggeredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException; + + /** + * 一般数据记录时记录侧处理器缓冲数据积压的广播操作。 + * + * @throws HandlerException 处理器异常。 + */ + void persistNormalConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException; + + /** + * 被过滤数据记录时记录侧处理器缓冲数据积压的广播操作。 + * + * @throws HandlerException 处理器异常。 + */ + void persistFilteredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException; + + /** + * 被触发数据记录时记录侧处理器缓冲数据积压的广播操作。 + * + * @throws HandlerException 处理器异常。 + */ + void persistTriggeredConsumeBufferOverstock(double bufferSize, double allBufferSize, double ratio, double warnThreshold) throws HandlerException; }