diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java index d0131494eb26f0cc71ffe425147a30826c671326..5852bf8f37cd0ed346c25319e36f35dc3272f47d 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java @@ -155,6 +155,7 @@ public class IncrementManagerService { if (INC_LOG_QUEUE.isEmpty()) { feignClientService.resumeIncrementMonitor(); LogUtils.warn(log, "resume increment monitor, because the inc-log-queue is empty !"); + ThreadUtil.sleepSecond(60); } else { feignClientService.pauseIncrementMonitor(); LogUtils.warn(log, "pause increment monitor, because the inc-log-queue is not empty !"); diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/WorkerSwitch.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/WorkerSwitch.java index bcf1d25f54fc3ff53dff1ff2cafe4877746142ac..6b6d69be24f864dc97f080e74d1801c82d9b022e 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/WorkerSwitch.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/WorkerSwitch.java @@ -23,7 +23,18 @@ package org.opengauss.datachecker.common.constant; * @since :11 */ public interface WorkerSwitch { + /** + * resume false + */ Boolean RESUME = false; + + /** + * pause true + */ Boolean PAUSE = true; - int SLEEP_TIME = 200; + + /** + * sleep time 2000 ms + */ + int SLEEP_TIME = 2000; } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java index cda357456efd7bfdc6a4e5c9971c9ceecc349dc9..d9df6b04883954ab8ca2756ea02061c9358edd21 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java @@ -42,7 +42,9 @@ public class DebeziumWorker implements Runnable { private static final AtomicBoolean PAUSE_OR_RESUME = new AtomicBoolean(WorkerSwitch.RESUME); private static final AtomicBoolean RUNNING = new AtomicBoolean(true); private static final AtomicInteger POLL_BATCH_COUNT = new AtomicInteger(); + private static final AtomicInteger RETRY_POLL_EMPTY = new AtomicInteger(); private static final int MAX_BATCH_COUNT = 1000; + private static final int RETRY_TIMES = 3; private static final String NAME = "DebeziumWorker"; private DebeziumConsumerListener debeziumConsumerListener; @@ -68,9 +70,9 @@ public class DebeziumWorker implements Runnable { consumer = kafkaConsumerConfig.getDebeziumConsumer(); while (RUNNING.get()) { if (Objects.equals(PAUSE_OR_RESUME.get(), WorkerSwitch.RESUME)) { + log.debug("Debezium message listener is resume"); doConsumerRecord(consumer.poll(Duration.ofMillis(50))); } else { - log.debug("Debezium message listener is paused"); ThreadUtil.sleep(WorkerSwitch.SLEEP_TIME); } } @@ -92,9 +94,13 @@ public class DebeziumWorker implements Runnable { PAUSE_OR_RESUME.set(WorkerSwitch.PAUSE); POLL_BATCH_COUNT.set(0); } + RETRY_POLL_EMPTY.set(0); } else { - log.info("consumer record count=0"); - PAUSE_OR_RESUME.set(WorkerSwitch.PAUSE); + log.info("consumer record count=0 retry {}", RETRY_POLL_EMPTY.get()); + if (RETRY_POLL_EMPTY.incrementAndGet() > RETRY_TIMES) { + RETRY_POLL_EMPTY.set(0); + PAUSE_OR_RESUME.set(WorkerSwitch.PAUSE); + } } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/DmlBuilder.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/DmlBuilder.java index abe96bdb272842ad59b9f29f1479390afefcd792..3e1cba9f1d44789287be996d44b8a50b7606dd05 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/DmlBuilder.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/DmlBuilder.java @@ -199,7 +199,7 @@ public class DmlBuilder { } else { String value = columnsValue.get(columnName); if (Objects.isNull(value)) { - valueList.add("null"); + valueList.add(null); } else { valueList.add(SINGLE_QUOTES.concat(value) .concat(SINGLE_QUOTES)); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/UpdateDmlBuilder.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/UpdateDmlBuilder.java index 720484fb4507868768b57441dc4ce8e7efc05940..f1372b5d712f3fdbcd1121bcfd2be3405cb0eddc 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/UpdateDmlBuilder.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/UpdateDmlBuilder.java @@ -98,9 +98,9 @@ public class UpdateDmlBuilder extends DmlBuilder { public String build() { return Fragment.DML_UPDATE.replace(Fragment.SCHEMA, schema) - .replace(Fragment.TABLE_NAME, tableName) - .replace(Fragment.COLUMNS, buildColumnsValue()) - .replace(Fragment.CONDITION, buildConditionCompositePrimary()); + .replace(Fragment.TABLE_NAME, tableName) + .replace(Fragment.COLUMNS, buildColumnsValue()) + .replace(Fragment.CONDITION, buildConditionCompositePrimary()); } private String buildConditionCompositePrimary() { @@ -108,7 +108,7 @@ public class UpdateDmlBuilder extends DmlBuilder { final List primaryMetaDatas = metadata.getPrimaryMetas(); for (ColumnsMetaData primaryMeta : primaryMetaDatas) { builder.append(primaryMeta.getColumnName()) - .append(Fragment.EQUAL); + .append(Fragment.EQUAL); if (MetaDataUtil.isDigitKey(primaryMeta)) { builder.append(columnsValues.get(primaryMeta.getColumnName())); } else if (BLOB_LIST.contains(primaryMeta.getDataType()) || BINARY.contains(primaryMeta.getDataType())) { @@ -124,7 +124,10 @@ public class UpdateDmlBuilder extends DmlBuilder { } private String convertValue(String fieldValue) { - return Fragment.SINGLE_QUOTES + fieldValue + Fragment.SINGLE_QUOTES; + if (Objects.nonNull(fieldValue)) { + return Fragment.SINGLE_QUOTES + fieldValue + Fragment.SINGLE_QUOTES; + } + return fieldValue; } private String buildColumnsValue() { @@ -136,7 +139,7 @@ public class UpdateDmlBuilder extends DmlBuilder { } final String columnName = columnMeta.getColumnName(); builder.append(columnName) - .append(Fragment.EQUAL); + .append(Fragment.EQUAL); final String columnValue = columnsValues.get(columnName); if (MetaDataUtil.isDigitKey(columnMeta)) { builder.append(columnValue);