diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java index 6166814ac3055c761f968c1edbc058f71023f19d..2d49aaa39381152d304549f9938a2827f589acc9 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java @@ -287,4 +287,9 @@ public interface ConfigConstants { * spring.extract.debezium-row-display */ String DEBEZIUM_ROW_DISPLAY = "spring.extract.debezium-row-display"; + + /** + * spring.check.max-retry-times + */ + String MAX_RETRY_TIMES = "spring.check.max-retry-times"; } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java index 7f80b2d06f6a34938d0dc4badb7303cd2b90167a..e839b5d0e182a0f76e778d372b5ba8fb7a49f41d 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java @@ -32,6 +32,7 @@ import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.exception.CsvDataAccessException; import org.opengauss.datachecker.common.exception.ExtractDataAccessException; import org.opengauss.datachecker.common.util.LogUtils; +import org.opengauss.datachecker.extract.util.CsvUtil; import org.springframework.jdbc.core.RowMapper; import javax.sql.DataSource; @@ -78,6 +79,10 @@ public class CsvDataAccessService implements DataAccessService { log.error("csv metadata info does not exist {}", pathOfTables); throw new CsvDataAccessException("csv metadata load failed"); } + Path reader = Path.of(ConfigCache.getReader()); + if (!CsvUtil.checkExistAndWait(reader)) { + throw new CsvDataAccessException("file " + reader.toString() + " is not exist"); + } Stream lineOfTables = Files.lines(pathOfTables); return lineOfTables.parallel() .map(tableJson -> JSONObject.parseObject(tableJson, CsvTableMeta.class)) @@ -95,9 +100,13 @@ public class CsvDataAccessService implements DataAccessService { tableMetadataMap.clear(); Path pathOfTables = ConfigCache.getCsvMetadataTablesPath(); Path pathOfColumns = ConfigCache.getCsvMetadataColumnsPath(); - if (Files.notExists(pathOfTables) || Files.notExists(pathOfColumns)) { + Path reader = Path.of(ConfigCache.getReader()); + if (!CsvUtil.checkExistAndWait(reader)) { + throw new CsvDataAccessException("file " + reader.toString() + " is not exist"); + } + if (!CsvUtil.checkExistAndWait(pathOfTables) || !CsvUtil.checkExistAndWait(pathOfColumns)) { log.error("csv metadata info does not exist {} or {}", pathOfTables, pathOfColumns); - throw new CsvDataAccessException("csv metadata load failed"); + throw new CsvDataAccessException("file " + reader.toString() + " is not exist"); } try { Stream lineOfTables = Files.lines(pathOfTables); @@ -105,7 +114,6 @@ public class CsvDataAccessService implements DataAccessService { CsvTableMeta csvTableMeta = JSONObject.parseObject(tableJson, CsvTableMeta.class); tableMetadataMap.put(csvTableMeta.getTable(), csvTableMeta.toTableMetadata()); }); - Stream lineOfColumns = Files.lines(pathOfColumns); List columns = new LinkedList<>(); lineOfColumns.forEach(columnJson -> { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java index abb96c5153500f7b81ae3be5ac8d7a056f4dfa8f..c34e2aa3c4d1e357eca506de35048152ae280106 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java @@ -23,12 +23,15 @@ import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.extract.SliceVo; +import org.opengauss.datachecker.common.exception.CsvDataAccessException; import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.MapUtils; import org.opengauss.datachecker.extract.client.CheckingFeignClient; import org.opengauss.datachecker.extract.constants.ExtConstants; +import org.opengauss.datachecker.extract.util.CsvUtil; import java.io.File; +import java.nio.file.Path; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -55,6 +58,10 @@ public class CsvReaderListener implements CsvListener { public void initCsvListener(CheckingFeignClient checkingClient) { log.info("csv reader listener is starting ."); this.checkingClient = checkingClient; + Path reader = Path.of(ConfigCache.getReader()); + if (!CsvUtil.checkExistAndWait(reader)) { + throw new CsvDataAccessException("file " + reader.toString() + " is not exist"); + } // creates and starts a Tailer for read writer logs in real time tailer = Tailer.create(new File(ConfigCache.getReader()), new TailerListenerAdapter() { @Override diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvWriterListener.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvWriterListener.java index b8d044c04bb8dd06bc6489f3a6ab72d021ac77e0..898f9f4f87bd40fdd480fc2d07a532e2e52e2435 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvWriterListener.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvWriterListener.java @@ -29,13 +29,16 @@ import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.enums.SliceIndexStatus; import org.opengauss.datachecker.common.entry.enums.SliceLogType; import org.opengauss.datachecker.common.entry.extract.SliceVo; +import org.opengauss.datachecker.common.exception.CsvDataAccessException; import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.MapUtils; import org.opengauss.datachecker.common.util.ThreadUtil; import org.opengauss.datachecker.extract.client.CheckingFeignClient; import org.opengauss.datachecker.extract.constants.ExtConstants; +import org.opengauss.datachecker.extract.util.CsvUtil; import java.io.File; +import java.nio.file.Path; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -73,6 +76,10 @@ public class CsvWriterListener implements CsvListener { public void initCsvListener(CheckingFeignClient checkingClient) { this.checkingClient = checkingClient; log.info("csv writer listener is starting ."); + Path writer = Path.of(ConfigCache.getWriter()); + if (!CsvUtil.checkExistAndWait(writer)) { + throw new CsvDataAccessException("file " + writer.toString() + " is not exist"); + } // creates and starts a tailer for read writer logs in real time tailer = Tailer.create(new File(ConfigCache.getWriter()), new TailerListenerAdapter() { @Override @@ -129,7 +136,8 @@ public class CsvWriterListener implements CsvListener { int interval = ConfigCache.getIntValue(ConfigConstants.CSV_TASK_DISPATCHER_INTERVAL) * 1000; int maxDispatcherSize = ConfigCache.getIntValue(ConfigConstants.CSV_MAX_DISPATCHER_SIZE); feedbackExecutor.submit(() -> { - Thread.currentThread().setName(TABLE_INDEX_COMPLETED_FEEDBACK); + Thread.currentThread() + .setName(TABLE_INDEX_COMPLETED_FEEDBACK); List completedTableList = new LinkedList<>(); while (isFeedbackRunning) { try { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java index d737f03d0281ad3a8d0a9531d060fb11228c7461..597b7f034e57649aba0e60fa3fa3cc15aa90f562 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java @@ -84,6 +84,8 @@ public class ConfigManagement { private int timeoutPerShutdownPhase; @Value("${spring.extract.object-size-expansion-factor}") private int objectSizeExpansionFactor; + @Value("${spring.check.max-retry-times}") + private int maxRetryTimes; /** * init csv config @@ -128,6 +130,7 @@ public class ConfigManagement { ConfigCache.put(ConfigConstants.FETCH_SIZE, 1000); ConfigCache.put(ConfigConstants.TIMEOUT_PER_SHUTDOWN_PHASE, timeoutPerShutdownPhase); ConfigCache.put(ConfigConstants.EXTEND_MAXIMUM_POOL_SIZE, extendMaxPoolSize); + ConfigCache.put(ConfigConstants.MAX_RETRY_TIMES, maxRetryTimes); loadKafkaProperties(); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/CsvUtil.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/CsvUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..7a2eccf6b7dffd5bf0e350e6cd2a2db706bafeac --- /dev/null +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/CsvUtil.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.opengauss.datachecker.extract.util; + +import org.apache.logging.log4j.Logger; +import org.opengauss.datachecker.common.config.ConfigCache; +import org.opengauss.datachecker.common.constant.ConfigConstants; +import org.opengauss.datachecker.common.util.LogUtils; +import org.opengauss.datachecker.common.util.ThreadUtil; + +import java.nio.file.Files; +import java.nio.file.Path; + +/** + * CsvUtil + * + * @author :wangchao + * @date :Created in 2024/2/29 + * @since :11 + */ +public class CsvUtil { + private static final Logger log = LogUtils.getLogger(); + + /** + * check file exist ,if not exist ,wait and sleep half second to continue check. + * + * @param file file + * @return boolean + */ + public static boolean checkExistAndWait(Path file) { + int maxRetryTimes = ConfigCache.getIntValue(ConfigConstants.MAX_RETRY_TIMES); + while (Files.notExists(file) && maxRetryTimes > 0) { + log.warn("file {} is not exist, waiting ...", file); + ThreadUtil.sleepHalfSecond(); + maxRetryTimes--; + } + return Files.exists(file); + } +}