diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/AsyncConfig.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/AsyncConfig.java index c13f103f53624f4cb4438d785547db0e7eb87a98..d840d7bb74ba27abb19e95231477e7761f17d4d0 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/AsyncConfig.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/AsyncConfig.java @@ -47,7 +47,7 @@ public class AsyncConfig { executor.setCorePoolSize(1); executor.setMaxPoolSize(5); executor.setQueueCapacity(1000); - executor.setThreadNamePrefix("MyExecutor-"); + executor.setThreadNamePrefix("check-executor-"); executor.initialize(); return executor; } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckResultPathLoader.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckResultPathLoader.java index 401eec08ed03744cc936dff3d79372535a212d33..ed2fd40add0b4596edd538e89622e68b209955d9 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckResultPathLoader.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckResultPathLoader.java @@ -17,6 +17,7 @@ package org.opengauss.datachecker.check.load; import org.opengauss.datachecker.check.config.DataCheckProperties; import org.opengauss.datachecker.check.modules.check.ExportCheckResult; +import org.opengauss.datachecker.check.service.IncrementLogManager; import org.opengauss.datachecker.common.util.LogUtils; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Service; @@ -35,6 +36,8 @@ import javax.annotation.Resource; public class CheckResultPathLoader extends AbstractCheckLoader { @Resource private DataCheckProperties properties; + @Resource + private IncrementLogManager incrementLogManager; /** * Initialize the verification result environment @@ -44,6 +47,7 @@ public class CheckResultPathLoader extends AbstractCheckLoader { ExportCheckResult.initEnvironment(properties.getDataPath()); checkEnvironment.setExportCheckPath(properties.getDataPath()); ExportCheckResult.backCheckResultDirectory(); + incrementLogManager.init(ExportCheckResult.getResultBakRootDir()); LogUtils.info(log, "check service load export environment success."); } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/ExportCheckResult.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/ExportCheckResult.java index 4f04c65ff524e8ee6fb94f429baae1fe065438a4..bb087d0c06375e9b9ee2493eae3c05d79c0916f0 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/ExportCheckResult.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/ExportCheckResult.java @@ -91,7 +91,7 @@ public class ExportCheckResult { resultPaths.forEach(file -> { try { Files.move(file, Path.of(concat(backDir, file.getFileName() - .toString())), ATOMIC_MOVE); + .toString())), ATOMIC_MOVE); } catch (IOException e) { LogUtils.error(log, "back the verification result environment error"); } @@ -103,13 +103,18 @@ public class ExportCheckResult { return ROOT_PATH.concat(CHECK_RESULT_PATH); } - private static String getResultBakRootDir() { + /** + * get result backup root dir + * + * @return path + */ + public static String getResultBakRootDir() { return ROOT_PATH.concat(CHECK_RESULT_BAK_DIR); } private static String concat(String dir, String fileName) { return dir.concat(File.separator) - .concat(fileName); + .concat(fileName); } private static String getResultBakDir() { diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementLogManager.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementLogManager.java new file mode 100644 index 0000000000000000000000000000000000000000..65f290559064428aebe36eae49e07234f798b0ba --- /dev/null +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementLogManager.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2024-2024 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.opengauss.datachecker.check.service; + +import org.apache.logging.log4j.Logger; +import org.opengauss.datachecker.common.util.LogUtils; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.io.File; +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.WatchKey; +import java.nio.file.Files; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.WatchEvent; +import java.nio.file.FileVisitResult; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchService; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.stream.Collectors; + +/** + * IncrementLogManager + * + * @author wang chao + * @since 2022/5/8 19:17 + **/ +@Service +public class IncrementLogManager { + private static final Logger log = LogUtils.getLogger(IncrementLogManager.class); + private static final int MAX_BACK_DIR_NUM = 10; + + @Resource + private ThreadPoolTaskExecutor threadPoolTaskExecutor; + private WatchService watchService; + private LinkedList backDirs = new LinkedList<>(); + + /** + * init log dir and register watch service + * + * @param path path + */ + public void init(String path) { + try { + watchService = FileSystems.getDefault().newWatchService(); + Path dir = Paths.get(path); + File[] files = dir.toFile().listFiles(); + if (files != null && files.length > 0) { + backDirs.addAll(Arrays.stream(files).map(File::toPath).sorted().collect(Collectors.toList())); + } + dir.register(watchService, StandardWatchEventKinds.ENTRY_CREATE); + LogUtils.info(log, "registers path {} with a watch service. ", path); + bakResultLogMonitor(); + } catch (IOException e) { + LogUtils.error(log, "init watch service failed. ", e); + } + } + + /** + * monitor the result log file + */ + public void bakResultLogMonitor() { + threadPoolTaskExecutor.submit(() -> { + try { + WatchKey key = watchService.take(); + for (WatchEvent event : key.pollEvents()) { + WatchEvent.Kind kind = event.kind(); + + // 过滤出目录本身的事件 + WatchEvent ev = (WatchEvent) event; + Path fileName = ev.context(); + if (kind.equals(StandardWatchEventKinds.ENTRY_CREATE)) { + LogUtils.warn(log, "monitor result back dir {} : {}", kind, fileName); + backDirs.addLast(fileName); + } + + while (backDirs.size() > MAX_BACK_DIR_NUM) { + try { + Path path = backDirs.removeFirst(); + deleteDir(path); + LogUtils.warn(log, "remove result back more dir : {}", path); + } catch (IOException e) { + LogUtils.error(log, "remove result back more dir : {}", e.getMessage()); + } + } + } + key.reset(); + } catch (InterruptedException e) { + LogUtils.error(log, "monitor result back dir : {}", e.getMessage()); + } + }); + } + + private static void deleteDir(Path dir) throws IOException { + Files.walkFileTree(dir, new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + Files.delete(file); + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { + if (exc == null) { + Files.delete(dir); + return FileVisitResult.CONTINUE; + } else { + // 目录删除失败,可以选择抛出异常或记录日志 + throw exc; + } + } + }); + } +} diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java index b6a6a09f4265bd5019a5c52aecbca3e0131a7efe..4a4407df424809c5e172d40f0d4caf7351f20d60 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java @@ -87,6 +87,9 @@ public class IncrementManagerService { private CheckResultManagerService checkResultManagerService; @Resource private DynamicThreadPoolManager dynamicThreadPoolManager; + @Resource + private IncrementLogManager incrementLogManager; + private final AtomicInteger retryTimes = new AtomicInteger(0); private static final int RETRY_SLEEP_TIMES = 1000; private static final int MAX_RETRY_SLEEP_TIMES = 1000; @@ -157,13 +160,14 @@ public class IncrementManagerService { mergeDataLogList(dataLogList, lastResults); if (CollectionUtils.isNotEmpty(dataLogList)) { ExportCheckResult.backCheckResultDirectory(); + incrementLogManager.bakResultLogMonitor(); sliceProgressService.startProgressing(); PROCESS_SIGNATURE.set(IdGenerator.nextId36()); checkResultManagerService.progressing(dataLogList.size()); incrementDataLogsChecking(dataLogList); } else { LogUtils.info(log, "There are no differences to verify at the current time. Please wait."); - ThreadUtil.sleep(30000); + ThreadUtil.sleepSecond(5); } } catch (Exception ex) { LogUtils.error(log, "take inc log queue interrupted "); diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/ThreadUtil.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/ThreadUtil.java index 8c70d83dfde27e947f743d51ccbd42dacefc17c4..ca7cdc2fe68daec5ca92c4ea5749d0977ddebe38 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/ThreadUtil.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/ThreadUtil.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger; import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; @@ -50,6 +51,19 @@ public class ThreadUtil { } } + /** + * Thread sleep second + * + * @param second Sleep time + */ + public static void sleepSecond(int second) { + try { + TimeUnit.SECONDS.sleep(second); + } catch (InterruptedException ie) { + LogUtils.warn(log, "thread sleep interrupted exception "); + } + } + /** * The current thread sleeps for 10 - 500 milliseconds */ @@ -141,13 +155,13 @@ public class ThreadUtil { */ public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().daemon(true) - .build()); + .build()); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name) { return new ScheduledThreadPoolExecutor(1, new BasicThreadFactory. - Builder().namingPattern(name) - .build()); + Builder().namingPattern(name) + .build()); } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java index 62ec035cee9595f5c639a00a7895805d80962467..a4503814cf4b4b0a010db5c22d8255515cf869e1 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java @@ -79,7 +79,7 @@ public class DebeziumWorker implements Runnable { } } else { log.info("consumer record count=0"); - ThreadUtil.sleepMax2Second(); + ThreadUtil.sleepSecond(5); } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java index 6f8cac4658a50f0ab868c845fceca3160dd6701b..7fc48faa1556f8371b219db437daba143f2df3b6 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java @@ -162,7 +162,7 @@ public class KafkaAdminService { */ public boolean isTopicExists(String topicName) { try { - LogUtils.debug(log, "check topic [{} : group{}] has exists --> check kafka list topics", topicName); + LogUtils.debug(log, "check topic [{}] has exists --> check kafka list topics", topicName); return adminClient.listTopics() .listings() .get()