diff --git a/CHANGELOG.md b/CHANGELOG.md index 6eb1b579a13d9ca5f0582fade73099443cbab7b9..635fd2909dd49c1cb02fbe3b4eeb179419d61d67 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,8 @@ #### 功能构建 -- (无) +- Wiki 更新。 + - 优化 `Contents.md` 中的内容。 #### Bug修复 diff --git a/docs/wiki/zh_CN/Contents.md b/docs/wiki/zh_CN/Contents.md index cec790e5565187705ae4a6db789477ab77111fb0..c97b11086c250b75d3f190afd12dad6b2f798127 100644 --- a/docs/wiki/zh_CN/Contents.md +++ b/docs/wiki/zh_CN/Contents.md @@ -43,7 +43,6 @@ - [Source](./Source.md) - 数据源,详细说明了本项目的数据源机制。 - [PresetSourceImplements.md](./PresetSourceImplements.md) - 预设数据源实现,详细说明了本项目内置的所有数据源。 - - [Washer](./Washer.md) - 清洗器,详细说明了本项目的清洗器机制。 ## 维护与调试 diff --git a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/multi/MultiBridgePersister.java b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/multi/MultiBridgePersister.java index 4a0c8b77f4156847994c10390eae4bfc712af6c8..c7b8893bfc00013af08db0722a216edf9c9f90be 100644 --- a/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/multi/MultiBridgePersister.java +++ b/fdr-impl/src/main/java/com/dwarfeng/fdr/impl/handler/bridge/multi/MultiBridgePersister.java @@ -18,6 +18,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -52,8 +53,7 @@ public abstract class MultiBridgePersister extends AbstractPersi // 基于配置获取桥接器的代理列表。 List delegateBridges = new ArrayList<>(); for (String bridgeType : bridgeTypes) { - Bridge bridge = bridges.stream().filter(b -> b.supportType(bridgeType)).findAny() - .orElseThrow(() -> new HandlerException("未知的 bridge 类型: " + bridgeType)); + Bridge bridge = bridges.stream().filter(b -> b.supportType(bridgeType)).findAny().orElseThrow(() -> new HandlerException("未知的 bridge 类型: " + bridgeType)); if (!bridge.supportPersister()) { throw new IllegalStateException("桥接器不支持持久器, 请检查 bridge.properties 配置文件: " + bridgeType); } @@ -83,10 +83,7 @@ public abstract class MultiBridgePersister extends AbstractPersi // 对代理的所有持久器执行异步更新操作。 List> futures = new ArrayList<>(delegatePersisters.size()); for (final Persister delegatePersister : delegatePersisters) { - CompletableFuture future = CompletableFuture.runAsync( - () -> wrappedDoRecord(delegatePersister, data), - executor - ); + CompletableFuture future = CompletableFuture.runAsync(() -> wrappedDoRecord(delegatePersister, data), executor); futures.add(future); } try { @@ -119,10 +116,7 @@ public abstract class MultiBridgePersister extends AbstractPersi // 对代理的所有持久器执行异步更新操作。 List> futures = new ArrayList<>(delegatePersisters.size()); for (final Persister delegatePersister : delegatePersisters) { - CompletableFuture future = CompletableFuture.runAsync( - () -> wrappedDoRecord(delegatePersister, datas), - executor - ); + CompletableFuture future = CompletableFuture.runAsync(() -> wrappedDoRecord(delegatePersister, datas), executor); futures.add(future); } try { @@ -142,15 +136,35 @@ public abstract class MultiBridgePersister extends AbstractPersi delegatePersister.record(datas); } catch (Exception e) { Logger logger = getLogger(); - String message = "持久器 " + delegatePersister + " 记录数据时发生异常, 共 " + - datas.size() + " 条数据, 异常信息如下"; + String message = "持久器 " + delegatePersister + " 记录数据时发生异常, 共 " + datas.size() + " 条数据, 异常信息如下"; logger.warn(message, e); throw new CompletionException(e); } } + @SuppressWarnings("DuplicatedCode") @Override public LookupResult lookup(LookupInfo lookupInfo) throws HandlerException { + + String preset = lookupInfo.getPreset(); + + if (!preset.contains(":") || preset.split(",").length != 2) { + throw new IllegalArgumentException("预设不合法: " + preset); + } + + int index; + try { + index = Integer.parseInt(preset.split(",")[0]); + } catch (ClassCastException e) { + throw new ClassCastException("预设不合法: " + preset); + } + if (delegatePersisters.size() - 1 < index) { + throw new IndexOutOfBoundsException("预设不合法: " + preset); + } + // 重新设置primaryPersister + this.primaryPersister = delegatePersisters.get(index); + lookupInfo.setPreset(preset.split(",")[1]); + if (primaryPersister.writeOnly()) { throw new LookupNotSupportedException(); } @@ -165,20 +179,66 @@ public abstract class MultiBridgePersister extends AbstractPersi @Override public List> lookup(List lookupInfos) throws HandlerException { - if (primaryPersister.writeOnly()) { - throw new LookupNotSupportedException(); + ArrayList> LookupResults = new ArrayList<>(lookupInfos.size()); + + for (int i = 0; i < lookupInfos.size(); i++) { + LookupResults.add(null); } try { - return primaryPersister.lookup(lookupInfos); + // 对代理的所有持久器执行异步查询操作。 + List> futures = new ArrayList<>(lookupInfos.size()); + + for (int i = 0; i < lookupInfos.size(); i++) { + final int index = i; + final LookupInfo lookupInfo = lookupInfos.get(index); + CompletableFuture future = CompletableFuture.runAsync(() -> { + try { + LookupResult lookupResult = lookup(lookupInfo); + LookupResults.set(index, lookupResult); + } catch (HandlerException e) { + throw new CompletionException(e); + } + }, executor); + futures.add(future); + } + + try { + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + } catch (CompletionException e) { + throw (Exception) e.getCause(); + } } catch (HandlerException e) { throw e; } catch (Exception e) { throw new HandlerException(e); } + + return LookupResults; } + @SuppressWarnings("DuplicatedCode") @Override public QueryResult nativeQuery(NativeQueryInfo queryInfo) throws HandlerException { + + String preset = queryInfo.getPreset(); + + if (Objects.isNull(preset) || !preset.contains(":") || preset.split(",").length != 2) { + throw new IllegalArgumentException("预设不合法: " + preset); + } + + int index; + try { + index = Integer.parseInt(preset.split(",")[0]); + } catch (ClassCastException e) { + throw new ClassCastException("预设不合法: " + preset); + } + if (delegatePersisters.size() - 1 < index) { + throw new IndexOutOfBoundsException("预设不合法: " + preset); + } + // 重新设置primaryPersister + this.primaryPersister = delegatePersisters.get(index); + queryInfo.setPreset(preset.split(",")[1]); + if (primaryPersister.writeOnly()) { throw new NativeQueryNotSupportedException(); } @@ -193,16 +253,42 @@ public abstract class MultiBridgePersister extends AbstractPersi @Override public List nativeQuery(List queryInfos) throws HandlerException { - if (primaryPersister.writeOnly()) { - throw new NativeQueryNotSupportedException(); + List queryResults = new ArrayList<>(queryInfos.size()); + + for (int i = 0; i < queryInfos.size(); i++) { + queryResults.add(null); } + try { - return primaryPersister.nativeQuery(queryInfos); + // 对代理的所有持久器执行异步查询操作。 + List> futures = new ArrayList<>(queryInfos.size()); + + for (int i = 0; i < queryInfos.size(); i++) { + final int index = i; + final NativeQueryInfo queryInfo = queryInfos.get(index); + CompletableFuture future = CompletableFuture.runAsync(() -> { + try { + QueryResult queryResult = nativeQuery(queryInfo); + queryResults.set(index, queryResult); + } catch (HandlerException e) { + throw new CompletionException(e); + } + }, executor); + futures.add(future); + } + + try { + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + } catch (CompletionException e) { + throw (Exception) e.getCause(); + } } catch (HandlerException e) { throw e; } catch (Exception e) { throw new HandlerException(e); } + + return queryResults; } protected abstract String getDelegateConfig(); @@ -213,11 +299,6 @@ public abstract class MultiBridgePersister extends AbstractPersi @Override public String toString() { - return "MultiBridgePersister{" + - "ctx=" + ctx + - ", executor=" + executor + - ", primaryPersister=" + primaryPersister + - ", delegatePersisters=" + delegatePersisters + - '}'; + return "MultiBridgePersister{" + "ctx=" + ctx + ", executor=" + executor + ", primaryPersister=" + primaryPersister + ", delegatePersisters=" + delegatePersisters + '}'; } }