From 67e087f47b3131a39415139598459f32c7e445cf Mon Sep 17 00:00:00 2001 From: DwArFeng <915724865@qq.com> Date: Mon, 19 Jun 2023 17:35:00 +0800 Subject: [PATCH 1/2] =?UTF-8?q?Wiki=20=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 优化 Contents.md 中的内容 Signed-off-by: DwArFeng <915724865@qq.com> --- CHANGELOG.md | 3 ++- docs/wiki/zh_CN/Contents.md | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6eb1b579..635fd290 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,8 @@ #### 功能构建 -- (无) +- Wiki 更新。 + - 优化 `Contents.md` 中的内容。 #### Bug修复 diff --git a/docs/wiki/zh_CN/Contents.md b/docs/wiki/zh_CN/Contents.md index cec790e5..c97b1108 100644 --- a/docs/wiki/zh_CN/Contents.md +++ b/docs/wiki/zh_CN/Contents.md @@ -43,7 +43,6 @@ - [Source](./Source.md) - 数据源,详细说明了本项目的数据源机制。 - [PresetSourceImplements.md](./PresetSourceImplements.md) - 预设数据源实现,详细说明了本项目内置的所有数据源。 - - [Washer](./Washer.md) - 清洗器,详细说明了本项目的清洗器机制。 ## 维护与调试 -- Gitee From b7f5be1245c0966cb427919e9d39eb4e431d26b5 Mon Sep 17 00:00:00 2001 From: liuwy <1421132346@qq.com> Date: Tue, 20 Jun 2023 15:03:25 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=A1=A5=E6=8E=A5=E5=99=A8=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NativeKafkaBridge DwarfengDctKafkaBridge --- .../DwarfengDctKafkaBridge.java | 85 ++++++++++++ ...rfengDctKafkaBridgeFilteredDataKeeper.java | 64 +++++++++ ...ngDctKafkaBridgeFilteredDataPersister.java | 64 +++++++++ .../DwarfengDctKafkaBridgeKeeper.java | 30 +++++ ...warfengDctKafkaBridgeNormalDataKeeper.java | 60 +++++++++ ...fengDctKafkaBridgeNormalDataPersister.java | 60 +++++++++ .../DwarfengDctKafkaBridgePersister.java | 30 +++++ ...fengDctKafkaBridgeTriggeredDataKeeper.java | 64 +++++++++ ...gDctKafkaBridgeTriggeredDataPersister.java | 64 +++++++++ ...engDctKafkaBridgeFastJsonFilteredData.java | 124 +++++++++++++++++ ...rfengDctKafkaBridgeFastJsonNormalData.java | 94 +++++++++++++ ...ngDctKafkaBridgeFastJsonTriggeredData.java | 126 ++++++++++++++++++ .../DwarfengDctKafkaBridgeFilteredData.java | 88 ++++++++++++ .../DwarfengDctKafkaBridgeNormalData.java | 67 ++++++++++ .../DwarfengDctKafkaBridgeTriggeredData.java | 90 +++++++++++++ .../bridge/nativeKafka/NativeKafkaBridge.java | 86 ++++++++++++ .../NativeKafkaBridgeFilteredDataKeeper.java | 63 +++++++++ ...ativeKafkaBridgeFilteredDataPersister.java | 63 +++++++++ .../nativeKafka/NativeKafkaBridgeKeeper.java | 25 ++++ .../NativeKafkaBridgeNormalDataKeeper.java | 59 ++++++++ .../NativeKafkaBridgeNormalDataPersister.java | 59 ++++++++ .../NativeKafkaBridgePersister.java | 25 ++++ .../NativeKafkaBridgeTriggeredDataKeeper.java | 63 +++++++++ ...tiveKafkaBridgeTriggeredDataPersister.java | 63 +++++++++ ...NativeKafkaBridgeFastJsonFilteredData.java | 124 +++++++++++++++++ .../NativeKafkaBridgeFastJsonNormalData.java | 94 +++++++++++++ ...ativeKafkaBridgeFastJsonTriggeredData.java | 126 ++++++++++++++++++ .../bean/NativeKafkaBridgeFilteredData.java | 90 +++++++++++++ .../bean/NativeKafkaBridgeNormalData.java | 70 ++++++++++ .../bean/NativeKafkaBridgeTriggeredData.java | 94 +++++++++++++ .../src/main/resources/fdr/bridge.properties | 58 ++++++++ 31 files changed, 2272 insertions(+) create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridge.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeFilteredDataKeeper.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeFilteredDataPersister.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeKeeper.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeNormalDataKeeper.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeNormalDataPersister.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgePersister.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeTriggeredDataKeeper.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeTriggeredDataPersister.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeFastJsonFilteredData.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeFastJsonNormalData.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeFastJsonTriggeredData.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeFilteredData.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeNormalData.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeTriggeredData.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridge.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeFilteredDataKeeper.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeFilteredDataPersister.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeKeeper.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeNormalDataKeeper.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeNormalDataPersister.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgePersister.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeTriggeredDataKeeper.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeTriggeredDataPersister.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeFastJsonFilteredData.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeFastJsonNormalData.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeFastJsonTriggeredData.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeFilteredData.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeNormalData.java create mode 100644 fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeTriggeredData.java diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridge.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridge.java new file mode 100644 index 00000000..a1f07d38 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridge.java @@ -0,0 +1,85 @@ +package com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka; + +import com.dwarfeng.fdr.impl.handler.bridge.FullBridge; +import com.dwarfeng.fdr.stack.bean.dto.FilteredData; +import com.dwarfeng.fdr.stack.bean.dto.NormalData; +import com.dwarfeng.fdr.stack.bean.dto.TriggeredData; +import com.dwarfeng.subgrade.stack.exception.HandlerException; + +/** + * DwarfengDctKafka 桥接器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class DwarfengDctKafkaBridge extends FullBridge { + + public static final String BRIDGE_TYPE = "kafka.dwarfengDct"; + + private final DwarfengDctKafkaBridgeNormalDataKeeper normalDataKeeper; + private final DwarfengDctKafkaBridgeFilteredDataKeeper filteredDataKeeper; + private final DwarfengDctKafkaBridgeTriggeredDataKeeper triggeredDataKeeper; + private final DwarfengDctKafkaBridgeNormalDataPersister normalDataPersister; + private final DwarfengDctKafkaBridgeFilteredDataPersister filteredDataPersister; + private final DwarfengDctKafkaBridgeTriggeredDataPersister triggeredDataPersister; + + public DwarfengDctKafkaBridge( + DwarfengDctKafkaBridgeNormalDataKeeper normalDataKeeper, + DwarfengDctKafkaBridgeFilteredDataKeeper filteredDataKeeper, + DwarfengDctKafkaBridgeTriggeredDataKeeper triggeredDataKeeper, + DwarfengDctKafkaBridgeNormalDataPersister normalDataPersister, + DwarfengDctKafkaBridgeFilteredDataPersister filteredDataPersister, + DwarfengDctKafkaBridgeTriggeredDataPersister triggeredDataPersister + ) { + super(BRIDGE_TYPE); + this.normalDataKeeper = normalDataKeeper; + this.filteredDataKeeper = filteredDataKeeper; + this.triggeredDataKeeper = triggeredDataKeeper; + this.normalDataPersister = normalDataPersister; + this.filteredDataPersister = filteredDataPersister; + this.triggeredDataPersister = triggeredDataPersister; + } + + @Override + public Keeper getNormalDataKeeper() throws HandlerException { + return normalDataKeeper; + } + + @Override + public Keeper getFilteredDataKeeper() throws HandlerException { + return filteredDataKeeper; + } + + @Override + public Keeper getTriggeredDataKeeper() throws HandlerException { + return triggeredDataKeeper; + } + + @Override + public Persister getNormalDataPersister() throws HandlerException { + return normalDataPersister; + } + + @Override + public Persister getFilteredDataPersister() throws HandlerException { + return filteredDataPersister; + } + + @Override + public Persister getTriggeredDataPersister() throws HandlerException { + return triggeredDataPersister; + } + + @Override + public String toString() { + return "DwarfengDctKafkaBridge{" + + "normalDataKeeper=" + normalDataKeeper + + ", filteredDataKeeper=" + filteredDataKeeper + + ", triggeredDataKeeper=" + triggeredDataKeeper + + ", normalDataPersister=" + normalDataPersister + + ", filteredDataPersister=" + filteredDataPersister + + ", triggeredDataPersister=" + triggeredDataPersister + + ", bridgeType='" + bridgeType + '\'' + + '}'; + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeFilteredDataKeeper.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeFilteredDataKeeper.java new file mode 100644 index 00000000..3b4336f6 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeFilteredDataKeeper.java @@ -0,0 +1,64 @@ +package com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.dwarfeng.dct.handler.ValueCodingHandler; +import com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean.DwarfengDctKafkaBridgeFastJsonFilteredData; +import com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean.DwarfengDctKafkaBridgeFilteredData; +import com.dwarfeng.fdr.stack.bean.dto.FilteredData; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.List; + +/** + * DwarfengDctKafka 桥接器被过滤数据保持器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class DwarfengDctKafkaBridgeFilteredDataKeeper extends DwarfengDctKafkaBridgeKeeper { + + @Value("${bridge.kafka.dwarfengDct.topic.filtered_updated}") + private String filteredUpdatedTopic; + + public DwarfengDctKafkaBridgeFilteredDataKeeper(KafkaTemplate kafkaTemplate, ValueCodingHandler valueCodingHandler) { + super(kafkaTemplate, valueCodingHandler); + } + + @Override + protected void doUpdate(FilteredData data) throws Exception { + DwarfengDctKafkaBridgeFilteredData filteredData = transform(data); + String message = JSON.toJSONString(DwarfengDctKafkaBridgeFastJsonFilteredData.of(filteredData), SerializerFeature.WriteClassName); + kafkaTemplate.send(filteredUpdatedTopic, message); + } + + @Override + protected void doUpdate(List datas) throws Exception { + for (FilteredData data : datas) { + doUpdate(data); + } + } + + @Override + protected DwarfengDctKafkaBridgeFilteredData transform(FilteredData data) throws Exception { + return new DwarfengDctKafkaBridgeFilteredData( + data.getPointKey(), + data.getFilterKey(), + valueCodingHandler.encode(data.getValue()), + data.getMessage(), + data.getHappenedDate() + ); + } + + @Override + protected FilteredData of(DwarfengDctKafkaBridgeFilteredData data) throws Exception { + return new FilteredData( + data.getPointKey(), + data.getFilterKey(), + valueCodingHandler.decode(data.getValue()), + data.getMessage(), + data.getHappenedDate() + ); + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeFilteredDataPersister.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeFilteredDataPersister.java new file mode 100644 index 00000000..50fbc20a --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeFilteredDataPersister.java @@ -0,0 +1,64 @@ +package com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.dwarfeng.dct.handler.ValueCodingHandler; +import com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean.DwarfengDctKafkaBridgeFastJsonFilteredData; +import com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean.DwarfengDctKafkaBridgeFilteredData; +import com.dwarfeng.fdr.stack.bean.dto.FilteredData; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.List; + +/** + * DwarfengDctKafka 桥接器被过滤数据持久器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class DwarfengDctKafkaBridgeFilteredDataPersister extends DwarfengDctKafkaBridgePersister { + + @Value("${bridge.kafka.dwarfengDct.topic.filtered_recorded}") + private String filteredRecordedTopic; + + public DwarfengDctKafkaBridgeFilteredDataPersister(KafkaTemplate kafkaTemplate, ValueCodingHandler valueCodingHandler) { + super(kafkaTemplate, valueCodingHandler); + } + + @Override + protected void doRecord(FilteredData data) throws Exception { + DwarfengDctKafkaBridgeFilteredData filteredData = transform(data); + String message = JSON.toJSONString(DwarfengDctKafkaBridgeFastJsonFilteredData.of(filteredData), SerializerFeature.WriteClassName); + kafkaTemplate.send(filteredRecordedTopic, message); + } + + @Override + protected void doRecord(List datas) throws Exception { + for (FilteredData data : datas) { + doRecord(data); + } + } + + @Override + protected DwarfengDctKafkaBridgeFilteredData transform(FilteredData data) throws Exception { + return new DwarfengDctKafkaBridgeFilteredData( + data.getPointKey(), + data.getFilterKey(), + valueCodingHandler.encode(data.getValue()), + data.getMessage(), + data.getHappenedDate() + ); + } + + @Override + protected FilteredData of(DwarfengDctKafkaBridgeFilteredData data) throws Exception { + return new FilteredData( + data.getPointKey(), + data.getFilterKey(), + valueCodingHandler.decode(data.getValue()), + data.getMessage(), + data.getHappenedDate() + ); + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeKeeper.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeKeeper.java new file mode 100644 index 00000000..172721c9 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeKeeper.java @@ -0,0 +1,30 @@ +package com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka; + +import com.dwarfeng.dct.handler.ValueCodingHandler; +import com.dwarfeng.fdr.impl.handler.bridge.WriteOnlyKeeper; +import com.dwarfeng.fdr.stack.struct.Data; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.kafka.core.KafkaTemplate; + +/** + * DwarfengDctKafka 桥接器保持器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public abstract class DwarfengDctKafkaBridgeKeeper extends WriteOnlyKeeper { + + protected KafkaTemplate kafkaTemplate; + + protected ValueCodingHandler valueCodingHandler; + + public DwarfengDctKafkaBridgeKeeper(@Qualifier("nativeKafkaPusher.kafkaTemplate") KafkaTemplate kafkaTemplate, + ValueCodingHandler valueCodingHandler) { + this.kafkaTemplate = kafkaTemplate; + this.valueCodingHandler = valueCodingHandler; + } + + protected abstract T transform(D data) throws Exception; + + protected abstract D of(T data) throws Exception; +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeNormalDataKeeper.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeNormalDataKeeper.java new file mode 100644 index 00000000..24619ac4 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeNormalDataKeeper.java @@ -0,0 +1,60 @@ +package com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.dwarfeng.dct.handler.ValueCodingHandler; +import com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean.DwarfengDctKafkaBridgeFastJsonNormalData; +import com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean.DwarfengDctKafkaBridgeNormalData; +import com.dwarfeng.fdr.stack.bean.dto.NormalData; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.List; + +/** + * DwarfengDctKafka 桥接器一般数据保持器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class DwarfengDctKafkaBridgeNormalDataKeeper extends DwarfengDctKafkaBridgeKeeper { + + @Value("${bridge.kafka.dwarfengDct.topic.normal_updated}") + private String normalUpdatedTopic; + + public DwarfengDctKafkaBridgeNormalDataKeeper(KafkaTemplate kafkaTemplate, ValueCodingHandler valueCodingHandler) { + super(kafkaTemplate, valueCodingHandler); + } + + @Override + protected void doUpdate(NormalData data) throws Exception { + DwarfengDctKafkaBridgeNormalData normalData = transform(data); + String message = JSON.toJSONString(DwarfengDctKafkaBridgeFastJsonNormalData.of(normalData), SerializerFeature.WriteClassName); + kafkaTemplate.send(normalUpdatedTopic, message); + } + + @Override + protected void doUpdate(List datas) throws Exception { + for (NormalData data : datas) { + doUpdate(data); + } + } + + @Override + protected DwarfengDctKafkaBridgeNormalData transform(NormalData data) throws Exception { + return new DwarfengDctKafkaBridgeNormalData( + data.getPointKey(), + valueCodingHandler.encode(data.getValue()), + data.getHappenedDate() + ); + } + + @Override + protected NormalData of(DwarfengDctKafkaBridgeNormalData data) throws Exception { + return new NormalData( + data.getPointKey(), + valueCodingHandler.decode(data.getValue()), + data.getHappenedDate() + ); + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeNormalDataPersister.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeNormalDataPersister.java new file mode 100644 index 00000000..5421f761 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeNormalDataPersister.java @@ -0,0 +1,60 @@ +package com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.dwarfeng.dct.handler.ValueCodingHandler; +import com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean.DwarfengDctKafkaBridgeFastJsonNormalData; +import com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean.DwarfengDctKafkaBridgeNormalData; +import com.dwarfeng.fdr.stack.bean.dto.NormalData; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.List; + +/** + * DwarfengDctKafka 桥接器一般数据持久器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class DwarfengDctKafkaBridgeNormalDataPersister extends DwarfengDctKafkaBridgePersister { + + @Value("${bridge.kafka.dwarfengDct.topic.normal_recorded}") + private String normalRecordedTopic; + + public DwarfengDctKafkaBridgeNormalDataPersister(KafkaTemplate kafkaTemplate, ValueCodingHandler valueCodingHandler) { + super(kafkaTemplate, valueCodingHandler); + } + + @Override + protected void doRecord(NormalData data) throws Exception { + DwarfengDctKafkaBridgeNormalData normalData = transform(data); + String message = JSON.toJSONString(DwarfengDctKafkaBridgeFastJsonNormalData.of(normalData), SerializerFeature.WriteClassName); + kafkaTemplate.send(normalRecordedTopic, message); + } + + @Override + protected void doRecord(List datas) throws Exception { + for (NormalData data : datas) { + doRecord(data); + } + } + + @Override + protected DwarfengDctKafkaBridgeNormalData transform(NormalData data) throws Exception { + return new DwarfengDctKafkaBridgeNormalData( + data.getPointKey(), + valueCodingHandler.encode(data.getValue()), + data.getHappenedDate() + ); + } + + @Override + protected NormalData of(DwarfengDctKafkaBridgeNormalData data) throws Exception { + return new NormalData( + data.getPointKey(), + valueCodingHandler.decode(data.getValue()), + data.getHappenedDate() + ); + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgePersister.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgePersister.java new file mode 100644 index 00000000..45abb250 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgePersister.java @@ -0,0 +1,30 @@ +package com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka; + +import com.dwarfeng.dct.handler.ValueCodingHandler; +import com.dwarfeng.fdr.impl.handler.bridge.WriteOnlyPersister; +import com.dwarfeng.fdr.stack.struct.Data; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.kafka.core.KafkaTemplate; + +/** + * DwarfengDctKafka 桥接器持久器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public abstract class DwarfengDctKafkaBridgePersister extends WriteOnlyPersister { + + protected KafkaTemplate kafkaTemplate; + + protected ValueCodingHandler valueCodingHandler; + + public DwarfengDctKafkaBridgePersister(@Qualifier("nativeKafkaPusher.kafkaTemplate") KafkaTemplate kafkaTemplate, + ValueCodingHandler valueCodingHandler) { + this.kafkaTemplate = kafkaTemplate; + this.valueCodingHandler = valueCodingHandler; + } + + protected abstract T transform(D data) throws Exception; + + protected abstract D of(T data) throws Exception; +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeTriggeredDataKeeper.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeTriggeredDataKeeper.java new file mode 100644 index 00000000..ff547532 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeTriggeredDataKeeper.java @@ -0,0 +1,64 @@ +package com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.dwarfeng.dct.handler.ValueCodingHandler; +import com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean.DwarfengDctKafkaBridgeFastJsonTriggeredData; +import com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean.DwarfengDctKafkaBridgeTriggeredData; +import com.dwarfeng.fdr.stack.bean.dto.TriggeredData; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.List; + +/** + * DwarfengDctKafka 桥接器被触发数据保持器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class DwarfengDctKafkaBridgeTriggeredDataKeeper extends DwarfengDctKafkaBridgeKeeper { + + @Value("${bridge.kafka.dwarfengDct.topic.triggered_updated}") + private String triggeredUpdatedTopic; + + public DwarfengDctKafkaBridgeTriggeredDataKeeper(KafkaTemplate kafkaTemplate, ValueCodingHandler valueCodingHandler) { + super(kafkaTemplate, valueCodingHandler); + } + + @Override + protected void doUpdate(TriggeredData data) throws Exception { + DwarfengDctKafkaBridgeTriggeredData triggeredData = transform(data); + String message = JSON.toJSONString(DwarfengDctKafkaBridgeFastJsonTriggeredData.of(triggeredData), SerializerFeature.WriteClassName); + kafkaTemplate.send(triggeredUpdatedTopic, message); + } + + @Override + protected void doUpdate(List datas) throws Exception { + for (TriggeredData data : datas) { + doUpdate(data); + } + } + + @Override + protected DwarfengDctKafkaBridgeTriggeredData transform(TriggeredData data) throws Exception { + return new DwarfengDctKafkaBridgeTriggeredData( + data.getPointKey(), + data.getTriggerKey(), + valueCodingHandler.encode(data.getValue()), + data.getMessage(), + data.getHappenedDate() + ); + } + + @Override + protected TriggeredData of(DwarfengDctKafkaBridgeTriggeredData data) throws Exception { + return new TriggeredData( + data.getPointKey(), + data.getTriggerKey(), + valueCodingHandler.decode(data.getValue()), + data.getMessage(), + data.getHappenedDate() + ); + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeTriggeredDataPersister.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeTriggeredDataPersister.java new file mode 100644 index 00000000..daf91223 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/DwarfengDctKafkaBridgeTriggeredDataPersister.java @@ -0,0 +1,64 @@ +package com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.dwarfeng.dct.handler.ValueCodingHandler; +import com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean.DwarfengDctKafkaBridgeFastJsonTriggeredData; +import com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean.DwarfengDctKafkaBridgeTriggeredData; +import com.dwarfeng.fdr.stack.bean.dto.TriggeredData; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.List; + +/** + * DwarfengDctKafka 桥接器被触发数据持久器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class DwarfengDctKafkaBridgeTriggeredDataPersister extends DwarfengDctKafkaBridgePersister { + + @Value("${bridge.kafka.dwarfengDct.topic.triggered_recorded}") + private String triggeredRecordedTopic; + + public DwarfengDctKafkaBridgeTriggeredDataPersister(KafkaTemplate kafkaTemplate, ValueCodingHandler valueCodingHandler) { + super(kafkaTemplate, valueCodingHandler); + } + + @Override + protected void doRecord(TriggeredData data) throws Exception { + DwarfengDctKafkaBridgeTriggeredData triggeredData = transform(data); + String message = JSON.toJSONString(DwarfengDctKafkaBridgeFastJsonTriggeredData.of(triggeredData), SerializerFeature.WriteClassName); + kafkaTemplate.send(triggeredRecordedTopic, message); + } + + @Override + protected void doRecord(List datas) throws Exception { + for (TriggeredData data: datas) { + doRecord(data); + } + } + + @Override + protected DwarfengDctKafkaBridgeTriggeredData transform(TriggeredData data) throws Exception { + return new DwarfengDctKafkaBridgeTriggeredData( + data.getPointKey(), + data.getTriggerKey(), + valueCodingHandler.encode(data.getValue()), + data.getMessage(), + data.getHappenedDate() + ); + } + + @Override + protected TriggeredData of(DwarfengDctKafkaBridgeTriggeredData data) throws Exception { + return new TriggeredData( + data.getPointKey(), + data.getTriggerKey(), + valueCodingHandler.decode(data.getValue()), + data.getMessage(), + data.getHappenedDate() + ); + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeFastJsonFilteredData.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeFastJsonFilteredData.java new file mode 100644 index 00000000..48d33303 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeFastJsonFilteredData.java @@ -0,0 +1,124 @@ +package com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean; + +import com.alibaba.fastjson.annotation.JSONField; +import com.dwarfeng.subgrade.sdk.bean.key.FastJsonLongIdKey; +import com.dwarfeng.subgrade.stack.bean.dto.Dto; + +import java.util.Date; +import java.util.Objects; + +/** + * DwarfengDctKafka 桥接 FastJsonFlat 被过滤数据。 + * + * @author mooyuan + * @since 2.0.0 + */ +public class DwarfengDctKafkaBridgeFastJsonFilteredData implements Dto { + + private static final long serialVersionUID = 7711135303412788932L; + + public static DwarfengDctKafkaBridgeFastJsonFilteredData of(DwarfengDctKafkaBridgeFilteredData filteredData) { + if (Objects.isNull(filteredData)) { + return null; + } else { + return new DwarfengDctKafkaBridgeFastJsonFilteredData( + FastJsonLongIdKey.of(filteredData.getPointKey()), + FastJsonLongIdKey.of(filteredData.getFilterKey()), + filteredData.getValue(), + filteredData.getMessage(), + filteredData.getHappenedDate() + ); + } + } + + public static DwarfengDctKafkaBridgeFilteredData toStackBean(DwarfengDctKafkaBridgeFastJsonFilteredData fastFilteredData) { + if (Objects.isNull(fastFilteredData)) { + return null; + } else { + return new DwarfengDctKafkaBridgeFilteredData( + FastJsonLongIdKey.toStackBean(fastFilteredData.getPointKey()), + FastJsonLongIdKey.toStackBean(fastFilteredData.getFilterKey()), + fastFilteredData.getValue(), + fastFilteredData.getMessage(), + fastFilteredData.getHappenedDate() + ); + } + } + + @JSONField(name = "point_key", ordinal = 1) + private FastJsonLongIdKey pointKey; + + @JSONField(name = "filter_key", ordinal = 2) + private FastJsonLongIdKey filterKey; + + @JSONField(name = "value", ordinal = 3) + private String value; + + @JSONField(name = "message", ordinal = 4) + private String message; + + @JSONField(name = "happened_date", ordinal = 5) + private Date happenedDate; + + public DwarfengDctKafkaBridgeFastJsonFilteredData() { + } + + public DwarfengDctKafkaBridgeFastJsonFilteredData(FastJsonLongIdKey pointKey, FastJsonLongIdKey filterKey, String value, String message, Date happenedDate) { + this.pointKey = pointKey; + this.filterKey = filterKey; + this.value = value; + this.message = message; + this.happenedDate = happenedDate; + } + + public FastJsonLongIdKey getPointKey() { + return pointKey; + } + + public void setPointKey(FastJsonLongIdKey pointKey) { + this.pointKey = pointKey; + } + + public FastJsonLongIdKey getFilterKey() { + return filterKey; + } + + public void setFilterKey(FastJsonLongIdKey filterKey) { + this.filterKey = filterKey; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public Date getHappenedDate() { + return happenedDate; + } + + public void setHappenedDate(Date happenedDate) { + this.happenedDate = happenedDate; + } + + @Override + public String toString() { + return "NativeKafkaBridgeFastJsonFilteredData{" + + "pointKey=" + pointKey + + ", filterKey=" + filterKey + + ", value=" + value + + ", message='" + message + '\'' + + ", happenedDate=" + happenedDate + + '}'; + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeFastJsonNormalData.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeFastJsonNormalData.java new file mode 100644 index 00000000..99df81c7 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeFastJsonNormalData.java @@ -0,0 +1,94 @@ +package com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean; + +import com.alibaba.fastjson.annotation.JSONField; +import com.dwarfeng.subgrade.sdk.bean.key.FastJsonLongIdKey; +import com.dwarfeng.subgrade.stack.bean.dto.Dto; + +import java.util.Date; +import java.util.Objects; + +/** + * DwarfengDctKafka 桥接 FastJsonFlat 一般数据。 + * + * @author mooyuan + * @since 2.0.0 + */ +public class DwarfengDctKafkaBridgeFastJsonNormalData implements Dto { + + private static final long serialVersionUID = 2913355959528518235L; + + public static DwarfengDctKafkaBridgeFastJsonNormalData of(DwarfengDctKafkaBridgeNormalData normalData) { + if (Objects.isNull(normalData)) { + return null; + } else { + return new DwarfengDctKafkaBridgeFastJsonNormalData( + FastJsonLongIdKey.of(normalData.getPointKey()), + normalData.getValue(), + normalData.getHappenedDate() + ); + } + } + + public static DwarfengDctKafkaBridgeNormalData toStackBean(DwarfengDctKafkaBridgeFastJsonNormalData fastNormalData) { + if (Objects.isNull(fastNormalData)) { + return null; + } else { + return new DwarfengDctKafkaBridgeNormalData( + FastJsonLongIdKey.toStackBean(fastNormalData.getPointKey()), + fastNormalData.getValue(), + fastNormalData.getHappenedDate() + ); + } + } + + @JSONField(name = "point_key", ordinal = 1) + private FastJsonLongIdKey pointKey; + + @JSONField(name = "value", ordinal = 2) + private String value; + + @JSONField(name = "happened_date", ordinal = 3) + private Date happenedDate; + + public DwarfengDctKafkaBridgeFastJsonNormalData() { + } + + public DwarfengDctKafkaBridgeFastJsonNormalData(FastJsonLongIdKey pointKey, String value, Date happenedDate) { + this.pointKey = pointKey; + this.value = value; + this.happenedDate = happenedDate; + } + + public FastJsonLongIdKey getPointKey() { + return pointKey; + } + + public void setPointKey(FastJsonLongIdKey pointKey) { + this.pointKey = pointKey; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public Date getHappenedDate() { + return happenedDate; + } + + public void setHappenedDate(Date happenedDate) { + this.happenedDate = happenedDate; + } + + @Override + public String toString() { + return "FastJsonNormalData{" + + "pointKey=" + pointKey + + ", value=" + value + + ", happenedDate=" + happenedDate + + '}'; + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeFastJsonTriggeredData.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeFastJsonTriggeredData.java new file mode 100644 index 00000000..f0807c6f --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeFastJsonTriggeredData.java @@ -0,0 +1,126 @@ +package com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean; + +import com.alibaba.fastjson.annotation.JSONField; +import com.dwarfeng.subgrade.sdk.bean.key.FastJsonLongIdKey; +import com.dwarfeng.subgrade.stack.bean.dto.Dto; + +import java.util.Date; +import java.util.Objects; + +/** + * DwarfengDctKafka 桥接 FastJsonFlat 被触发数据。 + * + * @author mooyuan + * @since 2.0.0 + */ +public class DwarfengDctKafkaBridgeFastJsonTriggeredData implements Dto { + + private static final long serialVersionUID = -1558927133318021024L; + + public static DwarfengDctKafkaBridgeFastJsonTriggeredData of(DwarfengDctKafkaBridgeTriggeredData triggeredData) { + if (Objects.isNull(triggeredData)) { + return null; + } else { + return new DwarfengDctKafkaBridgeFastJsonTriggeredData( + FastJsonLongIdKey.of(triggeredData.getPointKey()), + FastJsonLongIdKey.of(triggeredData.getTriggerKey()), + triggeredData.getValue(), + triggeredData.getMessage(), + triggeredData.getHappenedDate() + ); + } + } + + public static DwarfengDctKafkaBridgeTriggeredData toStackBean(DwarfengDctKafkaBridgeFastJsonTriggeredData fastTriggeredData) { + if (Objects.isNull(fastTriggeredData)) { + return null; + } else { + return new DwarfengDctKafkaBridgeTriggeredData( + FastJsonLongIdKey.toStackBean(fastTriggeredData.getPointKey()), + FastJsonLongIdKey.toStackBean(fastTriggeredData.getTriggerKey()), + fastTriggeredData.getValue(), + fastTriggeredData.getMessage(), + fastTriggeredData.getHappenedDate() + ); + } + } + + @JSONField(name = "point_key", ordinal = 1) + private FastJsonLongIdKey pointKey; + + @JSONField(name = "trigger_key", ordinal = 2) + private FastJsonLongIdKey triggerKey; + + @JSONField(name = "value", ordinal = 3) + private String value; + + @JSONField(name = "message", ordinal = 4) + private String message; + + @JSONField(name = "happened_date", ordinal = 5) + private Date happenedDate; + + public DwarfengDctKafkaBridgeFastJsonTriggeredData() { + } + + public DwarfengDctKafkaBridgeFastJsonTriggeredData( + FastJsonLongIdKey pointKey, FastJsonLongIdKey triggerKey, String value, String message, Date happenedDate + ) { + this.pointKey = pointKey; + this.triggerKey = triggerKey; + this.value = value; + this.message = message; + this.happenedDate = happenedDate; + } + + public FastJsonLongIdKey getPointKey() { + return pointKey; + } + + public void setPointKey(FastJsonLongIdKey pointKey) { + this.pointKey = pointKey; + } + + public FastJsonLongIdKey getTriggerKey() { + return triggerKey; + } + + public void setTriggerKey(FastJsonLongIdKey triggerKey) { + this.triggerKey = triggerKey; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public Date getHappenedDate() { + return happenedDate; + } + + public void setHappenedDate(Date happenedDate) { + this.happenedDate = happenedDate; + } + + @Override + public String toString() { + return "FastJsonTriggeredData{" + + "pointKey=" + pointKey + + ", triggerKey=" + triggerKey + + ", value=" + value + + ", message='" + message + '\'' + + ", happenedDate=" + happenedDate + + '}'; + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeFilteredData.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeFilteredData.java new file mode 100644 index 00000000..21f5ee49 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeFilteredData.java @@ -0,0 +1,88 @@ +package com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean; + +import com.dwarfeng.fdr.stack.struct.Data; +import com.dwarfeng.subgrade.stack.bean.dto.Dto; +import com.dwarfeng.subgrade.stack.bean.key.LongIdKey; +import org.jetbrains.annotations.NotNull; + +import java.util.Date; + +/** + * DwarfengDctKafka 桥接 Flat 被过滤数据。 + * + * @author mooyuan + * @since 2.0.0 + */ +public class DwarfengDctKafkaBridgeFilteredData implements Dto, Data { + + private static final long serialVersionUID = -252359964434537797L; + private LongIdKey pointKey; + private LongIdKey filterKey; + private String value; + private String message; + private Date happenedDate; + + public DwarfengDctKafkaBridgeFilteredData() { + } + + public DwarfengDctKafkaBridgeFilteredData(LongIdKey pointKey, LongIdKey filterKey, String value, String message, Date happenedDate) { + this.pointKey = pointKey; + this.filterKey = filterKey; + this.value = value; + this.message = message; + this.happenedDate = happenedDate; + } + + @NotNull + public LongIdKey getPointKey() { + return pointKey; + } + + public void setPointKey(LongIdKey pointKey) { + this.pointKey = pointKey; + } + + public LongIdKey getFilterKey() { + return filterKey; + } + + public void setFilterKey(LongIdKey filterKey) { + this.filterKey = filterKey; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @NotNull + public Date getHappenedDate() { + return happenedDate; + } + + public void setHappenedDate(Date happenedDate) { + this.happenedDate = happenedDate; + } + + @Override + public String toString() { + return "DwarfengDctKafkaBridgeFlatFilteredData{" + + "pointKey=" + pointKey + + ", filterKey=" + filterKey + + ", value='" + value + '\'' + + ", message='" + message + '\'' + + ", happenedDate=" + happenedDate + + '}'; + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeNormalData.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeNormalData.java new file mode 100644 index 00000000..906bc7e8 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeNormalData.java @@ -0,0 +1,67 @@ +package com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean; + +import com.dwarfeng.fdr.stack.struct.Data; +import com.dwarfeng.subgrade.stack.bean.dto.Dto; +import com.dwarfeng.subgrade.stack.bean.key.LongIdKey; +import org.jetbrains.annotations.NotNull; + +import java.util.Date; + +/** + * DwarfengDctKafka 桥接 Flat 一般数据。 + * + * @author mooyuan + * @since 2.0.0 + */ +public class DwarfengDctKafkaBridgeNormalData implements Dto, Data { + + private static final long serialVersionUID = 1258361883501890956L; + + private LongIdKey pointKey; + private String value; + private Date happenedDate; + + public DwarfengDctKafkaBridgeNormalData() { + } + + public DwarfengDctKafkaBridgeNormalData(LongIdKey pointKey, String value, Date happenedDate) { + this.pointKey = pointKey; + this.value = value; + this.happenedDate = happenedDate; + } + + @NotNull + public LongIdKey getPointKey() { + return pointKey; + } + + public void setPointKey(LongIdKey pointKey) { + this.pointKey = pointKey; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + @NotNull + public Date getHappenedDate() { + return happenedDate; + } + + public void setHappenedDate(Date happenedDate) { + this.happenedDate = happenedDate; + } + + @Override + public String toString() { + return "DwarfengDctKafkaBridgeFlatNormalData{" + + "pointKey=" + pointKey + + ", value='" + value + '\'' + + ", happenedDate=" + happenedDate + + '}'; + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeTriggeredData.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeTriggeredData.java new file mode 100644 index 00000000..74977f36 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/dwarfengDctKafka/bean/DwarfengDctKafkaBridgeTriggeredData.java @@ -0,0 +1,90 @@ +package com.dwarfeng.fdr.impl.handler.bridge.dwarfengDctKafka.bean; + +import com.dwarfeng.fdr.stack.struct.Data; +import com.dwarfeng.subgrade.stack.bean.dto.Dto; +import com.dwarfeng.subgrade.stack.bean.key.LongIdKey; +import org.jetbrains.annotations.NotNull; + +import java.util.Date; + +/** + * DwarfengDctKafka 桥接 Flat 被触发数据。 + * + * @author mooyuan + * @since 2.0.0 + */ +public class DwarfengDctKafkaBridgeTriggeredData implements Dto, Data { + + private static final long serialVersionUID = 3205140193103991311L; + + private LongIdKey pointKey; + private LongIdKey triggerKey; + private String value; + private String message; + private Date happenedDate; + + + public DwarfengDctKafkaBridgeTriggeredData() { + } + + public DwarfengDctKafkaBridgeTriggeredData(LongIdKey pointKey, LongIdKey triggerKey, String value, String message, Date happenedDate) { + this.pointKey = pointKey; + this.triggerKey = triggerKey; + this.value = value; + this.message = message; + this.happenedDate = happenedDate; + } + + @NotNull + public LongIdKey getPointKey() { + return pointKey; + } + + public void setPointKey(LongIdKey pointKey) { + this.pointKey = pointKey; + } + + public LongIdKey getTriggerKey() { + return triggerKey; + } + + public void setTriggerKey(LongIdKey triggerKey) { + this.triggerKey = triggerKey; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @NotNull + public Date getHappenedDate() { + return happenedDate; + } + + public void setHappenedDate(Date happenedDate) { + this.happenedDate = happenedDate; + } + + @Override + public String toString() { + return "DwarfengDctKafkaBridgeFlatTriggeredData{" + + "pointKey=" + pointKey + + ", triggerKey=" + triggerKey + + ", value='" + value + '\'' + + ", message='" + message + '\'' + + ", happenedDate=" + happenedDate + + '}'; + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridge.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridge.java new file mode 100644 index 00000000..4298ead8 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridge.java @@ -0,0 +1,86 @@ +package com.dwarfeng.fdr.impl.handler.bridge.nativeKafka; + +import com.dwarfeng.fdr.impl.handler.bridge.FullBridge; +import com.dwarfeng.fdr.stack.bean.dto.FilteredData; +import com.dwarfeng.fdr.stack.bean.dto.NormalData; +import com.dwarfeng.fdr.stack.bean.dto.TriggeredData; +import org.springframework.stereotype.Component; + +/** + * NativeKafka 桥接器。 + * + * @author mooyuan + * @since 2.0.4 + */ +@Component +public class NativeKafkaBridge extends FullBridge { + + public static final String BRIDGE_TYPE = "kafka.native"; + + private final NativeKafkaBridgeNormalDataKeeper normalDataKeeper; + private final NativeKafkaBridgeFilteredDataKeeper filteredDataKeeper; + private final NativeKafkaBridgeTriggeredDataKeeper triggeredDataKeeper; + private final NativeKafkaBridgeNormalDataPersister normalDataPersister; + private final NativeKafkaBridgeFilteredDataPersister filteredDataPersister; + private final NativeKafkaBridgeTriggeredDataPersister triggeredDataPersister; + + public NativeKafkaBridge( + NativeKafkaBridgeNormalDataKeeper normalDataKeeper, + NativeKafkaBridgeFilteredDataKeeper filteredDataKeeper, + NativeKafkaBridgeTriggeredDataKeeper triggeredDataKeeper, + NativeKafkaBridgeNormalDataPersister normalDataPersister, + NativeKafkaBridgeFilteredDataPersister filteredDataPersister, + NativeKafkaBridgeTriggeredDataPersister triggeredDataPersister + ) { + super(BRIDGE_TYPE); + this.normalDataKeeper = normalDataKeeper; + this.filteredDataKeeper = filteredDataKeeper; + this.triggeredDataKeeper = triggeredDataKeeper; + this.normalDataPersister = normalDataPersister; + this.filteredDataPersister = filteredDataPersister; + this.triggeredDataPersister = triggeredDataPersister; + } + + @Override + public Keeper getNormalDataKeeper() { + return normalDataKeeper; + } + + @Override + public Keeper getFilteredDataKeeper() { + return filteredDataKeeper; + } + + @Override + public Keeper getTriggeredDataKeeper() { + return triggeredDataKeeper; + } + + @Override + public Persister getNormalDataPersister() { + return normalDataPersister; + } + + @Override + public Persister getFilteredDataPersister() { + return filteredDataPersister; + } + + @Override + public Persister getTriggeredDataPersister() { + return triggeredDataPersister; + } + + @Override + public String toString() { + return "NativeKafkaBridge{" + + "normalDataKeeper=" + normalDataKeeper + + ", filteredDataKeeper=" + filteredDataKeeper + + ", triggeredDataKeeper=" + triggeredDataKeeper + + ", normalDataPersister=" + normalDataPersister + + ", filteredDataPersister=" + filteredDataPersister + + ", triggeredDataPersister=" + triggeredDataPersister + + ", bridgeType='" + bridgeType + '\'' + + '}'; + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeFilteredDataKeeper.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeFilteredDataKeeper.java new file mode 100644 index 00000000..ac17eb64 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeFilteredDataKeeper.java @@ -0,0 +1,63 @@ +package com.dwarfeng.fdr.impl.handler.bridge.nativeKafka; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean.NativeKafkaBridgeFastJsonFilteredData; +import com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean.NativeKafkaBridgeFilteredData; +import com.dwarfeng.fdr.stack.bean.dto.FilteredData; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.List; + +/** + * NativeKafka 桥接器被过滤数据保持器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class NativeKafkaBridgeFilteredDataKeeper extends NativeKafkaBridgeKeeper { + + @Value("${bridge.kafka.native.topic.filtered_updated}") + private String filteredUpdatedTopic; + + public NativeKafkaBridgeFilteredDataKeeper(KafkaTemplate kafkaTemplate) { + super(kafkaTemplate); + } + + @Override + protected void doUpdate(FilteredData data) throws Exception { + NativeKafkaBridgeFilteredData filteredData = transform(data); + String message = JSON.toJSONString(NativeKafkaBridgeFastJsonFilteredData.of(filteredData), SerializerFeature.WriteClassName); + kafkaTemplate.send(filteredUpdatedTopic, message); + } + + @Override + protected NativeKafkaBridgeFilteredData transform(FilteredData data) throws Exception { + return new NativeKafkaBridgeFilteredData( + data.getPointKey(), + data.getFilterKey(), + data.getValue(), + data.getMessage(), + data.getHappenedDate() + ); + } + + @Override + protected FilteredData of(NativeKafkaBridgeFilteredData data) throws Exception { + return new FilteredData( + data.getPointKey(), + data.getFilterKey(), + data.getValue(), + data.getMessage(), + data.getHappenedDate() + ); + } + + @Override + protected void doUpdate(List datas) throws Exception { + for (FilteredData data : datas) { + doUpdate(data); + } + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeFilteredDataPersister.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeFilteredDataPersister.java new file mode 100644 index 00000000..0740d2ab --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeFilteredDataPersister.java @@ -0,0 +1,63 @@ +package com.dwarfeng.fdr.impl.handler.bridge.nativeKafka; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean.NativeKafkaBridgeFastJsonFilteredData; +import com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean.NativeKafkaBridgeFilteredData; +import com.dwarfeng.fdr.stack.bean.dto.FilteredData; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.List; + +/** + * NativeKafka 桥接器被过滤数据持久器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class NativeKafkaBridgeFilteredDataPersister extends NativeKafkaBridgePersister { + + @Value("${bridge.kafka.native.topic.filtered_recorded}") + private String filteredRecordedTopic; + + public NativeKafkaBridgeFilteredDataPersister(KafkaTemplate kafkaTemplate) { + super(kafkaTemplate); + } + + @Override + protected void doRecord(FilteredData data) throws Exception { + NativeKafkaBridgeFilteredData filteredData = transform(data); + String message = JSON.toJSONString(NativeKafkaBridgeFastJsonFilteredData.of(filteredData), SerializerFeature.WriteClassName); + kafkaTemplate.send(filteredRecordedTopic, message); + } + + @Override + protected void doRecord(List datas) throws Exception { + for (FilteredData data : datas) { + doRecord(data); + } + } + + @Override + protected NativeKafkaBridgeFilteredData transform(FilteredData data) throws Exception { + return new NativeKafkaBridgeFilteredData( + data.getPointKey(), + data.getFilterKey(), + data.getValue(), + data.getMessage(), + data.getHappenedDate() + ); + } + + @Override + protected FilteredData of(NativeKafkaBridgeFilteredData data) throws Exception { + return new FilteredData( + data.getPointKey(), + data.getFilterKey(), + data.getValue(), + data.getMessage(), + data.getHappenedDate() + ); + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeKeeper.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeKeeper.java new file mode 100644 index 00000000..0af55d34 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeKeeper.java @@ -0,0 +1,25 @@ +package com.dwarfeng.fdr.impl.handler.bridge.nativeKafka; + +import com.dwarfeng.fdr.impl.handler.bridge.WriteOnlyKeeper; +import com.dwarfeng.fdr.stack.struct.Data; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.kafka.core.KafkaTemplate; + +/** + * NativeKafka 桥接器保持器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public abstract class NativeKafkaBridgeKeeper extends WriteOnlyKeeper { + + protected KafkaTemplate kafkaTemplate; + + public NativeKafkaBridgeKeeper(@Qualifier("nativeKafkaPusher.kafkaTemplate") KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + protected abstract T transform(D data) throws Exception; + + protected abstract D of(T data) throws Exception; +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeNormalDataKeeper.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeNormalDataKeeper.java new file mode 100644 index 00000000..7789d8af --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeNormalDataKeeper.java @@ -0,0 +1,59 @@ +package com.dwarfeng.fdr.impl.handler.bridge.nativeKafka; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean.NativeKafkaBridgeFastJsonNormalData; +import com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean.NativeKafkaBridgeNormalData; +import com.dwarfeng.fdr.stack.bean.dto.NormalData; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.List; + +/** + * NativeKafka 桥接器一般数据保持器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class NativeKafkaBridgeNormalDataKeeper extends NativeKafkaBridgeKeeper { + + @Value("${bridge.kafka.native.topic.normal_updated}") + private String normalUpdatedTopic; + + public NativeKafkaBridgeNormalDataKeeper(KafkaTemplate kafkaTemplate) { + super(kafkaTemplate); + } + + @Override + protected void doUpdate(NormalData data) throws Exception { + NativeKafkaBridgeNormalData normalData = transform(data); + String message = JSON.toJSONString(NativeKafkaBridgeFastJsonNormalData.of(normalData), SerializerFeature.WriteClassName); + kafkaTemplate.send(normalUpdatedTopic, message); + } + + @Override + protected NativeKafkaBridgeNormalData transform(NormalData data) throws Exception { + return new NativeKafkaBridgeNormalData( + data.getPointKey(), + data.getValue(), + data.getHappenedDate() + ); + } + + @Override + protected NormalData of(NativeKafkaBridgeNormalData data) throws Exception { + return new NormalData( + data.getPointKey(), + data.getValue(), + data.getHappenedDate() + ); + } + + @Override + protected void doUpdate(List datas) throws Exception { + for (NormalData data : datas) { + doUpdate(data); + } + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeNormalDataPersister.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeNormalDataPersister.java new file mode 100644 index 00000000..c28bbf60 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeNormalDataPersister.java @@ -0,0 +1,59 @@ +package com.dwarfeng.fdr.impl.handler.bridge.nativeKafka; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean.NativeKafkaBridgeFastJsonNormalData; +import com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean.NativeKafkaBridgeNormalData; +import com.dwarfeng.fdr.stack.bean.dto.NormalData; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.List; + +/** + * NativeKafka 桥接器一般数据持久器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class NativeKafkaBridgeNormalDataPersister extends NativeKafkaBridgePersister { + + @Value("${bridge.kafka.native.topic.normal_recorded}") + private String normalRecordedTopic; + + public NativeKafkaBridgeNormalDataPersister(KafkaTemplate kafkaTemplate) { + super(kafkaTemplate); + } + + @Override + protected void doRecord(NormalData data) throws Exception { + NativeKafkaBridgeNormalData normalData = transform(data); + String message = JSON.toJSONString(NativeKafkaBridgeFastJsonNormalData.of(normalData), SerializerFeature.WriteClassName); + kafkaTemplate.send(normalRecordedTopic, message); + } + + @Override + protected void doRecord(List datas) throws Exception { + for(NormalData data : datas){ + doRecord(data); + } + } + + @Override + protected NativeKafkaBridgeNormalData transform(NormalData data) throws Exception { + return new NativeKafkaBridgeNormalData( + data.getPointKey(), + data.getValue(), + data.getHappenedDate() + ); + } + + @Override + protected NormalData of(NativeKafkaBridgeNormalData data) throws Exception { + return new NormalData( + data.getPointKey(), + data.getValue(), + data.getHappenedDate() + ); + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgePersister.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgePersister.java new file mode 100644 index 00000000..afd709eb --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgePersister.java @@ -0,0 +1,25 @@ +package com.dwarfeng.fdr.impl.handler.bridge.nativeKafka; + +import com.dwarfeng.fdr.impl.handler.bridge.WriteOnlyPersister; +import com.dwarfeng.fdr.stack.struct.Data; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.kafka.core.KafkaTemplate; + +/** + * NativeKafka 桥接器持久器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public abstract class NativeKafkaBridgePersister extends WriteOnlyPersister { + + protected KafkaTemplate kafkaTemplate; + + public NativeKafkaBridgePersister(@Qualifier("nativeKafkaPusher.kafkaTemplate") KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + protected abstract T transform(D data) throws Exception; + + protected abstract D of(T data) throws Exception; +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeTriggeredDataKeeper.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeTriggeredDataKeeper.java new file mode 100644 index 00000000..136c9bb8 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeTriggeredDataKeeper.java @@ -0,0 +1,63 @@ +package com.dwarfeng.fdr.impl.handler.bridge.nativeKafka; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean.NativeKafkaBridgeFastJsonTriggeredData; +import com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean.NativeKafkaBridgeTriggeredData; +import com.dwarfeng.fdr.stack.bean.dto.TriggeredData; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.List; + +/** + * NativeKafka 桥接器被触发数据保持器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class NativeKafkaBridgeTriggeredDataKeeper extends NativeKafkaBridgeKeeper { + + @Value("${bridge.kafka.native.topic.triggered_updated}") + private String triggeredUpdatedTopic; + + public NativeKafkaBridgeTriggeredDataKeeper(KafkaTemplate kafkaTemplate) { + super(kafkaTemplate); + } + + @Override + protected void doUpdate(TriggeredData data) throws Exception { + NativeKafkaBridgeTriggeredData triggeredData = transform(data); + String message = JSON.toJSONString(NativeKafkaBridgeFastJsonTriggeredData.of(triggeredData), SerializerFeature.WriteClassName); + kafkaTemplate.send(triggeredUpdatedTopic, message); + } + + @Override + protected NativeKafkaBridgeTriggeredData transform(TriggeredData data) throws Exception { + return new NativeKafkaBridgeTriggeredData( + data.getPointKey(), + data.getTriggerKey(), + data.getValue(), + data.getMessage(), + data.getHappenedDate() + ); + } + + @Override + protected TriggeredData of(NativeKafkaBridgeTriggeredData data) throws Exception { + return new TriggeredData( + data.getPointKey(), + data.getTriggerKey(), + data.getValue(), + data.getMessage(), + data.getHappenedDate() + ); + } + + @Override + protected void doUpdate(List datas) throws Exception { + for (TriggeredData data : datas) { + doUpdate(data); + } + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeTriggeredDataPersister.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeTriggeredDataPersister.java new file mode 100644 index 00000000..90f466fc --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/NativeKafkaBridgeTriggeredDataPersister.java @@ -0,0 +1,63 @@ +package com.dwarfeng.fdr.impl.handler.bridge.nativeKafka; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean.NativeKafkaBridgeFastJsonTriggeredData; +import com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean.NativeKafkaBridgeTriggeredData; +import com.dwarfeng.fdr.stack.bean.dto.TriggeredData; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.List; + +/** + * NativeKafka 桥接器被触发数据持久器。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class NativeKafkaBridgeTriggeredDataPersister extends NativeKafkaBridgePersister { + + @Value("${bridge.kafka.native.topic.triggered_recorded}") + private String triggeredRecordedTopic; + + public NativeKafkaBridgeTriggeredDataPersister(KafkaTemplate kafkaTemplate) { + super(kafkaTemplate); + } + + @Override + protected void doRecord(TriggeredData data) throws Exception { + NativeKafkaBridgeTriggeredData triggeredData = transform(data); + String message = JSON.toJSONString(NativeKafkaBridgeFastJsonTriggeredData.of(triggeredData), SerializerFeature.WriteClassName); + kafkaTemplate.send(triggeredRecordedTopic, message); + } + + @Override + protected void doRecord(List datas) throws Exception { + for (TriggeredData data: datas) { + doRecord(data); + } + } + + @Override + protected NativeKafkaBridgeTriggeredData transform(TriggeredData data) throws Exception { + return new NativeKafkaBridgeTriggeredData( + data.getPointKey(), + data.getTriggerKey(), + data.getValue(), + data.getMessage(), + data.getHappenedDate() + ); + } + + @Override + protected TriggeredData of(NativeKafkaBridgeTriggeredData data) throws Exception { + return new TriggeredData( + data.getPointKey(), + data.getTriggerKey(), + data.getValue(), + data.getMessage(), + data.getHappenedDate() + ); + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeFastJsonFilteredData.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeFastJsonFilteredData.java new file mode 100644 index 00000000..59c90528 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeFastJsonFilteredData.java @@ -0,0 +1,124 @@ +package com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean; + +import com.alibaba.fastjson.annotation.JSONField; +import com.dwarfeng.subgrade.sdk.bean.key.FastJsonLongIdKey; +import com.dwarfeng.subgrade.stack.bean.dto.Dto; + +import java.util.Date; +import java.util.Objects; + +/** + * NativeKafka 桥接 FastJson 被过滤数据。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class NativeKafkaBridgeFastJsonFilteredData implements Dto { + + private static final long serialVersionUID = 5409247508302162166L; + + public static NativeKafkaBridgeFastJsonFilteredData of(NativeKafkaBridgeFilteredData filteredData) { + if (Objects.isNull(filteredData)) { + return null; + } else { + return new NativeKafkaBridgeFastJsonFilteredData( + FastJsonLongIdKey.of(filteredData.getPointKey()), + FastJsonLongIdKey.of(filteredData.getFilterKey()), + filteredData.getValue(), + filteredData.getMessage(), + filteredData.getHappenedDate() + ); + } + } + + public static NativeKafkaBridgeFilteredData toStackBean(NativeKafkaBridgeFastJsonFilteredData fastFilteredData) { + if (Objects.isNull(fastFilteredData)) { + return null; + } else { + return new NativeKafkaBridgeFilteredData( + FastJsonLongIdKey.toStackBean(fastFilteredData.getPointKey()), + FastJsonLongIdKey.toStackBean(fastFilteredData.getFilterKey()), + fastFilteredData.getValue(), + fastFilteredData.getMessage(), + fastFilteredData.getHappenedDate() + ); + } + } + + @JSONField(name = "point_key", ordinal = 1) + private FastJsonLongIdKey pointKey; + + @JSONField(name = "filter_key", ordinal = 2) + private FastJsonLongIdKey filterKey; + + @JSONField(name = "value", ordinal = 3) + private Object value; + + @JSONField(name = "message", ordinal = 4) + private String message; + + @JSONField(name = "happened_date", ordinal = 5) + private Date happenedDate; + + public NativeKafkaBridgeFastJsonFilteredData() { + } + + public NativeKafkaBridgeFastJsonFilteredData(FastJsonLongIdKey pointKey, FastJsonLongIdKey filterKey, Object value, String message, Date happenedDate) { + this.pointKey = pointKey; + this.filterKey = filterKey; + this.value = value; + this.message = message; + this.happenedDate = happenedDate; + } + + public FastJsonLongIdKey getPointKey() { + return pointKey; + } + + public void setPointKey(FastJsonLongIdKey pointKey) { + this.pointKey = pointKey; + } + + public FastJsonLongIdKey getFilterKey() { + return filterKey; + } + + public void setFilterKey(FastJsonLongIdKey filterKey) { + this.filterKey = filterKey; + } + + public Object getValue() { + return value; + } + + public void setValue(Object value) { + this.value = value; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public Date getHappenedDate() { + return happenedDate; + } + + public void setHappenedDate(Date happenedDate) { + this.happenedDate = happenedDate; + } + + @Override + public String toString() { + return "NativeKafkaBridgeFastJsonFilteredData{" + + "pointKey=" + pointKey + + ", filterKey=" + filterKey + + ", value=" + value + + ", message='" + message + '\'' + + ", happenedDate=" + happenedDate + + '}'; + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeFastJsonNormalData.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeFastJsonNormalData.java new file mode 100644 index 00000000..287c6a2d --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeFastJsonNormalData.java @@ -0,0 +1,94 @@ +package com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean; + +import com.alibaba.fastjson.annotation.JSONField; +import com.dwarfeng.subgrade.sdk.bean.key.FastJsonLongIdKey; +import com.dwarfeng.subgrade.stack.bean.dto.Dto; + +import java.util.Date; +import java.util.Objects; + +/** + * NativeKafka 桥接 FastJson 一般数据。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class NativeKafkaBridgeFastJsonNormalData implements Dto { + + private static final long serialVersionUID = 8002615279384102341L; + + public static NativeKafkaBridgeFastJsonNormalData of(NativeKafkaBridgeNormalData normalData) { + if (Objects.isNull(normalData)) { + return null; + } else { + return new NativeKafkaBridgeFastJsonNormalData( + FastJsonLongIdKey.of(normalData.getPointKey()), + normalData.getValue(), + normalData.getHappenedDate() + ); + } + } + + public static NativeKafkaBridgeNormalData toStackBean(NativeKafkaBridgeFastJsonNormalData fastNormalData) { + if (Objects.isNull(fastNormalData)) { + return null; + } else { + return new NativeKafkaBridgeNormalData( + FastJsonLongIdKey.toStackBean(fastNormalData.getPointKey()), + fastNormalData.getValue(), + fastNormalData.getHappenedDate() + ); + } + } + + @JSONField(name = "point_key", ordinal = 1) + private FastJsonLongIdKey pointKey; + + @JSONField(name = "value", ordinal = 2) + private Object value; + + @JSONField(name = "happened_date", ordinal = 3) + private Date happenedDate; + + public NativeKafkaBridgeFastJsonNormalData() { + } + + public NativeKafkaBridgeFastJsonNormalData(FastJsonLongIdKey pointKey, Object value, Date happenedDate) { + this.pointKey = pointKey; + this.value = value; + this.happenedDate = happenedDate; + } + + public FastJsonLongIdKey getPointKey() { + return pointKey; + } + + public void setPointKey(FastJsonLongIdKey pointKey) { + this.pointKey = pointKey; + } + + public Object getValue() { + return value; + } + + public void setValue(Object value) { + this.value = value; + } + + public Date getHappenedDate() { + return happenedDate; + } + + public void setHappenedDate(Date happenedDate) { + this.happenedDate = happenedDate; + } + + @Override + public String toString() { + return "FastJsonNormalData{" + + "pointKey=" + pointKey + + ", value=" + value + + ", happenedDate=" + happenedDate + + '}'; + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeFastJsonTriggeredData.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeFastJsonTriggeredData.java new file mode 100644 index 00000000..1449806b --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeFastJsonTriggeredData.java @@ -0,0 +1,126 @@ +package com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean; + +import com.alibaba.fastjson.annotation.JSONField; +import com.dwarfeng.subgrade.sdk.bean.key.FastJsonLongIdKey; +import com.dwarfeng.subgrade.stack.bean.dto.Dto; + +import java.util.Date; +import java.util.Objects; + +/** + * NativeKafka 桥接 FastJson 被触发数据。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class NativeKafkaBridgeFastJsonTriggeredData implements Dto { + + private static final long serialVersionUID = -542632850406508846L; + + public static NativeKafkaBridgeFastJsonTriggeredData of(NativeKafkaBridgeTriggeredData triggeredData) { + if (Objects.isNull(triggeredData)) { + return null; + } else { + return new NativeKafkaBridgeFastJsonTriggeredData( + FastJsonLongIdKey.of(triggeredData.getPointKey()), + FastJsonLongIdKey.of(triggeredData.getTriggerKey()), + triggeredData.getValue(), + triggeredData.getMessage(), + triggeredData.getHappenedDate() + ); + } + } + + public static NativeKafkaBridgeTriggeredData toStackBean(NativeKafkaBridgeFastJsonTriggeredData fastTriggeredData) { + if (Objects.isNull(fastTriggeredData)) { + return null; + } else { + return new NativeKafkaBridgeTriggeredData( + FastJsonLongIdKey.toStackBean(fastTriggeredData.getPointKey()), + FastJsonLongIdKey.toStackBean(fastTriggeredData.getTriggerKey()), + fastTriggeredData.getValue(), + fastTriggeredData.getMessage(), + fastTriggeredData.getHappenedDate() + ); + } + } + + @JSONField(name = "point_key", ordinal = 1) + private FastJsonLongIdKey pointKey; + + @JSONField(name = "trigger_key", ordinal = 2) + private FastJsonLongIdKey triggerKey; + + @JSONField(name = "value", ordinal = 3) + private Object value; + + @JSONField(name = "message", ordinal = 4) + private String message; + + @JSONField(name = "happened_date", ordinal = 5) + private Date happenedDate; + + public NativeKafkaBridgeFastJsonTriggeredData() { + } + + public NativeKafkaBridgeFastJsonTriggeredData( + FastJsonLongIdKey pointKey, FastJsonLongIdKey triggerKey, Object value, String message, Date happenedDate + ) { + this.pointKey = pointKey; + this.triggerKey = triggerKey; + this.value = value; + this.message = message; + this.happenedDate = happenedDate; + } + + public FastJsonLongIdKey getPointKey() { + return pointKey; + } + + public void setPointKey(FastJsonLongIdKey pointKey) { + this.pointKey = pointKey; + } + + public FastJsonLongIdKey getTriggerKey() { + return triggerKey; + } + + public void setTriggerKey(FastJsonLongIdKey triggerKey) { + this.triggerKey = triggerKey; + } + + public Object getValue() { + return value; + } + + public void setValue(Object value) { + this.value = value; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public Date getHappenedDate() { + return happenedDate; + } + + public void setHappenedDate(Date happenedDate) { + this.happenedDate = happenedDate; + } + + @Override + public String toString() { + return "FastJsonTriggeredData{" + + "pointKey=" + pointKey + + ", triggerKey=" + triggerKey + + ", value=" + value + + ", message='" + message + '\'' + + ", happenedDate=" + happenedDate + + '}'; + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeFilteredData.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeFilteredData.java new file mode 100644 index 00000000..4e342564 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeFilteredData.java @@ -0,0 +1,90 @@ +package com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean; + +import com.dwarfeng.fdr.stack.struct.Data; +import com.dwarfeng.subgrade.stack.bean.dto.Dto; +import com.dwarfeng.subgrade.stack.bean.key.LongIdKey; +import org.jetbrains.annotations.NotNull; + +import java.util.Date; +/** + * NativeKafka 桥接被过滤数据。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class NativeKafkaBridgeFilteredData implements Data, Dto { + + private static final long serialVersionUID = 7757526801506627437L; + + private LongIdKey pointKey; + private LongIdKey filterKey; + private Object value; + private String message; + private Date happenedDate; + + public NativeKafkaBridgeFilteredData() { + } + + public NativeKafkaBridgeFilteredData(LongIdKey pointKey, LongIdKey filterKey, Object value, String message, Date happenedDate) { + this.pointKey = pointKey; + this.filterKey = filterKey; + this.value = value; + this.message = message; + this.happenedDate = happenedDate; + } + + @NotNull + @Override + public LongIdKey getPointKey() { + return pointKey; + } + + public void setPointKey(LongIdKey pointKey) { + this.pointKey = pointKey; + } + + public LongIdKey getFilterKey() { + return filterKey; + } + + public void setFilterKey(LongIdKey filterKey) { + this.filterKey = filterKey; + } + + @Override + public Object getValue() { + return value; + } + + public void setValue(Object value) { + this.value = value; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @NotNull + public Date getHappenedDate() { + return happenedDate; + } + + public void setHappenedDate(Date happenedDate) { + this.happenedDate = happenedDate; + } + + @Override + public String toString() { + return "NativeKafkaBridgeFilteredData{" + + "pointKey=" + pointKey + + ", filterKey=" + filterKey + + ", value='" + value + '\'' + + ", message='" + message + '\'' + + ", happenedDate=" + happenedDate + + '}'; + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeNormalData.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeNormalData.java new file mode 100644 index 00000000..2cc298b3 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeNormalData.java @@ -0,0 +1,70 @@ +package com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean; + +import com.dwarfeng.fdr.stack.struct.Data; +import com.dwarfeng.subgrade.stack.bean.dto.Dto; +import com.dwarfeng.subgrade.stack.bean.key.LongIdKey; + +import javax.annotation.Nonnull; +import java.util.Date; + +/** + * NativeKafka 桥接一般数据。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class NativeKafkaBridgeNormalData implements Data, Dto { + + private static final long serialVersionUID = 1845865111204763158L; + + private LongIdKey pointKey; + private Object value; + private Date happenedDate; + + public NativeKafkaBridgeNormalData() { + } + + public NativeKafkaBridgeNormalData(LongIdKey pointKey, Object value, Date happenedDate) { + this.pointKey = pointKey; + this.value = value; + this.happenedDate = happenedDate; + } + + @Nonnull + @Override + public LongIdKey getPointKey() { + return pointKey; + } + + public void setPointKey(LongIdKey pointKey) { + this.pointKey = pointKey; + } + + @Override + public Object getValue() { + return value; + } + + public void setValue(Object value) { + this.value = value; + } + + @Nonnull + @Override + public Date getHappenedDate() { + return happenedDate; + } + + public void setHappenedDate(Date happenedDate) { + this.happenedDate = happenedDate; + } + + @Override + public String toString() { + return "NormalData{" + + "pointKey=" + pointKey + + ", value=" + value + + ", happenedDate=" + happenedDate + + '}'; + } +} \ No newline at end of file diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeTriggeredData.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeTriggeredData.java new file mode 100644 index 00000000..a481c1f4 --- /dev/null +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/nativeKafka/bean/NativeKafkaBridgeTriggeredData.java @@ -0,0 +1,94 @@ +package com.dwarfeng.fdr.impl.handler.bridge.nativeKafka.bean; + +import com.dwarfeng.fdr.stack.struct.Data; +import com.dwarfeng.subgrade.stack.bean.dto.Dto; +import com.dwarfeng.subgrade.stack.bean.key.LongIdKey; + +import javax.annotation.Nonnull; +import java.util.Date; + +/** + * NativeKafka 桥接被触发数据。 + * + * @author mooyuan + * @since 2.0.4 + */ +public class NativeKafkaBridgeTriggeredData implements Data, Dto { + + private static final long serialVersionUID = 3586213330046421100L; + + private LongIdKey pointKey; + private LongIdKey triggerKey; + private Object value; + private String message; + private Date happenedDate; + + public NativeKafkaBridgeTriggeredData() { + } + + public NativeKafkaBridgeTriggeredData( + LongIdKey pointKey, LongIdKey triggerKey, Object value, String message, Date happenedDate + ) { + this.pointKey = pointKey; + this.triggerKey = triggerKey; + this.value = value; + this.message = message; + this.happenedDate = happenedDate; + } + + @Nonnull + @Override + public LongIdKey getPointKey() { + return pointKey; + } + + public void setPointKey(LongIdKey pointKey) { + this.pointKey = pointKey; + } + + public LongIdKey getTriggerKey() { + return triggerKey; + } + + public void setTriggerKey(LongIdKey triggerKey) { + this.triggerKey = triggerKey; + } + + @Override + public Object getValue() { + return value; + } + + public void setValue(Object value) { + this.value = value; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @Nonnull + @Override + public Date getHappenedDate() { + return happenedDate; + } + + public void setHappenedDate(Date happenedDate) { + this.happenedDate = happenedDate; + } + + @Override + public String toString() { + return "TriggeredData{" + + "pointKey=" + pointKey + + ", triggerKey=" + triggerKey + + ", value=" + value + + ", message='" + message + '\'' + + ", happenedDate=" + happenedDate + + '}'; + } +} \ No newline at end of file diff --git a/fdr-node/src/main/resources/fdr/bridge.properties b/fdr-node/src/main/resources/fdr/bridge.properties index ec2caea8..e0af25e3 100644 --- a/fdr-node/src/main/resources/fdr/bridge.properties +++ b/fdr-node/src/main/resources/fdr/bridge.properties @@ -181,6 +181,64 @@ bridge.influxdb.bucket.triggered_data=fdr.triggered_data bridge.influxdb.organization=com.dwarfeng # ################################################### +# kafka.native # +################################################### +# broker\u96C6\u7FA4\u3002 +bridge.kafka.native.bootstrap_servers=your ip here like ip1:9092,ip2:9092,ip3:9092 +# \u8FDE\u63A5\u5C5E\u6027\u3002 +bridge.kafka.native.acks=all +# \u53D1\u9001\u5931\u8D25\u91CD\u8BD5\u6B21\u6570\u3002 +bridge.kafka.native.retries=3 +bridge.kafka.native.linger=10 +# \u7684\u6279\u5904\u7406\u7F13\u51B2\u533A\u5927\u5C0F\u3002 +bridge.kafka.native.buffer_memory=40960 +# \u6279\u5904\u7406\u6761\u6570\uFF1A\u5F53\u591A\u4E2A\u8BB0\u5F55\u88AB\u53D1\u9001\u5230\u540C\u4E00\u4E2A\u5206\u533A\u65F6\uFF0C\u751F\u4EA7\u8005\u4F1A\u5C1D\u8BD5\u5C06\u8BB0\u5F55\u5408\u5E76\u5230\u66F4\u5C11\u7684\u8BF7\u6C42\u4E2D\u3002\u8FD9\u6709\u52A9\u4E8E\u5BA2\u6237\u7AEF\u548C\u670D\u52A1\u5668\u7684\u6027\u80FD\u3002 +bridge.kafka.native.batch_size=4096 +# Kafka\u4E8B\u52A1\u7684\u524D\u7F00\u3002 +bridge.kafka.native.transaction_prefix=fdr.bridge. +# \u4E00\u822C\u6570\u636E\u66F4\u65B0\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +bridge.kafka.native.topic.normal_updated=fdr.bridge.normal_updated +# \u4E00\u822C\u6570\u636E\u8BB0\u5F55\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +bridge.kafka.native.topic.normal_recorded=fdr.bridge.normal_recorded +# \u88AB\u8FC7\u6EE4\u6570\u636E\u66F4\u65B0\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +bridge.kafka.native.topic.filtered_updated=fdr.bridge.filtered_updated +# \u88AB\u8FC7\u6EE4\u6570\u636E\u8BB0\u5F55\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +bridge.kafka.native.topic.filtered_recorded=fdr.bridge.filtered_recorded +# \u88AB\u89E6\u53D1\u6570\u636E\u66F4\u65B0\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +bridge.kafka.native.topic.triggered_updated=fdr.bridge.triggered_updated +# \u88AB\u89E6\u53D1\u6570\u636E\u8BB0\u5F55\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +bridge.kafka.native.topic.triggered_recorded=fdr.bridge.triggered_recorded +# +######################################################## +# kafka.dwarfengDct # +######################################################## +# broker\u96C6\u7FA4\u3002 +bridge.kafka.dwarfengDct.bootstrap_servers=your ip here like ip1:9092,ip2:9092,ip3:9092 +# \u8FDE\u63A5\u5C5E\u6027\u3002 +bridge.kafka.dwarfengDct.acks=all +# \u53D1\u9001\u5931\u8D25\u91CD\u8BD5\u6B21\u6570\u3002 +bridge.kafka.dwarfengDct.retries=3 +bridge.kafka.dwarfengDct.linger=10 +# \u7684\u6279\u5904\u7406\u7F13\u51B2\u533A\u5927\u5C0F\u3002 +bridge.kafka.dwarfengDct.buffer_memory=40960 +# \u6279\u5904\u7406\u6761\u6570\uFF1A\u5F53\u591A\u4E2A\u8BB0\u5F55\u88AB\u53D1\u9001\u5230\u540C\u4E00\u4E2A\u5206\u533A\u65F6\uFF0C\u751F\u4EA7\u8005\u4F1A\u5C1D\u8BD5\u5C06\u8BB0\u5F55\u5408\u5E76\u5230\u66F4\u5C11\u7684\u8BF7\u6C42\u4E2D\u3002\u8FD9\u6709\u52A9\u4E8E\u5BA2\u6237\u7AEF\u548C\u670D\u52A1\u5668\u7684\u6027\u80FD\u3002 +bridge.kafka.dwarfengDct.batch_size=4096 +# Kafka\u4E8B\u52A1\u7684\u524D\u7F00\u3002 +bridge.kafka.dwarfengDct.transaction_prefix=fdr.bridge. +# \u4E00\u822C\u6570\u636E\u66F4\u65B0\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +bridge.kafka.dwarfengDct.topic.normal_updated=fdr.bridge.normal_updated +# \u4E00\u822C\u6570\u636E\u8BB0\u5F55\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +bridge.kafka.dwarfengDct.topic.normal_recorded=fdr.bridge.normal_recorded +# \u88AB\u8FC7\u6EE4\u6570\u636E\u66F4\u65B0\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +bridge.kafka.dwarfengDct.topic.filtered_updated=fdr.bridge.filtered_updated +# \u88AB\u8FC7\u6EE4\u6570\u636E\u8BB0\u5F55\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +bridge.kafka.dwarfengDct.topic.filtered_recorded=fdr.bridge.filtered_recorded +# \u88AB\u89E6\u53D1\u6570\u636E\u66F4\u65B0\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +bridge.kafka.dwarfengDct.topic.triggered_updated=fdr.bridge.triggered_updated +# \u88AB\u89E6\u53D1\u6570\u636E\u8BB0\u5F55\u65F6\u5411 Kafka \u53D1\u9001\u7684\u4E3B\u9898\u3002 +bridge.kafka.dwarfengDct.topic.triggered_recorded=fdr.bridge.triggered_recorded +# +################################################### # multi # ################################################### # \u591A\u91CD\u6865\u63A5\u5668\u672C\u8EAB\u4E0D\u76F4\u63A5\u5B9E\u73B0\u6570\u636E\u7684\u6865\u63A5\u65B9\u6CD5\uFF0C\u800C\u662F\u901A\u8FC7\u4EE3\u7406\u7684\u65B9\u5F0F\u5B9E\u73B0\u3002\u6570\u636E\u5199\u5165\u591A\u91CD\u6865\u63A5\u5668\u65F6\uFF0C -- Gitee