From 63dc52fc3dd3f75dc3be3cafac1bce32e087b7b6 Mon Sep 17 00:00:00 2001 From: mystarry-sky Date: Mon, 15 Jan 2024 18:54:13 +0800 Subject: [PATCH] =?UTF-8?q?chameleon=E5=8D=8F=E5=90=8C=E6=A0=A1=E9=AA=8C-?= =?UTF-8?q?=E6=8A=BD=E5=8F=96=E7=AB=AF=E5=A4=A7=E8=A1=A8=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=E6=89=A9=E5=B1=95=E7=BA=BF=E7=A8=8B=E6=B1=A0;Topic=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E7=AE=A1=E7=90=86=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/DynamicThreadPoolManager.java | 24 ++++++++++++----- .../datachecker/extract/cache/TopicCache.java | 27 ++++++++++++------- .../extract/controller/ExtractController.java | 4 +-- .../service/DataExtractServiceImpl.java | 8 +++--- .../extract/slice/SliceDispatcher.java | 27 ++++++++++++------- .../extract/slice/SliceProcessorContext.java | 6 ++--- .../extract/slice/SliceRegister.java | 10 +++---- 7 files changed, 63 insertions(+), 43 deletions(-) diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/DynamicThreadPoolManager.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/DynamicThreadPoolManager.java index 2dc0793..c4c2a92 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/DynamicThreadPoolManager.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/DynamicThreadPoolManager.java @@ -21,6 +21,7 @@ import org.opengauss.datachecker.common.util.ThreadUtil; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import javax.annotation.PostConstruct; import javax.annotation.Resource; @@ -97,7 +98,7 @@ public class DynamicThreadPoolManager { /** * get Extend Free Thread Pool Executor * - * @param topicSize topicSize + * @param topicSize topicSize * @param extendMaxPoolSize extendMaxPoolSize * @return ExecutorService */ @@ -107,9 +108,9 @@ public class DynamicThreadPoolManager { return EXECUTOR_SERVICE_CACHE.get(freeDtpList.get(0)); } long count = EXECUTOR_SERVICE_CACHE.keySet() - .stream() - .filter(dtpName -> dtpName.startsWith(DynamicTpConstant.EXTEND_EXECUTOR)) - .count(); + .stream() + .filter(dtpName -> dtpName.startsWith(DynamicTpConstant.EXTEND_EXECUTOR)) + .count(); if (count < topicSize) { return buildExtendDtpExecutor(DynamicTpConstant.EXTEND_EXECUTOR + (count + 1), extendMaxPoolSize); } else { @@ -122,8 +123,8 @@ public class DynamicThreadPoolManager { List freeDtpList = new LinkedList<>(); EXECUTOR_SERVICE_CACHE.forEach((dtpName, dtpPool) -> { if (dtpName.startsWith(DynamicTpConstant.EXTEND_EXECUTOR) && dtpPool.getQueue() - .isEmpty() - && dtpPool.getActiveCount() == 0) { + .isEmpty() + && dtpPool.getActiveCount() == 0) { freeDtpList.add(dtpName); } }); @@ -134,4 +135,15 @@ public class DynamicThreadPoolManager { dynamicThreadPool.buildExtendDtpExecutor(EXECUTOR_SERVICE_CACHE, extendDtpName, 1, extendMaxPoolSize); return EXECUTOR_SERVICE_CACHE.get(extendDtpName); } + + public boolean allExecutorFree() { + List freeDtpList = new LinkedList<>(); + EXECUTOR_SERVICE_CACHE.forEach((dtpName, dtpPool) -> { + if (dtpPool.getQueue() + .isEmpty() && dtpPool.getActiveCount() == 0) { + freeDtpList.add(dtpName); + } + }); + return EXECUTOR_SERVICE_CACHE.size() == freeDtpList.size(); + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/cache/TopicCache.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/cache/TopicCache.java index 61f6d5c..f244aa0 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/cache/TopicCache.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/cache/TopicCache.java @@ -31,7 +31,6 @@ import java.util.concurrent.locks.ReentrantLock; * @date :Created in 2023/4/23 * @since :11 */ -@Service public class TopicCache { private static final Map TOPIC_CACHE = new ConcurrentHashMap<>(); private static volatile Endpoint endpoint; @@ -42,7 +41,7 @@ public class TopicCache { * * @param currentEndpoint currentEndpoint */ - public void initEndpoint(Endpoint currentEndpoint) { + public static void initEndpoint(Endpoint currentEndpoint) { endpoint = currentEndpoint; } @@ -51,7 +50,7 @@ public class TopicCache { * * @param topic topic */ - public void add(Topic topic) { + public static void add(Topic topic) { lock.lock(); try { if (Objects.isNull(topic)) { @@ -74,11 +73,16 @@ public class TopicCache { * @param table table name * @return Topic */ - public Topic getTopic(String table) { - return TOPIC_CACHE.get(table); + public static Topic getTopic(String table) { + lock.lock(); + try { + return TOPIC_CACHE.get(table); + } finally { + lock.unlock(); + } } - public void removeTopic(String table) { + public static void removeTopic(String table) { lock.lock(); try { TOPIC_CACHE.remove(table); @@ -87,7 +91,7 @@ public class TopicCache { } } - public boolean canCreateTopic(int maxTopicNum) { + public static boolean canCreateTopic(int maxTopicNum) { lock.lock(); try { return maxTopicNum > TOPIC_CACHE.size(); @@ -96,7 +100,12 @@ public class TopicCache { } } - public int size() { - return TOPIC_CACHE.size(); + public static int size() { + lock.lock(); + try { + return TOPIC_CACHE.size(); + } finally { + lock.unlock(); + } } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java index 3b065be..d165361 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java @@ -57,8 +57,6 @@ public class ExtractController { private MetaDataService metaDataService; @Resource private DataExtractService dataExtractService; - @Resource - private TopicCache topicCache; /** * loading database metadata information @@ -227,6 +225,6 @@ public class ExtractController { */ @PostMapping("/notify/check/finished") public void notifyCheckTableFinished(@RequestParam(name = "tableName") String tableName) { - topicCache.removeTopic(tableName); + TopicCache.removeTopic(tableName); } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java index 3c6ea75..d07aafe 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java @@ -130,8 +130,6 @@ public class DataExtractServiceImpl implements DataExtractService { @Autowired private BaseDataService baseDataService; @Resource - private TopicCache topicCache; - @Resource private TableCheckPointCache tableCheckPointCache; @Resource private DynamicThreadPoolManager dynamicThreadPoolManager; @@ -169,7 +167,7 @@ public class DataExtractServiceImpl implements DataExtractService { log.info("The current endpoint is not the source endpoint, and the task cannot be built"); return new ArrayList<>(0); } - topicCache.initEndpoint(extractProperties.getEndpoint()); + TopicCache.initEndpoint(extractProperties.getEndpoint()); if (atomicProcessNo.compareAndSet(PROCESS_NO_RESET, processNo)) { Set tableNames = MetaDataCache.getAllKeys(); List taskList = extractTaskBuilder.builder(tableNames); @@ -333,14 +331,14 @@ public class DataExtractServiceImpl implements DataExtractService { } ThreadUtil.requestConflictingSleeping(); registerTopic(task); - while (!topicCache.canCreateTopic(maximumTopicSize)) { + while (!TopicCache.canCreateTopic(maximumTopicSize)) { ThreadUtil.sleep(1000); } Topic topic = task.getTopic(); Endpoint endpoint = extractProperties.getEndpoint(); log.info("try to create [{}] [{}]", topic.getTopicName(endpoint), topic.getPtnNum()); kafkaAdminService.createTopic(topic.getTopicName(endpoint), topic.getPtnNum()); - topicCache.add(topic); + TopicCache.add(topic); while (!tableCheckPointCache.getAll() .containsKey(tableName)) { ThreadUtil.sleepHalfSecond(); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java index 1702f74..7760eba 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java @@ -60,7 +60,6 @@ public class SliceDispatcher implements Runnable { private final BaseDataService baseDataService; private final SliceRegister sliceRegister; private final DynamicThreadPoolManager dynamicThreadPoolManager; - private TopicCache topicCache; private boolean isRunning = true; private final int maxFetchSize; @@ -79,7 +78,6 @@ public class SliceDispatcher implements Runnable { this.baseDataService = baseDataService; this.dynamicThreadPoolManager = dynamicThreadPoolManager; this.sliceFactory = new SliceFactory(baseDataService.getDataSource()); - this.topicCache = SpringUtil.getBean(TopicCache.class); this.maxFetchSize = ConfigCache.getIntValue(ConfigConstants.MAXIMUM_TABLE_SLICE_SIZE); } @@ -89,6 +87,8 @@ public class SliceDispatcher implements Runnable { log.info("slice dispatcher is starting ..."); synchronized (lock) { final ThreadPoolExecutor executor = dynamicThreadPoolManager.getExecutor(EXTRACT_EXECUTOR); + int topicSize = ConfigCache.getIntValue(ConfigConstants.MAXIMUM_TOPIC_SIZE); + int extendMaxPoolSize = ConfigCache.getIntValue(ConfigConstants.EXTEND_MAXIMUM_POOL_SIZE); Endpoint endPoint = ConfigCache.getEndPoint(); while (isRunning) { waitingForIdle(executor); @@ -111,24 +111,31 @@ public class SliceDispatcher implements Runnable { continue; } List tableSliceList = listener.fetchTableSliceList(table); - if (CollectionUtils.isNotEmpty(tableSliceList)) { + if (tableSliceList.size() <= 20) { + log.debug("table [{}] get main executor success", table); tableSliceList.forEach(sliceVo -> { sliceVo.setEndpoint(endPoint); register(sliceVo); doTableSlice(executor, sliceVo); }); + } else { + ThreadPoolExecutor extendExecutor = + (ThreadPoolExecutor) dynamicThreadPoolManager.getFreeExecutor(topicSize, extendMaxPoolSize); + log.debug("table [{}] get extend executor success", table); + tableSliceList.forEach(sliceVo -> { + sliceVo.setEndpoint(endPoint); + register(sliceVo); + doTableSlice(extendExecutor, sliceVo); + }); } - if (listener.isFinished()) { log.info("listener is finished , and will be closed"); listener.stop(); - while (executor.getTaskCount() > executor.getCompletedTaskCount()) { + while (!dynamicThreadPoolManager.allExecutorFree()) { ThreadUtil.sleepHalfSecond(); } - if (executor.getTaskCount() == executor.getCompletedTaskCount()) { - stop(); - dynamicThreadPoolManager.closeDynamicThreadPoolMonitor(); - } + stop(); + dynamicThreadPoolManager.closeDynamicThreadPoolMonitor(); } } } @@ -172,7 +179,7 @@ public class SliceDispatcher implements Runnable { TableMetadata tableMetadata = baseDataService.queryTableMetadata(slice.getTable()); slice.setFetchSize(Math.min((int) tableMetadata.getTableRows(), maxFetchSize)); slice.setWholeTable(slice.getTotal() <= 1); - Topic topic = topicCache.getTopic(slice.getTable()); + Topic topic = TopicCache.getTopic(slice.getTable()); slice.setPtnNum(topic.getPtnNum()); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java index 8f945e5..4fd28cf 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java @@ -65,8 +65,6 @@ public class SliceProcessorContext { private KafkaConsumerConfig kafkaConsumerConfig; @Resource private CheckingFeignClient checkingFeignClient; - @Resource - private TopicCache topicCache; private SliceStatusFeedbackService sliceStatusFeedbackService; /** @@ -84,13 +82,13 @@ public class SliceProcessorContext { } public SliceKafkaAgents createSliceKafkaAgents(String table, int ptn) { - Topic topic = topicCache.getTopic(table); + Topic topic = TopicCache.getTopic(table); String topicName = topic.getTopicName(ConfigCache.getEndPoint()); return new SliceKafkaAgents(kafkaTemplate, kafkaConsumerConfig.createConsumer(), topicName, ptn); } public SliceKafkaAgents createSliceKafkaAgents(String table) { - Topic topic = topicCache.getTopic(table); + Topic topic = TopicCache.getTopic(table); return new SliceKafkaAgents(kafkaTemplate, kafkaConsumerConfig.createConsumer(), topic); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceRegister.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceRegister.java index 7e79517..54a97da 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceRegister.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceRegister.java @@ -41,8 +41,6 @@ public class SliceRegister { @Resource private CheckingFeignClient checkingClient; @Resource - private TopicCache topicCache; - @Resource private KafkaAdminService kafkaAdminService; /** @@ -62,16 +60,16 @@ public class SliceRegister { * @return true | false */ public boolean registerTopic(String tableName, int ptnNum) { - Topic topic = topicCache.getTopic(tableName); + Topic topic = TopicCache.getTopic(tableName); if (Objects.nonNull(topic)) { return true; } - if (!topicCache.canCreateTopic(ConfigCache.getIntValue(ConfigConstants.MAXIMUM_TOPIC_SIZE))) { + if (!TopicCache.canCreateTopic(ConfigCache.getIntValue(ConfigConstants.MAXIMUM_TOPIC_SIZE))) { return false; } topic = checkingClient.registerTopic(tableName, ptnNum, ConfigCache.getEndPoint()); if (kafkaAdminService.createTopic(topic.getTopicName(ConfigCache.getEndPoint()), topic.getPtnNum())) { - topicCache.add(topic); + TopicCache.add(topic); return true; } else { return false; @@ -85,7 +83,7 @@ public class SliceRegister { * @return true | false */ public boolean checkTopicRegistered(String table) { - return Objects.nonNull(topicCache.getTopic(table)); + return Objects.nonNull(TopicCache.getTopic(table)); } /** -- Gitee