From 35e4437d13baecbb2e3f6118dc1535a87f064440 Mon Sep 17 00:00:00 2001 From: mystarry-sky Date: Sat, 31 Aug 2024 15:02:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E9=87=8F=E6=A0=A1=E9=AA=8C=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E8=AF=AD=E5=8F=A5=E4=B8=ADNULL=20=E5=B8=A6=E6=9C=89?= =?UTF-8?q?=E5=A4=9A=E4=BD=99=E5=BC=95=E5=8F=B7=E9=97=AE=E9=A2=98=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../check/service/IncrementManagerService.java | 1 + .../datachecker/common/constant/WorkerSwitch.java | 13 ++++++++++++- .../extract/debezium/DebeziumWorker.java | 12 +++++++++--- .../datachecker/extract/dml/DmlBuilder.java | 2 +- .../datachecker/extract/dml/UpdateDmlBuilder.java | 15 +++++++++------ 5 files changed, 32 insertions(+), 11 deletions(-) diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java index d013149..5852bf8 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java @@ -155,6 +155,7 @@ public class IncrementManagerService { if (INC_LOG_QUEUE.isEmpty()) { feignClientService.resumeIncrementMonitor(); LogUtils.warn(log, "resume increment monitor, because the inc-log-queue is empty !"); + ThreadUtil.sleepSecond(60); } else { feignClientService.pauseIncrementMonitor(); LogUtils.warn(log, "pause increment monitor, because the inc-log-queue is not empty !"); diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/WorkerSwitch.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/WorkerSwitch.java index bcf1d25..6b6d69b 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/WorkerSwitch.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/WorkerSwitch.java @@ -23,7 +23,18 @@ package org.opengauss.datachecker.common.constant; * @since :11 */ public interface WorkerSwitch { + /** + * resume false + */ Boolean RESUME = false; + + /** + * pause true + */ Boolean PAUSE = true; - int SLEEP_TIME = 200; + + /** + * sleep time 2000 ms + */ + int SLEEP_TIME = 2000; } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java index cda3574..d9df6b0 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java @@ -42,7 +42,9 @@ public class DebeziumWorker implements Runnable { private static final AtomicBoolean PAUSE_OR_RESUME = new AtomicBoolean(WorkerSwitch.RESUME); private static final AtomicBoolean RUNNING = new AtomicBoolean(true); private static final AtomicInteger POLL_BATCH_COUNT = new AtomicInteger(); + private static final AtomicInteger RETRY_POLL_EMPTY = new AtomicInteger(); private static final int MAX_BATCH_COUNT = 1000; + private static final int RETRY_TIMES = 3; private static final String NAME = "DebeziumWorker"; private DebeziumConsumerListener debeziumConsumerListener; @@ -68,9 +70,9 @@ public class DebeziumWorker implements Runnable { consumer = kafkaConsumerConfig.getDebeziumConsumer(); while (RUNNING.get()) { if (Objects.equals(PAUSE_OR_RESUME.get(), WorkerSwitch.RESUME)) { + log.debug("Debezium message listener is resume"); doConsumerRecord(consumer.poll(Duration.ofMillis(50))); } else { - log.debug("Debezium message listener is paused"); ThreadUtil.sleep(WorkerSwitch.SLEEP_TIME); } } @@ -92,9 +94,13 @@ public class DebeziumWorker implements Runnable { PAUSE_OR_RESUME.set(WorkerSwitch.PAUSE); POLL_BATCH_COUNT.set(0); } + RETRY_POLL_EMPTY.set(0); } else { - log.info("consumer record count=0"); - PAUSE_OR_RESUME.set(WorkerSwitch.PAUSE); + log.info("consumer record count=0 retry {}", RETRY_POLL_EMPTY.get()); + if (RETRY_POLL_EMPTY.incrementAndGet() > RETRY_TIMES) { + RETRY_POLL_EMPTY.set(0); + PAUSE_OR_RESUME.set(WorkerSwitch.PAUSE); + } } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/DmlBuilder.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/DmlBuilder.java index abe96bd..3e1cba9 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/DmlBuilder.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/DmlBuilder.java @@ -199,7 +199,7 @@ public class DmlBuilder { } else { String value = columnsValue.get(columnName); if (Objects.isNull(value)) { - valueList.add("null"); + valueList.add(null); } else { valueList.add(SINGLE_QUOTES.concat(value) .concat(SINGLE_QUOTES)); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/UpdateDmlBuilder.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/UpdateDmlBuilder.java index 720484f..f1372b5 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/UpdateDmlBuilder.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/UpdateDmlBuilder.java @@ -98,9 +98,9 @@ public class UpdateDmlBuilder extends DmlBuilder { public String build() { return Fragment.DML_UPDATE.replace(Fragment.SCHEMA, schema) - .replace(Fragment.TABLE_NAME, tableName) - .replace(Fragment.COLUMNS, buildColumnsValue()) - .replace(Fragment.CONDITION, buildConditionCompositePrimary()); + .replace(Fragment.TABLE_NAME, tableName) + .replace(Fragment.COLUMNS, buildColumnsValue()) + .replace(Fragment.CONDITION, buildConditionCompositePrimary()); } private String buildConditionCompositePrimary() { @@ -108,7 +108,7 @@ public class UpdateDmlBuilder extends DmlBuilder { final List primaryMetaDatas = metadata.getPrimaryMetas(); for (ColumnsMetaData primaryMeta : primaryMetaDatas) { builder.append(primaryMeta.getColumnName()) - .append(Fragment.EQUAL); + .append(Fragment.EQUAL); if (MetaDataUtil.isDigitKey(primaryMeta)) { builder.append(columnsValues.get(primaryMeta.getColumnName())); } else if (BLOB_LIST.contains(primaryMeta.getDataType()) || BINARY.contains(primaryMeta.getDataType())) { @@ -124,7 +124,10 @@ public class UpdateDmlBuilder extends DmlBuilder { } private String convertValue(String fieldValue) { - return Fragment.SINGLE_QUOTES + fieldValue + Fragment.SINGLE_QUOTES; + if (Objects.nonNull(fieldValue)) { + return Fragment.SINGLE_QUOTES + fieldValue + Fragment.SINGLE_QUOTES; + } + return fieldValue; } private String buildColumnsValue() { @@ -136,7 +139,7 @@ public class UpdateDmlBuilder extends DmlBuilder { } final String columnName = columnMeta.getColumnName(); builder.append(columnName) - .append(Fragment.EQUAL); + .append(Fragment.EQUAL); final String columnValue = columnsValues.get(columnName); if (MetaDataUtil.isDigitKey(columnMeta)) { builder.append(columnValue); -- Gitee