From 4aedf976a24a5aa38288fb7ce3f95cf3d37774f1 Mon Sep 17 00:00:00 2001 From: mystarry-sky Date: Mon, 20 Jan 2025 21:15:34 +0800 Subject: [PATCH 1/4] =?UTF-8?q?DataKit=E6=96=AD=E7=82=B9=E7=BB=AD=E4=BC=A0?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/vo/HostRealtimeStatistics.java | 11 +- .../framework/config/ThreadPoolConfig.java | 5 +- .../service/HostMonitorCacheService.java | 40 +- .../system/service/JschExecutorService.java | 9 +- .../DataMigrationApplicationRunner.java | 12 +- .../admin/plugin/constants/TaskConstant.java | 8 +- .../plugin/context/MigrationTaskContext.java | 50 ++ .../controller/MigrationTaskController.java | 77 ++- .../domain/MigrationHostPortalInstall.java | 9 + .../MigrationTaskCheckProgressDetail.java | 60 +++ .../MigrationTaskCheckProgressSummary.java | 70 +++ .../admin/plugin/enums/TaskStatus.java | 13 +- .../handler/MigrationRecoveryHandler.java | 452 ++++++++++++++++++ ...igrationTaskCheckProgressDetailMapper.java | 38 ++ ...grationTaskCheckProgressSummaryMapper.java | 38 ++ ...grationTaskCheckProgressDetailService.java | 45 ++ ...rationTaskCheckProgressSummaryService.java | 53 ++ .../plugin/service/MigrationTaskService.java | 58 ++- .../impl/MigrationMainTaskServiceImpl.java | 26 +- ...ionTaskCheckProgressDetailServiceImpl.java | 57 +++ .../MigrationTaskCheckProgressMonitor.java | 234 +++++++++ .../MigrationTaskCheckProgressService.java | 186 +++++++ ...onTaskCheckProgressSummaryServiceImpl.java | 65 +++ .../impl/MigrationTaskServiceImpl.java | 381 +++++++++++---- .../MigrationTaskStatusRecordServiceImpl.java | 82 ++-- .../admin/plugin/vo/FullCheckParam.java | 42 ++ .../admin/plugin/vo/ProcessStatus.java | 51 ++ .../admin/plugin/vo/TaskProcessStatus.java | 44 ++ .../src/main/resources/opengauss-schema.sql | 78 ++- 29 files changed, 2129 insertions(+), 165 deletions(-) create mode 100644 plugins/data-migration/src/main/java/org/opengauss/admin/plugin/context/MigrationTaskContext.java create mode 100644 plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressDetail.java create mode 100644 plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressSummary.java create mode 100644 plugins/data-migration/src/main/java/org/opengauss/admin/plugin/handler/MigrationRecoveryHandler.java create mode 100644 plugins/data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressDetailMapper.java create mode 100644 plugins/data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressSummaryMapper.java create mode 100644 plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressDetailService.java create mode 100644 plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressSummaryService.java create mode 100644 plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressDetailServiceImpl.java create mode 100644 plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressMonitor.java create mode 100644 plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressService.java create mode 100644 plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressSummaryServiceImpl.java create mode 100644 plugins/data-migration/src/main/java/org/opengauss/admin/plugin/vo/FullCheckParam.java create mode 100644 plugins/data-migration/src/main/java/org/opengauss/admin/plugin/vo/ProcessStatus.java create mode 100644 plugins/data-migration/src/main/java/org/opengauss/admin/plugin/vo/TaskProcessStatus.java diff --git a/openGauss-datakit/visualtool-common/src/main/java/org/opengauss/admin/common/core/vo/HostRealtimeStatistics.java b/openGauss-datakit/visualtool-common/src/main/java/org/opengauss/admin/common/core/vo/HostRealtimeStatistics.java index 11cc58c9f..d3fc88d39 100644 --- a/openGauss-datakit/visualtool-common/src/main/java/org/opengauss/admin/common/core/vo/HostRealtimeStatistics.java +++ b/openGauss-datakit/visualtool-common/src/main/java/org/opengauss/admin/common/core/vo/HostRealtimeStatistics.java @@ -35,6 +35,11 @@ import java.util.Locale; */ @Data public class HostRealtimeStatistics { + /** + * disk unit size 1024*1024,to format disk GB unit + */ + private static final long DISK_UNIT = 1024 * 1024; + private float total; private float used; private float available; @@ -50,9 +55,9 @@ public class HostRealtimeStatistics { HostRealtimeStatistics bean = new HostRealtimeStatistics(); String[] dev = diskMonitor.split(" "); if (dev.length == 7) { - bean.total += Math.round((float) Long.parseLong(dev[2]) / 1048576); - bean.used += Math.round((float) Long.parseLong(dev[3]) / 1048576); - bean.available += Math.round((float) Long.parseLong(dev[4]) / 1048576); + bean.total += Math.round((float) Long.parseLong(dev[2]) / DISK_UNIT); + bean.used += Math.round((float) Long.parseLong(dev[3]) / DISK_UNIT); + bean.available += Math.round((float) Long.parseLong(dev[4]) / DISK_UNIT); bean.use = dev[5].replace("%", ""); } return bean; diff --git a/openGauss-datakit/visualtool-framework/src/main/java/org/opengauss/admin/framework/config/ThreadPoolConfig.java b/openGauss-datakit/visualtool-framework/src/main/java/org/opengauss/admin/framework/config/ThreadPoolConfig.java index d105f7601..df336bbb8 100644 --- a/openGauss-datakit/visualtool-framework/src/main/java/org/opengauss/admin/framework/config/ThreadPoolConfig.java +++ b/openGauss-datakit/visualtool-framework/src/main/java/org/opengauss/admin/framework/config/ThreadPoolConfig.java @@ -28,6 +28,7 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.opengauss.admin.common.utils.Threads; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ScheduledExecutorService; @@ -40,6 +41,7 @@ import java.util.concurrent.ThreadPoolExecutor; * @author xielibo **/ @Configuration +@EnableAsync public class ThreadPoolConfig { private int scheduledCorePoolSize = 10; private int corePoolSize = 50; @@ -57,10 +59,11 @@ public class ThreadPoolConfig { executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } + @Bean(name = "scheduledExecutorService") protected ScheduledExecutorService scheduledExecutorService() { return new ScheduledThreadPoolExecutor(scheduledCorePoolSize, - new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build()) { + new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build()) { @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); diff --git a/openGauss-datakit/visualtool-service/src/main/java/org/opengauss/admin/system/service/HostMonitorCacheService.java b/openGauss-datakit/visualtool-service/src/main/java/org/opengauss/admin/system/service/HostMonitorCacheService.java index 886b09abc..cb0974449 100644 --- a/openGauss-datakit/visualtool-service/src/main/java/org/opengauss/admin/system/service/HostMonitorCacheService.java +++ b/openGauss-datakit/visualtool-service/src/main/java/org/opengauss/admin/system/service/HostMonitorCacheService.java @@ -37,6 +37,7 @@ import org.opengauss.admin.system.plugin.beans.SshLogin; import org.opengauss.admin.system.service.ops.IHostService; import org.opengauss.admin.system.service.ops.IHostUserService; import org.opengauss.admin.system.service.ops.impl.EncryptionUtils; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.Arrays; @@ -78,12 +79,14 @@ public class HostMonitorCacheService { /** * init host cache scheduled */ + @Async public void initHostMonitorCacheService() { lastedFetch.set(System.currentTimeMillis() / 1000); List list = hostService.list(); for (OpsHostEntity host : list) { cacheHostMap.put(host.getHostId(), host); } + initCache(); // host static info, support refresh data per minute scheduledExecutorService.scheduleWithFixedDelay(() -> { Thread.currentThread().setName("host-monitor"); @@ -100,7 +103,7 @@ public class HostMonitorCacheService { log.warn("monitor host cache {} error : ", host.getPublicIp(), ex); } }); - }, 0, 5 * 60, TimeUnit.SECONDS); + }, 0, 10 * 60, TimeUnit.SECONDS); // host dynamic info,support seconds level data refresh scheduledExecutorService.scheduleWithFixedDelay(() -> { Thread.currentThread().setName("host-realtime-monitor"); @@ -117,7 +120,21 @@ public class HostMonitorCacheService { log.warn("monitor host cache {} error : {}", host.getPublicIp(), ex.getMessage()); } }); - }, 2, 6, TimeUnit.SECONDS); + }, 1, 30, TimeUnit.SECONDS); + } + + private void initCache() { + cacheHostMap.values().forEach(host -> { + try { + OpsHostUserEntity user = getHostUserRootOrNormal(host.getHostId()); + SshLogin sshLogin = new SshLogin(host.getPublicIp(), host.getPort(), user.getUsername(), + encryptionUtils.decrypt(user.getPassword())); + cacheHostFixedInfo(host.getHostId(), sshLogin); + cacheHostRealtimeInfo(host.getHostId(), sshLogin); + } catch (OpsException ex) { + log.warn("monitor host cache {} error : ", host.getPublicIp(), ex); + } + }); } private synchronized String getCacheValue(String hostId, String key) { @@ -267,21 +284,22 @@ public class HostMonitorCacheService { if (line.contains("Architecture")) { String[] arch = line.split(":"); hostInfoMap.put(CacheConstants.CPU_ARCH, arch[1].trim()); - } else if (line.contains("CPU(s):") && !line.contains("node")) { + continue; + } + if (line.contains("CPU(s):") && !line.contains("node")) { String[] arch = line.split(":"); hostInfoMap.put(CacheConstants.CPU_CORE_NUM, arch[1].trim()); - } else if (line.contains("CPUmaxMHz:")) { - // arm frequency - String[] arch = line.split(":"); - hostInfoMap.put(CacheConstants.CPU_FREQUENCY, toCpuGHz(arch[1].trim())); - } else if (line.contains("CPUMHz:")) { - // x86 frequency + continue; + } + // CPUmaxMHz arm frequency ; CPUMHz x86 frequency + if (line.contains("CPUmaxMHz:") || line.contains("CPUMHz:")) { String[] arch = line.split(":"); hostInfoMap.put(CacheConstants.CPU_FREQUENCY, toCpuGHz(arch[1].trim())); - } else { - log.debug("Unknown cpu info: {}", line); } } + if ((!ofCpu.contains("CPU max MHz:")) && (!ofCpu.contains("CPU MHz:"))) { + hostInfoMap.put(CacheConstants.CPU_FREQUENCY, "Unknown Frequency Of lscpu cmd"); + } } catch (OpsException ex) { log.error("error occurred while getting cpu monitor {}", ex.getMessage()); } diff --git a/openGauss-datakit/visualtool-service/src/main/java/org/opengauss/admin/system/service/JschExecutorService.java b/openGauss-datakit/visualtool-service/src/main/java/org/opengauss/admin/system/service/JschExecutorService.java index 3c228a5e0..418ab1ead 100644 --- a/openGauss-datakit/visualtool-service/src/main/java/org/opengauss/admin/system/service/JschExecutorService.java +++ b/openGauss-datakit/visualtool-service/src/main/java/org/opengauss/admin/system/service/JschExecutorService.java @@ -35,6 +35,7 @@ import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; +import org.opengauss.admin.common.constant.CommonConstants; import org.opengauss.admin.common.constant.ops.SshCommandConstants; import org.opengauss.admin.common.core.domain.model.ops.WsSession; import org.opengauss.admin.common.core.handler.ops.cache.WsConnectorManager; @@ -655,13 +656,13 @@ public class JschExecutorService { channel.connect(); String line; while ((line = reader.readLine()) != null) { - result.append(line.trim()).append("\n"); + result.append(line.trim()).append(CommonConstants.LINE_SPLITTER); } deleteCharAtEndOfStringBuffer(result); StringBuilder errorResult = new StringBuilder(); String errorLine; while ((errorLine = errorReader.readLine()) != null) { - errorResult.append(errorLine.trim()).append("\n"); + errorResult.append(errorLine.trim()).append(CommonConstants.LINE_SPLITTER); } deleteCharAtEndOfStringBuffer(errorResult); int exitStatus = channel.getExitStatus(); @@ -709,10 +710,10 @@ public class JschExecutorService { channel.connect(); String errorLine; while ((errorLine = reader.readLine()) != null) { - result.append(errorLine.trim()).append("\n"); + result.append(errorLine.trim()).append(CommonConstants.LINE_SPLITTER); } while ((errorLine = errorReader.readLine()) != null) { - result.append(errorLine.trim()).append("\n"); + result.append(errorLine.trim()).append(CommonConstants.LINE_SPLITTER); } deleteCharAtEndOfStringBuffer(result); int exitStatus = channel.getExitStatus(); diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/config/DataMigrationApplicationRunner.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/config/DataMigrationApplicationRunner.java index 9e02d2ff4..166414ce9 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/config/DataMigrationApplicationRunner.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/config/DataMigrationApplicationRunner.java @@ -56,15 +56,21 @@ public class DataMigrationApplicationRunner implements ApplicationRunner { try { migrationTaskService.doOfflineTaskRunScheduler(); } catch (Exception e) { - log.error("OffLineTaskRunScheduler error", e.getMessage()); + log.error("OffLineTaskRunScheduler error {}", e.getMessage()); } }); - threadPoolTaskExecutor.submit(() -> { try { migrationMainTaskService.doRefreshMainTaskStatus(); } catch (Exception e) { - log.error("RefreshMainTaskStatus error", e.getMessage()); + log.error("RefreshMainTaskStatus error {}", e.getMessage()); + } + }); + threadPoolTaskExecutor.submit(() -> { + try { + migrationTaskService.initMigrationTaskCheckProgressMonitor(); + } catch (Exception e) { + log.error("init migration task check progress monitor error {}", e.getMessage()); } }); } diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/constants/TaskConstant.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/constants/TaskConstant.java index b66e514aa..f3c9d2e48 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/constants/TaskConstant.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/constants/TaskConstant.java @@ -54,9 +54,13 @@ public class TaskConstant { TASK_STATUS_OPERATE_MAPPING.put(TaskStatus.FULL_CHECK_FINISH.getCode(), TaskOperate.RUN.getCode()); TASK_STATUS_OPERATE_MAPPING.put(TaskStatus.INCREMENTAL_START.getCode(), TaskOperate.RUN.getCode()); TASK_STATUS_OPERATE_MAPPING.put(TaskStatus.INCREMENTAL_RUNNING.getCode(), TaskOperate.RUN.getCode()); - TASK_STATUS_OPERATE_MAPPING.put(TaskStatus.INCREMENTAL_FINISHED.getCode(), TaskOperate.STOP_INCREMENTAL.getCode()); - TASK_STATUS_OPERATE_MAPPING.put(TaskStatus.INCREMENTAL_STOPPED.getCode(), TaskOperate.STOP_INCREMENTAL.getCode()); + TASK_STATUS_OPERATE_MAPPING.put(TaskStatus.INCREMENTAL_PAUSE.getCode(), TaskOperate.RUN.getCode()); + TASK_STATUS_OPERATE_MAPPING.put(TaskStatus.INCREMENTAL_FINISHED.getCode(), + TaskOperate.STOP_INCREMENTAL.getCode()); + TASK_STATUS_OPERATE_MAPPING.put(TaskStatus.INCREMENTAL_STOPPED.getCode(), + TaskOperate.STOP_INCREMENTAL.getCode()); TASK_STATUS_OPERATE_MAPPING.put(TaskStatus.REVERSE_START.getCode(), TaskOperate.START_REVERSE.getCode()); TASK_STATUS_OPERATE_MAPPING.put(TaskStatus.REVERSE_RUNNING.getCode(), TaskOperate.START_REVERSE.getCode()); + TASK_STATUS_OPERATE_MAPPING.put(TaskStatus.REVERSE_PAUSE.getCode(), TaskOperate.START_REVERSE.getCode()); } } diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/context/MigrationTaskContext.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/context/MigrationTaskContext.java new file mode 100644 index 000000000..300dfc5ec --- /dev/null +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/context/MigrationTaskContext.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + * + * MigrationTaskContext.java + * + * IDENTIFICATION + *plugins/data-migration/src/main/java/org/opengauss/admin/plugin/context/MigrationTaskContext.java + * + * ------------------------------------------------------------------------- + */ + +package org.opengauss.admin.plugin.context; + +import lombok.Data; + +import org.opengauss.admin.plugin.domain.MigrationHostPortalInstall; +import org.opengauss.admin.plugin.domain.MigrationTask; +import org.opengauss.admin.plugin.service.MigrationTaskCheckProgressDetailService; +import org.opengauss.admin.plugin.service.MigrationTaskCheckProgressSummaryService; +import org.opengauss.admin.system.service.ops.impl.EncryptionUtils; + +/** + * MigrationTaskContext + * + * @author: wangchao + * @Date: 2024/12/31 10:58 + * @Description: MigrationTaskContext + * @since 7.0.0 + **/ +@Data +public class MigrationTaskContext { + private MigrationHostPortalInstall installHost; + private MigrationTask migrationTask; + private Integer id; + private EncryptionUtils encryptionUtils; + private MigrationTaskCheckProgressSummaryService migrationTaskCheckProgressSummaryService; + private MigrationTaskCheckProgressDetailService migrationTaskCheckProgressDetailService; +} diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/controller/MigrationTaskController.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/controller/MigrationTaskController.java index b7f54cbf6..a368d62c8 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/controller/MigrationTaskController.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/controller/MigrationTaskController.java @@ -39,10 +39,10 @@ import org.opengauss.admin.plugin.domain.MigrationTask; import org.opengauss.admin.plugin.dto.MigrationMainTaskDto; import org.opengauss.admin.plugin.dto.MigrationTaskDto; import org.opengauss.admin.plugin.handler.PortalHandle; -import org.opengauss.admin.plugin.service.MainTaskEnvErrorHostService; import org.opengauss.admin.plugin.service.MigrationMainTaskService; import org.opengauss.admin.plugin.service.MigrationTaskService; import org.opengauss.admin.plugin.utils.FileUtils; +import org.opengauss.admin.plugin.vo.FullCheckParam; import org.opengauss.admin.system.service.ops.impl.EncryptionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; @@ -50,6 +50,7 @@ import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletResponse; import java.io.BufferedOutputStream; +import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.Date; @@ -69,9 +70,6 @@ public class MigrationTaskController extends BaseController { @Autowired private MigrationMainTaskService migrationMainTaskService; - @Autowired - private MainTaskEnvErrorHostService mainTaskEnvErrorHostService; - @Autowired @AutowiredType(AutowiredType.Type.PLUGIN_MAIN) private EncryptionUtils encryptionUtils; @@ -207,7 +205,7 @@ public class MigrationTaskController extends BaseController { /** * start subtask reverse */ - @Log(title = "task", businessType = BusinessType.STOP) + @Log(title = "task", businessType = BusinessType.START) @PostMapping("/subTask/start/reverse/{id}") public AjaxResult startSubTaskReverse(@PathVariable Integer id ) { return migrationMainTaskService.startSubTaskReverse(id); @@ -243,4 +241,73 @@ public class MigrationTaskController extends BaseController { output.close(); } + /** + * Check status of incremental or reverse migration task + * + * @param id task id + * @return status + */ + @GetMapping(value = "/check/incremental/reverse/status/{id}") + public AjaxResult checkStatusOfIncrementalOrReverseMigrationTask(@PathVariable("id") Integer id) { + return AjaxResult.success(migrationTaskService.checkStatusOfIncrementalOrReverseMigrationTask(id)); + } + + /** + * Start incremental or reverse migration task + * + * @param id task id + * @param name endpoint name + * @return result + */ + @PostMapping(value = "/start/incremental/reverse/task/process/{id}") + public AjaxResult startTaskOfOnlineOrReverseMigrationProcess(@PathVariable("id") Integer id, + @RequestParam("name") String name) { + return AjaxResult.success(migrationTaskService.startTaskOfIncrementalOrReverseMigrationProcess(id, name)); + } + + /** + * Query full check summary of migration task + * + * @param id task + * @return summary + */ + @GetMapping(value = "/query/full/check/summary/{id}") + public AjaxResult queryFullCheckSummary(@PathVariable("id") Integer id) { + return AjaxResult.success(migrationTaskService.queryFullCheckSummaryOfMigrationTask(id)); + } + + /** + * Query full check detail of migration task + * + * @param fullCheckParam fullCheckParam + * @return result + */ + @PostMapping(value = "/query/full/check/detail") + public AjaxResult queryFullCheckDetail(@RequestBody FullCheckParam fullCheckParam) { + return AjaxResult.success(migrationTaskService.queryFullCheckDetailOfMigrationTask(fullCheckParam)); + } + + /** + * Download repair file + * + * @param id task id + * @param repairFileName repair file name + * @param response response + * @throws IOException IOException + */ + @GetMapping(value = "/download/repair/file/{id}/{repairFileName}") + public void downloadRepairFile(@PathVariable Integer id, @PathVariable String repairFileName, + HttpServletResponse response) throws IOException { + String content = migrationTaskService.downloadRepairFile(id, repairFileName); + if (StringUtils.isBlank(content)) { + content = " "; + } + byte[] bytes = content.getBytes(StandardCharsets.UTF_8); + response.setContentType(MediaType.APPLICATION_OCTET_STREAM_VALUE); + FileUtils.setAttachmentResponseHeader(response, repairFileName); + try (OutputStream output = new BufferedOutputStream(response.getOutputStream())) { + output.write(bytes); + output.flush(); + } + } } diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationHostPortalInstall.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationHostPortalInstall.java index fa2265622..0b7cee788 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationHostPortalInstall.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationHostPortalInstall.java @@ -123,4 +123,13 @@ public class MigrationHostPortalInstall { } }).map(mainVersion -> mainVersion > version).orElse(false); } + + /** + * get portal install root path + * + * @return String + */ + public String getInstallRootPath() { + return installPath + "portal/"; + } } diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressDetail.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressDetail.java new file mode 100644 index 000000000..6c2914e41 --- /dev/null +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressDetail.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + * + * MigrationTaskCheckProgressDetail.java + * + * IDENTIFICATION + * plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressDetail.java + * + * ------------------------------------------------------------------------- + */ + +package org.opengauss.admin.plugin.domain; + +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; + +import lombok.Builder; +import lombok.Data; +import lombok.experimental.Tolerate; + +import java.util.Date; + +/** + * MigrationTaskCheckProgressDetail + * + * @author wangchao + * @date 2025/01/14 09:01 + **/ +@Data +@Builder +@TableName("tb_migration_task_check_progress_detail") +public class MigrationTaskCheckProgressDetail { + private String id; + private Integer taskId; + private String schemaName; + private String sourceName; + private String sinkName; + private String status; + private Integer failedRows; + private String message; + private String repairFileName; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; + + @Tolerate + public MigrationTaskCheckProgressDetail() { + } +} diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressSummary.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressSummary.java new file mode 100644 index 000000000..a040cba90 --- /dev/null +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressSummary.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + * + * MigrationTaskCheckProgressSummary.java + * + * IDENTIFICATION + * plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressSummary.java + * + * ------------------------------------------------------------------------- + */ + +package org.opengauss.admin.plugin.domain; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; + +import lombok.Builder; +import lombok.Data; +import lombok.experimental.Tolerate; + +import java.time.LocalDateTime; + +/** + * MigrationTaskCheckProgressSummary + * + * @author wangchao + * @date 2025/01/14 09:01 + **/ +@Data +@Builder +@TableName("tb_migration_task_check_progress_summary") +public class MigrationTaskCheckProgressSummary { + /** + * id + */ + @TableId(value = "id", type = IdType.AUTO) + private Integer id; + private Integer taskId; + private String sourceDb; + private String sinkDb; + private Long total; + private Integer avgSpeed; + @TableField("completed") + private Integer completeCount; + private Integer tableCount; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private LocalDateTime startTime; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private LocalDateTime endTime; + private String status; + + @Tolerate + public MigrationTaskCheckProgressSummary() { + } +} diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/enums/TaskStatus.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/enums/TaskStatus.java index f8cf3d7ac..79d6d159d 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/enums/TaskStatus.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/enums/TaskStatus.java @@ -42,6 +42,7 @@ public enum TaskStatus { FULL_CHECKING(5, "full_checking"), FULL_CHECK_FINISH(6, "full_check_finish"), INCREMENTAL_START(7, "incremental_start"), + INCREMENTAL_PAUSE(30, "incremental_pause"), INCREMENTAL_RUNNING(8, "incremental_run"), // when click stop incremental INCREMENTAL_FINISHED(9, "incremental_finished"), @@ -50,6 +51,7 @@ public enum TaskStatus { REVERSE_START(11, "reverse_start"), REVERSE_RUNNING(12, "reverse_run"), REVERSE_STOP(13, "reverse_stop"), + REVERSE_PAUSE(40, "reverse_pause"), MIGRATION_FINISH(100, "migration_finish"), MIGRATION_ERROR(500, "error"), WAIT_RESOURCE(1000, "wait_resource"), @@ -73,9 +75,16 @@ public enum TaskStatus { return command; } - public String getCommandByCode(Integer codeType) { + /** + * get command by code + * + * @param codeType command type + * @return command + */ + public static String getCommandByCode(Integer codeType) { Optional first = Arrays.stream(TaskStatus.values()) - .filter(x -> x.getCode().equals(codeType)).findFirst(); + .filter(x -> x.getCode().equals(codeType)) + .findFirst(); if (first != null) { return first.get().getCommand(); } diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/handler/MigrationRecoveryHandler.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/handler/MigrationRecoveryHandler.java new file mode 100644 index 000000000..450d8995f --- /dev/null +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/handler/MigrationRecoveryHandler.java @@ -0,0 +1,452 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + * + * MigrationRecoveryHandler.java + * + * IDENTIFICATION + * plugins/data-migration/src/main/java/org/opengauss/admin/plugin/handler/MigrationRecoveryHandler.java + * + * ------------------------------------------------------------------------- + */ + +package org.opengauss.admin.plugin.handler; + +import com.gitee.starblues.bootstrap.annotation.AutowiredType; + +import cn.hutool.core.thread.ThreadUtil; +import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.StrUtil; +import lombok.extern.slf4j.Slf4j; + +import org.opengauss.admin.common.constant.CommonConstants; +import org.opengauss.admin.common.utils.OpsAssert; +import org.opengauss.admin.plugin.domain.MigrationHostPortalInstall; +import org.opengauss.admin.plugin.domain.MigrationTask; +import org.opengauss.admin.plugin.vo.ProcessStatus; +import org.opengauss.admin.system.plugin.beans.SshLogin; +import org.opengauss.admin.system.plugin.facade.JschExecutorFacade; +import org.opengauss.admin.system.service.ops.impl.EncryptionUtils; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import javax.annotation.Resource; + +/** + * MigrationRecoveryHandler + * + * @author: wangchao + * @Date: 2024/12/28 15:23 + * @since 7.0.0 + **/ +@Slf4j +@Component +public class MigrationRecoveryHandler { + /** + * 使用正则表达式,格式化字符串中多个空格,为一个空格 + */ + private static final Pattern SPACE_PATTERN = Pattern.compile("\\s{2,}"); + private static final List HISTORY_PORTAL_VERSION = Arrays.asList("6.0.0rc1", "6.0.0", "5.0.0", "5.1.0"); + private static final String FETCH_PROCESS_COMMAND = "ps -ux | grep java | grep %s | grep -E 'Dworkspace.id=%s|%s'"; + private static final String NAME_PORTAL_CONTROL = "portal-control"; + private static final String NAME_MIGRATION_INCREMENTAL_SOURCE = "source"; + private static final String NAME_MIGRATION_INCREMENTAL_SINK = "sink"; + private static final String NAME_MIGRATION_ONLINE_RESET = "reset"; + private static final String NAME_MIGRATION_REVERSE_SOURCE = "reverse-source"; + private static final String NAME_MIGRATION_REVERSE_SINK = "reverse-sink"; + private static final String NAME_MIGRATION_REVERSE_RESET = "reverse-reset"; + private static final String NAME_MIGRATION_CHECK = "check"; + private static final String NAME_MIGRATION_CHECK_SOURCE = "check-source"; + private static final String NAME_MIGRATION_CHECK_SINK = "check-sink"; + private static final Map PROCESS_CMD_KEYWORD_FOR_NAME = new HashMap<>(); + + static { + PROCESS_CMD_KEYWORD_FOR_NAME.put("portalControl", NAME_PORTAL_CONTROL); + PROCESS_CMD_KEYWORD_FOR_NAME.put("mysql-source.properties", NAME_MIGRATION_INCREMENTAL_SOURCE); + PROCESS_CMD_KEYWORD_FOR_NAME.put("mysql-sink.properties", NAME_MIGRATION_INCREMENTAL_SINK); + PROCESS_CMD_KEYWORD_FOR_NAME.put("opengauss-source.properties", NAME_MIGRATION_REVERSE_SOURCE); + PROCESS_CMD_KEYWORD_FOR_NAME.put("opengauss-sink.properties", NAME_MIGRATION_REVERSE_SINK); + PROCESS_CMD_KEYWORD_FOR_NAME.put("application.yml", NAME_MIGRATION_CHECK); + PROCESS_CMD_KEYWORD_FOR_NAME.put("application-source.yml", NAME_MIGRATION_CHECK_SOURCE); + PROCESS_CMD_KEYWORD_FOR_NAME.put("application-sink.yml", NAME_MIGRATION_CHECK_SINK); + } + + @Resource + @AutowiredType(AutowiredType.Type.PLUGIN_MAIN) + private JschExecutorFacade jschExecutorFacade; + @Resource + @AutowiredType(AutowiredType.Type.PLUGIN_MAIN) + private EncryptionUtils encryptionUtils; + + /** + * fetch process status list by migration task. + * + * @param migrationTask migration task + * @param installPath install path + * @return process status list + */ + public Map fetchProcessStatusListByMigrationTask(MigrationTask migrationTask, + String installPath) { + String condition = installPath + "workspace/" + migrationTask.getId() + "/config"; + String command = String.format(FETCH_PROCESS_COMMAND, installPath, migrationTask.getId(), condition); + SshLogin sshLogin = buildSshLogin(migrationTask); + List processStatusList = fetchProcessStatusList(sshLogin, command); + for (ProcessStatus processStatus : processStatusList) { + processStatus.setName(parseProcessNameByExecCmd(processStatus.getCmd())); + } + return processStatusList.stream() + .collect(Collectors.toMap(ProcessStatus::getName, Function.identity(), (oldValue, newValue) -> newValue)); + } + + private String parseProcessNameByExecCmd(String cmd) { + String name = "other"; // Default name + for (Map.Entry entry : PROCESS_CMD_KEYWORD_FOR_NAME.entrySet()) { + if (cmd.contains(entry.getKey())) { + name = entry.getValue(); + break; + } + } + return name; + } + + /** + * exec ssh command of {@value FETCH_PROCESS_COMMAND} + * + * @param sshLogin sshLogin + * @param command command + * @return process status info + */ + private List fetchProcessStatusList(SshLogin sshLogin, String command) { + List processStatusList = new ArrayList<>(); + String result = jschExecutorFacade.execCommand(sshLogin, command); + String[] lines = result.split(CommonConstants.LINE_SPLITTER); + String lineFormat; + for (String line : lines) { + if (line.contains("bash -c ps")) { + continue; + } + Matcher matcher = SPACE_PATTERN.matcher(line); + lineFormat = matcher.replaceAll(" "); + String[] res = lineFormat.split(" "); + ProcessStatus processStatus = new ProcessStatus(res[0], res[1], + StrUtil.join(" ", (Object) ArrayUtil.sub(res, 10, res.length))); + processStatusList.add(processStatus); + } + return processStatusList; + } + + private SshLogin buildSshLogin(MigrationTask migrationTask) { + return new SshLogin(migrationTask.getRunHost(), migrationTask.getRunPort(), migrationTask.getRunUser(), + encryptionUtils.decrypt(migrationTask.getRunPass())); + } + + /** + * check if the portal process is running + * + * @param processMap process map + * @return true if the process is running + */ + public boolean hasPortal(Map processMap) { + return processMap.containsKey(NAME_PORTAL_CONTROL); + } + + /** + * check if the incremental process is running + * + * @param processMap process map + * @return true if the process is running + */ + public boolean hasIncrementalSource(Map processMap) { + return processMap.containsKey(NAME_MIGRATION_INCREMENTAL_SOURCE); + } + + /** + * check if the incremental process is running + * + * @param processMap process map + * @return true if the process is running + */ + public boolean hasIncrementalSink(Map processMap) { + return processMap.containsKey(NAME_MIGRATION_INCREMENTAL_SINK); + } + + /** + * check if the reverse process is running + * + * @param processMap process map + * @return true if the process is running + */ + public boolean hasReverseSource(Map processMap) { + return processMap.containsKey(NAME_MIGRATION_REVERSE_SOURCE); + } + + /** + * check if the reverse process is running + * + * @param processMap process map + * @return true if the process is running + */ + public boolean hasReverseSink(Map processMap) { + return processMap.containsKey(NAME_MIGRATION_REVERSE_SINK); + } + + /** + * start the incremental process + * + * @param processMap process map + * @param migrationTask migration task + * @param portalInstall portal install + * @param name process name + */ + public void startProcessOfIncrementalMigration(MigrationTask migrationTask, + MigrationHostPortalInstall portalInstall, String name) { + SshLogin sshLogin = buildSshLogin(migrationTask); + Integer id = migrationTask.getId(); + String installPath = portalInstall.getInstallRootPath(); + MigrationCommand migrationCommand = new MigrationCommand(installPath, portalInstall.getJarName(), id); + OpsAssert.isTrue(migrationCommand.checkIncrementalCommand(name), "Incremental Command is not supported"); + String execResult; + Map processMap; + if (StrUtil.equalsIgnoreCase(NAME_MIGRATION_ONLINE_RESET, name)) { + String cmdOfStopIncrement = migrationCommand.builder(MigrationCommand.STOP_INCREMENTAL); + execResult = jschExecutorFacade.execCommand(sshLogin, cmdOfStopIncrement); + log.info("stop incremental process of migration task {}.", cmdOfStopIncrement); + log.info("stop incremental process of migration task {}.", execResult); + ThreadUtil.safeSleep(1000); + processMap = fetchProcessStatusListByMigrationTask(migrationTask, installPath); + while (processMap.size() > 1) { + ThreadUtil.safeSleep(500); + processMap = fetchProcessStatusListByMigrationTask(migrationTask, installPath); + } + String cmdOfRunIncrement = migrationCommand.builder(MigrationCommand.RUN_INCREMENTAL); + execResult = jschExecutorFacade.execCommand(sshLogin, cmdOfRunIncrement); + log.info("start incremental process of migration task {}.", cmdOfRunIncrement); + log.info("start incremental process of migration task {}.", execResult); + } else { + String command = migrationCommand.builder(name); + execResult = jschExecutorFacade.execCommand(sshLogin, command); + log.info("start incremental process of migration task {}.", command); + log.info("start incremental process of migration task {}.", execResult); + } + processMap = fetchProcessStatusListByMigrationTask(migrationTask, installPath); + int checkTimes = 0; + while (processMap.size() < 7 && checkTimes < 10) { + ThreadUtil.safeSleep(500); + processMap = fetchProcessStatusListByMigrationTask(migrationTask, installPath); + checkTimes++; + } + } + + /** + * start the reverse process + * + * @param migrationTask migration task + * @param portalInstall portal install + * @param name process name + */ + public void startProcessOfReverseMigration(MigrationTask migrationTask, MigrationHostPortalInstall portalInstall, + String name) { + SshLogin sshLogin = buildSshLogin(migrationTask); + Integer id = migrationTask.getId(); + String installPath = portalInstall.getInstallRootPath(); + MigrationCommand migrationCommand = new MigrationCommand(installPath, portalInstall.getJarName(), id); + OpsAssert.isTrue(migrationCommand.checkReverseCommand(name), "reverse Command is not supported"); + String execResult; + if (StrUtil.equalsIgnoreCase(NAME_MIGRATION_REVERSE_RESET, name)) { + String cmdOfStopReverse = migrationCommand.builder(MigrationCommand.STOP_REVERSE); + execResult = jschExecutorFacade.execCommand(sshLogin, cmdOfStopReverse); + log.info("stop reverse process of migration task {}", cmdOfStopReverse); + log.info("stop reverse process of migration task {}", execResult); + Map processMap = fetchProcessStatusListByMigrationTask(migrationTask, installPath); + while (processMap.size() > 1) { + ThreadUtil.safeSleep(500); + processMap = fetchProcessStatusListByMigrationTask(migrationTask, installPath); + } + String cmdOfRunReverse = migrationCommand.builder(MigrationCommand.RUN_REVERSE); + execResult = jschExecutorFacade.execCommand(sshLogin, cmdOfRunReverse); + log.info("run reverse process of migration task {}.", cmdOfRunReverse); + log.info("run reverse process of migration task {}.", execResult); + } else { + String command = migrationCommand.builder(name); + execResult = jschExecutorFacade.execCommand(sshLogin, command); + log.info("run reverse process of migration task {}.", command); + log.info("run reverse process of migration task {}.", execResult); + } + } + + /** + * get portal jar version + * + * @param jarName jar name + * @return jar version + */ + public String getJarVersion(String jarName) { + return jarName.replace("portalControl-", "").replace("-exec.jar", ""); + } + + /** + * check portal version is history version ,when it not supports breakRecovery feature + * + * @param version version + * @return boolean + */ + public boolean checkPortalVersion(String version) { + return HISTORY_PORTAL_VERSION.contains(version); + } +} + +class MigrationCommand { + private static final String INCREMENTAL_MIGRATION_STOP = "stop_incremental_migration"; + private static final String INCREMENTAL_MIGRATION_RUN = "run_incremental_migration"; + private static final String INCREMENTAL_MIGRATION_SOURCE = "run_incremental_migration_source"; + private static final String INCREMENTAL_MIGRATION_SINK = "run_incremental_migration_sink"; + private static final String REVERSE_MIGRATION_STOP = "stop_reverse_migration"; + private static final String REVERSE_MIGRATION_RUN = "run_reverse_migration"; + private static final String REVERSE_MIGRATION_SOURCE = "run_reverse_migration_source"; + private static final String REVERSE_MIGRATION_SINK = "run_reverse_migration_sink"; + private static final String CMD_PORTAL_TEMP = "java -Dpath=%s -Dorder=%s -Dskip=true -Dworkspace.id=%s -jar %s"; + + private static final Map MIGRATION_PROPERTIES = new HashMap<>(); + + /** + * incremental process command RUN_INCREMENTAL_SOURCE + */ + public static final String RUN_INCREMENTAL_SOURCE = "source"; + + /** + * incremental process command RUN_INCREMENTAL_SINK + */ + public static final String RUN_INCREMENTAL_SINK = "sink"; + + /** + * incremental process command RUN_INCREMENTAL + */ + public static final String RUN_INCREMENTAL = "run"; + + /** + * incremental process command STOP_INCREMENTAL + */ + public static final String STOP_INCREMENTAL = "stop"; + + /** + * incremental process command RESET_INCREMENTAL + */ + public static final String RESET_INCREMENTAL = "reset"; + + /** + * reverse process command RUN_REVERSE_SOURCE + */ + public static final String RUN_REVERSE_SOURCE = "reverse-source"; + + /** + * reverse process command RUN_REVERSE_SINK + */ + public static final String RUN_REVERSE_SINK = "reverse-sink"; + + /** + * reverse process command RUN_REVERSE + */ + public static final String RUN_REVERSE = "reverse-run"; + + /** + * reverse process command STOP_REVERSE + */ + public static final String STOP_REVERSE = "reverse-stop"; + + /** + * reverse process command RESET_REVERSE + */ + public static final String RESET_REVERSE = "reverse-reset"; + + /** + * incremental process command REVERSE_COMMANDS + */ + public static final List REVERSE_COMMANDS = Arrays.asList(RUN_REVERSE_SOURCE, RUN_REVERSE_SINK, RUN_REVERSE, + STOP_REVERSE, RESET_REVERSE); + + /** + * incremental process command INCREMENTAL_COMMANDS + */ + public static final List INCREMENTAL_COMMANDS = Arrays.asList(RUN_INCREMENTAL_SOURCE, RUN_INCREMENTAL_SINK, + RUN_INCREMENTAL, STOP_INCREMENTAL, RESET_INCREMENTAL); + + static { + MIGRATION_PROPERTIES.put(STOP_INCREMENTAL, INCREMENTAL_MIGRATION_STOP); + MIGRATION_PROPERTIES.put(RUN_INCREMENTAL, INCREMENTAL_MIGRATION_RUN); + MIGRATION_PROPERTIES.put(RUN_INCREMENTAL_SOURCE, INCREMENTAL_MIGRATION_SOURCE); + MIGRATION_PROPERTIES.put(RUN_INCREMENTAL_SINK, INCREMENTAL_MIGRATION_SINK); + MIGRATION_PROPERTIES.put(STOP_REVERSE, REVERSE_MIGRATION_STOP); + MIGRATION_PROPERTIES.put(RUN_REVERSE, REVERSE_MIGRATION_RUN); + MIGRATION_PROPERTIES.put(RUN_REVERSE_SOURCE, REVERSE_MIGRATION_SOURCE); + MIGRATION_PROPERTIES.put(RUN_REVERSE_SINK, REVERSE_MIGRATION_SINK); + } + + private String installPath; + private String jarName; + private Integer workspaceId; + + /** + * construct migration run command + * + * @param installPath install path + * @param jarName jar name + * @param workspaceId workspace id + */ + public MigrationCommand(String installPath, String jarName, Integer workspaceId) { + this.installPath = installPath; + this.jarName = jarName; + this.workspaceId = workspaceId; + } + + /** + * check migration incremental command + * + * @param name name + * @return boolean + */ + public boolean checkIncrementalCommand(String name) { + return INCREMENTAL_COMMANDS.contains(name); + } + + /** + * check migration reverse command + * + * @param name name + * @return boolean + */ + public boolean checkReverseCommand(String name) { + return REVERSE_COMMANDS.contains(name); + } + + /** + * build migration command + * + * @param name name + * @return command + */ + public String builder(String name) { + OpsAssert.isTrue(MIGRATION_PROPERTIES.containsKey(name), "unknown migration command"); + String order = MIGRATION_PROPERTIES.get(name); + return String.format(CMD_PORTAL_TEMP, installPath, order, workspaceId, installPath + jarName); + } +} diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressDetailMapper.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressDetailMapper.java new file mode 100644 index 000000000..f4e1bd645 --- /dev/null +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressDetailMapper.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + * + * MigrationTaskCheckProgressDetailMapper.java + * + * IDENTIFICATION + * data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressDetailMapper.java + * + * ------------------------------------------------------------------------- + */ + +package org.opengauss.admin.plugin.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +import org.apache.ibatis.annotations.Mapper; +import org.opengauss.admin.plugin.domain.MigrationTaskCheckProgressDetail; + +/** + * MigrationTaskCheckProgressDetailMapper + * + * @author wangchao + * @date 2025/01/14 09:01 + */ +@Mapper +public interface MigrationTaskCheckProgressDetailMapper extends BaseMapper { } \ No newline at end of file diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressSummaryMapper.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressSummaryMapper.java new file mode 100644 index 000000000..30a6104ae --- /dev/null +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressSummaryMapper.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + * + * MigrationTaskCheckProgressSummaryMapper.java + * + * IDENTIFICATION + * data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressSummaryMapper.java + * + * ------------------------------------------------------------------------- + */ + +package org.opengauss.admin.plugin.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +import org.apache.ibatis.annotations.Mapper; +import org.opengauss.admin.plugin.domain.MigrationTaskCheckProgressSummary; + +/** + * MigrationTaskCheckProgressSummaryMapper + * + * @author wangchao + * @date 2025/01/14 09:01 + */ +@Mapper +public interface MigrationTaskCheckProgressSummaryMapper extends BaseMapper { } \ No newline at end of file diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressDetailService.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressDetailService.java new file mode 100644 index 000000000..0d4bdf6f7 --- /dev/null +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressDetailService.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + * + * MigrationTaskCheckProgressDetailService.java + * + * IDENTIFICATION + * plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressDetailService.java + * + * ------------------------------------------------------------------------- + */ + +package org.opengauss.admin.plugin.service; + +import com.baomidou.mybatisplus.extension.service.IService; + +import org.opengauss.admin.plugin.domain.MigrationTaskCheckProgressDetail; + +import java.util.List; + +/** + * MigrationTaskCheckProgressDetailService + * + * @author wangchao + * @date 2025/01/14 09:01 + */ +public interface MigrationTaskCheckProgressDetailService extends IService { + /** + * remove by task id + * + * @param ids ids + */ + void removeByTaskIds(List ids); +} diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressSummaryService.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressSummaryService.java new file mode 100644 index 000000000..645da3d2f --- /dev/null +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressSummaryService.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + * + * MigrationTaskCheckProgressSummaryService.java + * + * IDENTIFICATION + * plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressSummaryService.java + * + * ------------------------------------------------------------------------- + */ + +package org.opengauss.admin.plugin.service; + +import com.baomidou.mybatisplus.extension.service.IService; + +import org.opengauss.admin.plugin.domain.MigrationTaskCheckProgressSummary; + +import java.util.List; + +/** + * MigrationTaskCheckProgressSummaryService + * + * @author wangchao + * @date 2025/01/14 09:01 + */ +public interface MigrationTaskCheckProgressSummaryService extends IService { + /** + * Query the summary of the check progress of the specified task + * + * @param taskId task id + * @return MigrationTaskCheckProgressSummary + */ + MigrationTaskCheckProgressSummary queryFullCheckSummaryOfMigrationTask(Integer taskId); + + /** + * remove the summary of the check progress of the specified task + * + * @param ids ids + */ + void removeByTaskIds(List ids); +} diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskService.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskService.java index 6682aa905..a29f6e859 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskService.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskService.java @@ -28,9 +28,13 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.service.IService; import org.opengauss.admin.plugin.domain.MigrationHostPortalInstall; import org.opengauss.admin.plugin.domain.MigrationTask; +import org.opengauss.admin.plugin.domain.MigrationTaskCheckProgressDetail; +import org.opengauss.admin.plugin.domain.MigrationTaskCheckProgressSummary; import org.opengauss.admin.plugin.domain.MigrationTaskGlobalParam; import org.opengauss.admin.plugin.domain.MigrationTaskHostRef; import org.opengauss.admin.plugin.enums.TaskStatus; +import org.opengauss.admin.plugin.vo.FullCheckParam; +import org.opengauss.admin.plugin.vo.TaskProcessStatus; import java.util.List; import java.util.Map; @@ -40,6 +44,10 @@ import java.util.Map; * @date 2023/01/14 09:01 */ public interface MigrationTaskService extends IService { + /** + * Initialize the migration task check progress monitor. + */ + void initMigrationTaskCheckProgressMonitor(); IPage selectList(IPage page, Integer mainTaskId); @@ -120,10 +128,12 @@ public interface MigrationTaskService extends IService { * run task * * @param h MigrationTaskHostRef Object - * @param t MigrationTask Object - * @param globalParams + * @param t MigrationTask Object + * @param globalParams globalParams + * @param operateUsername operateUsername */ - void runTask(MigrationTaskHostRef h, MigrationTask t, List globalParams); + void runTask(MigrationTaskHostRef h, MigrationTask t, List globalParams, + String operateUsername); /** * subtask Execution Offline Scheduler @@ -141,4 +151,46 @@ public interface MigrationTaskService extends IService { */ boolean execMigrationCheck(MigrationHostPortalInstall installHost, MigrationTask t, List globalParams, String command); + + /** + * check status of incremental or reverse migration task + * + * @param id task id + * @return status + */ + TaskProcessStatus checkStatusOfIncrementalOrReverseMigrationTask(Integer id); + + /** + * start incremental or reverse migration task + * + * @param id id + * @param name name + * @return status + */ + TaskProcessStatus startTaskOfIncrementalOrReverseMigrationProcess(Integer id, String name); + + /** + * query full check summary of migration task + * + * @param id id + * @return summary + */ + MigrationTaskCheckProgressSummary queryFullCheckSummaryOfMigrationTask(Integer id); + + /** + * query full check detail of migration task + * + * @param fullCheckParam fullCheckParam + * @return detail + */ + IPage queryFullCheckDetailOfMigrationTask(FullCheckParam fullCheckParam); + + /** + * download repair file + * + * @param id id + * @param repairFileName repairFileName + * @return file context + */ + String downloadRepairFile(Integer id, String repairFileName); } diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationMainTaskServiceImpl.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationMainTaskServiceImpl.java index 1b0bec074..39c005ed3 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationMainTaskServiceImpl.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationMainTaskServiceImpl.java @@ -391,39 +391,45 @@ public class MigrationMainTaskServiceImpl extends ServiceImpl tasks = migrationTaskService.listByMainTaskId(id); List hosts = migrationTaskHostRefService.listByMainTaskId(id); List globalParams = migrationTaskGlobalParamService.selectByMainTaskId(id); - Integer totalRunnableCount = hosts.stream().mapToInt(MigrationTaskHostRef::getRunnableCount).sum(); - if (totalRunnableCount > tasks.size()) { //The number of host executable tasks is greater than the total number of tasks + int totalRunnableCount = hosts.stream().mapToInt(MigrationTaskHostRef::getRunnableCount).sum(); + String operateUsername = SecurityUtils.getUsername(); + if (totalRunnableCount > tasks.size()) { + // The number of host executable tasks is greater than the total number of tasks for (int x = 0; x < tasks.size(); x++) { - int size = hosts.stream().filter(h -> h.getRunnableCount() > 0).collect(Collectors.toList()).size(); + int size = (int) hosts.stream().filter(h -> h.getRunnableCount() > 0).count(); int curHostIndex = x % size; MigrationTask t = tasks.get(x); t.setOrderInvokedTimestamp(startTaskTimestamp); MigrationTaskHostRef h = hosts.get(curHostIndex); h.addPlaceHolderCount(); - threadPoolTaskExecutor.submit(() -> migrationTaskService.runTask(h, t, globalParams)); + threadPoolTaskExecutor.submit(() -> migrationTaskService.runTask(h, t, globalParams, operateUsername)); } } else { //The number of tasks that the host can perform is less than the total number of tasks - hosts.stream().forEach(h -> { + hosts.forEach(h -> { Integer runnableCount = h.getRunnableCount(); for (int i = tasks.size() - 1; i >= tasks.size() - runnableCount; i--) { MigrationTask t = tasks.get(i); t.setOrderInvokedTimestamp(startTaskTimestamp); - threadPoolTaskExecutor.submit(() -> migrationTaskService.runTask(h, t, globalParams)); + threadPoolTaskExecutor.submit( + () -> migrationTaskService.runTask(h, t, globalParams, operateUsername)); tasks.remove(i); } }); //For tasks that have not been assigned a host, // change the status to 1000 and be processed by the offline scheduler - tasks.stream().forEach(t -> { + tasks.forEach(t -> { migrationTaskService.updateStatus(t.getId(), TaskStatus.WAIT_RESOURCE); }); } diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressDetailServiceImpl.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressDetailServiceImpl.java new file mode 100644 index 000000000..11d5242f2 --- /dev/null +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressDetailServiceImpl.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + * + * MigrationTaskCheckProgressDetailServiceImpl.java + * + * IDENTIFICATION + * plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl + * /MigrationTaskCheckProgressDetailServiceImpl.java + * + * ------------------------------------------------------------------------- + */ + +package org.opengauss.admin.plugin.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; + +import org.opengauss.admin.plugin.domain.MigrationTaskCheckProgressDetail; +import org.opengauss.admin.plugin.mapper.MigrationTaskCheckProgressDetailMapper; +import org.opengauss.admin.plugin.service.MigrationTaskCheckProgressDetailService; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * MigrationTaskCheckProgressDetailService + * + * @author: wangchao + * @Date: 2024/12/30 09:53 + * @Description: MigrationTaskCheckProgressService + * @since 7.0.0 + **/ +@Service +public class MigrationTaskCheckProgressDetailServiceImpl + extends ServiceImpl + implements MigrationTaskCheckProgressDetailService { + @Override + public void removeByTaskIds(List ids) { + LambdaQueryWrapper query = Wrappers.lambdaQuery( + MigrationTaskCheckProgressDetail.class); + query.in(MigrationTaskCheckProgressDetail::getTaskId, ids); + remove(query); + } +} diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressMonitor.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressMonitor.java new file mode 100644 index 000000000..f0abdd0a9 --- /dev/null +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressMonitor.java @@ -0,0 +1,234 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + * + * MigrationTaskCheckProgressMonitor.java + * + * IDENTIFICATION + * plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressMonitor.java + * + * ------------------------------------------------------------------------- + */ + +package org.opengauss.admin.plugin.service.impl; + +import static org.opengauss.admin.plugin.enums.TaskStatus.FULL_CHECK_FINISH; +import static org.opengauss.admin.plugin.enums.TaskStatus.FULL_CHECK_START; + +import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; + +import cn.hutool.core.thread.ThreadUtil; +import cn.hutool.core.util.StrUtil; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import org.jetbrains.annotations.NotNull; +import org.opengauss.admin.common.core.domain.model.ops.JschResult; +import org.opengauss.admin.plugin.context.MigrationTaskContext; +import org.opengauss.admin.plugin.domain.MigrationHostPortalInstall; +import org.opengauss.admin.plugin.domain.MigrationTask; +import org.opengauss.admin.plugin.domain.MigrationTaskCheckProgressDetail; +import org.opengauss.admin.plugin.domain.MigrationTaskCheckProgressSummary; +import org.opengauss.admin.plugin.handler.PortalHandle; +import org.opengauss.admin.plugin.service.MigrationTaskCheckProgressDetailService; +import org.opengauss.admin.plugin.service.MigrationTaskCheckProgressSummaryService; +import org.opengauss.admin.plugin.utils.ShellUtil; +import org.opengauss.admin.system.service.ops.impl.EncryptionUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * MigrationTaskCheckProgressMonitor + * @author: wangchao + * @Date: 2024/12/31 10:07 + * @Description: MigrationTaskCheckProgressMonitor + * + * @since 7.0.0 + **/ +@Slf4j +public class MigrationTaskCheckProgressMonitor implements Runnable { + private boolean isRunning = true; + private boolean isCompleted = false; + private final Integer taskId; + private Integer execStatus; + private final String installPath; + private final MigrationHostPortalInstall installHost; + private final EncryptionUtils encryptionUtils; + private final MigrationTask migrationTask; + private final MigrationTaskCheckProgressSummaryService summaryService; + private final MigrationTaskCheckProgressDetailService detailService; + + MigrationTaskCheckProgressMonitor(MigrationTaskContext context) { + this.taskId = context.getId(); + this.migrationTask = context.getMigrationTask(); + this.encryptionUtils = context.getEncryptionUtils(); + this.summaryService = context.getMigrationTaskCheckProgressSummaryService(); + this.detailService = context.getMigrationTaskCheckProgressDetailService(); + this.installHost = context.getInstallHost(); + this.installPath = installHost.getInstallPath(); + this.execStatus = migrationTask.getExecStatus(); + } + + @Override + public void run() { + Thread.currentThread().setName("full-check-progress-monitor-" + taskId); + while (isRunning) { + try { + // when migration task status start full check ,start refresh check result + // if migration task status init status is bigger than full check start status , + // updated task check result,and completed monitor thread. + if (execStatus >= FULL_CHECK_START.getCode()) { + parseFullCheckProgressDetail(); + parseFullCheckProgress(); + } + // when migration task status finish full check , stop refresh check result + if (execStatus > FULL_CHECK_FINISH.getCode()) { + isCompleted = true; + } + ThreadUtil.safeSleep(1000); + } catch (Exception exc) { + log.error("MigrationTaskCheckProgressMonitor parse error ,subTaskId={} ", taskId, exc); + } + } + } + + private void parseFullCheckProgressDetail() { + String password = encryptionUtils.decrypt(installHost.getRunPassword()); + String successCmd = "cat " + installPath + "portal/workspace/" + taskId + "/check_result/result/success.log"; + String failedCmd = "cat " + installPath + "portal/workspace/" + taskId + "/check_result/result/failed.log"; + Map commands = new HashMap<>(); + Map checkResultLogStatus = new HashMap<>(); + commands.put("success", successCmd); + commands.put("failed", failedCmd); + commands.forEach((key, value) -> { + JschResult jschResult = ShellUtil.execCommandGetResult(installHost.getHost(), installHost.getPort(), + installHost.getRunUser(), password, value); + String resultMsg = getResultMessage(jschResult); + if (jschResult.isOk()) { + String message = PortalHandle.replaceAllBlank(resultMsg); + if (StrUtil.isNotEmpty(message)) { + if (message.endsWith(",")) { + message = "[" + message.substring(0, message.length() - 1) + "]"; + List detailList = JSONObject.parseArray(message, DataCheckVo.class); + List details = detailList.stream().map(item -> { + MigrationTaskCheckProgressDetail tableMsg = new MigrationTaskCheckProgressDetail(); + tableMsg.setId(taskId + "_" + item.getTable()); + tableMsg.setTaskId(taskId); + tableMsg.setSchemaName(item.getSchema()); + tableMsg.setSourceName(item.getTable()); + tableMsg.setSinkName(item.getTable()); + tableMsg.setStatus(key); + tableMsg.setMessage(item.getMessage()); + tableMsg.setFailedRows(item.getDiffCount()); + tableMsg.setRepairFileName(getFailedTableRepairName(key, item)); + return tableMsg; + }).collect(Collectors.toList()); + detailService.saveOrUpdateBatch(details, 100); + checkResultLogStatus.put(key, "success"); + log.info("refresh data check detail information: subTaskId: {} {}", detailList.size(), taskId); + } else { + log.warn("fetch data check detail information is invalid , subTaskId: {}", taskId); + } + } else { + log.warn("fetch data check detail information empty , subTaskId: {}", taskId); + } + } else { + checkResultLogStatus.put(key, "failed"); + log.warn("refresh data check detail information failed, subTaskId: {} {}", taskId, resultMsg); + } + }); + } + + private String getFailedTableRepairName(String key, DataCheckVo item) { + if (key.equals("failed") && item.diffCount > 0) { + return "repair_" + item.getSchema() + "_" + item.getTable() + "_" + "0_1.txt"; + } + return ""; + } + + @NotNull + private static String getResultMessage(JschResult jschResult) { + String result = jschResult.getResult(); + return StrUtil.isNotEmpty(result) ? result.trim() : ""; + } + + private void parseFullCheckProgress() { + String password = encryptionUtils.decrypt(installHost.getRunPassword()); + String summaryCmd = "grep '{.*}' " + installPath + "portal/workspace/" + taskId + + "/status/full_migration_datacheck.txt | tail -n 1"; + JschResult result = ShellUtil.execCommandGetResult(installHost.getHost(), installHost.getPort(), + installHost.getRunUser(), password, summaryCmd); + String resultMsg = getResultMessage(result); + if (result.isOk()) { + String portalDataCheckProcess = PortalHandle.replaceAllBlank(resultMsg); + if (StrUtil.isEmpty(portalDataCheckProcess)) { + return; + } + MigrationTaskCheckProgressSummary summary = JSONObject.parseObject(portalDataCheckProcess, + MigrationTaskCheckProgressSummary.class); + summary.setTaskId(taskId); + summary.setSourceDb(migrationTask.getSourceDb()); + summary.setSinkDb(migrationTask.getTargetDb()); + LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate( + MigrationTaskCheckProgressSummary.class); + updateWrapper.eq(MigrationTaskCheckProgressSummary::getTaskId, taskId); + summaryService.saveOrUpdate(summary, updateWrapper); + log.info("refresh data check summary information, subTaskId: {} ", taskId); + } else { + log.warn("refresh data check summary information failed, subTaskId: {} {}", taskId, resultMsg); + } + } + + /** + * stop progress monitor + */ + public synchronized void stop() { + isRunning = false; + log.info("migration check progress monitor will be stop, taskId: {}", taskId); + } + + /** + * check task completed + * + * @return boolean + */ + public synchronized boolean isCheckTaskCompleted() { + return isCompleted; + } + + /** + * refresh migration task status + * + * @param execStatus execution status + */ + public void refresh(Integer execStatus) { + this.execStatus = execStatus; + } + + /** + * DataCheckVo + */ + @Data + static class DataCheckVo { + private String schema; + private String table; + private String message; + private Integer diffCount; + } +} \ No newline at end of file diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressService.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressService.java new file mode 100644 index 000000000..52f7bd414 --- /dev/null +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressService.java @@ -0,0 +1,186 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + * + * MigrationTaskCheckProgressService.java + * + * IDENTIFICATION + * plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressService.java + * + * ------------------------------------------------------------------------- + */ + + +package org.opengauss.admin.plugin.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.gitee.starblues.bootstrap.annotation.AutowiredType; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; +import lombok.extern.slf4j.Slf4j; + +import org.opengauss.admin.plugin.context.MigrationTaskContext; +import org.opengauss.admin.plugin.domain.MigrationTaskCheckProgressDetail; +import org.opengauss.admin.plugin.domain.MigrationTaskCheckProgressSummary; +import org.opengauss.admin.plugin.service.MigrationTaskCheckProgressDetailService; +import org.opengauss.admin.plugin.service.MigrationTaskCheckProgressSummaryService; +import org.opengauss.admin.system.service.ops.impl.EncryptionUtils; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.annotation.Resource; + +/** + * MigrationTaskCheckProgressService + * @author: wangchao + * @Date: 2024/12/30 09:53 + * @Description: MigrationTaskCheckProgressService + * + * @since 7.0.0 + **/ +@Service +@Slf4j +public class MigrationTaskCheckProgressService { + private static final Map MONITOR_TASK = new ConcurrentHashMap<>(); + private static final Map> MONITOR_TASK_FUTURE = new ConcurrentHashMap<>(); + private static final int DEFAULT_PAGE_SIZE = 5; + private static final int DEFAULT_PAGE_NUM = 1; + + @Resource + private ThreadPoolTaskExecutor threadPoolTaskExecutor; + @Resource + private MigrationTaskCheckProgressSummaryService migrationTaskCheckProgressSummaryService; + @Resource + private MigrationTaskCheckProgressDetailService migrationTaskCheckProgressDetailService; + @Resource + @AutowiredType(AutowiredType.Type.PLUGIN_MAIN) + private EncryptionUtils encryptionUtils; + + /** + * refreshCheckProgress + * + * @param taskId task id + * @param execStatus exec status + */ + public void refreshCheckProgress(Integer taskId, Integer execStatus) { + if (MONITOR_TASK.containsKey(taskId)) { + MigrationTaskCheckProgressMonitor monitor = MONITOR_TASK.get(taskId); + monitor.refresh(execStatus); + if (monitor.isCheckTaskCompleted()) { + stopByTaskIds(List.of(taskId)); + MONITOR_TASK.remove(taskId); + MONITOR_TASK_FUTURE.remove(taskId); + } + } + } + + /** + * startCheckProgress + * + * @param context context + */ + public void startCheckProgress(MigrationTaskContext context) { + Integer id = context.getId(); + if (MONITOR_TASK.containsKey(id)) { + return; + } + context.setEncryptionUtils(encryptionUtils); + context.setMigrationTaskCheckProgressDetailService(migrationTaskCheckProgressDetailService); + context.setMigrationTaskCheckProgressSummaryService(migrationTaskCheckProgressSummaryService); + MigrationTaskCheckProgressMonitor taskMonitor = new MigrationTaskCheckProgressMonitor(context); + MONITOR_TASK.put(id, taskMonitor); + MONITOR_TASK_FUTURE.put(id, threadPoolTaskExecutor.submit(taskMonitor)); + } + + /** + * query check progress summary + * + * @param id task id + * @return summary + */ + public MigrationTaskCheckProgressSummary queryFullCheckSummaryOfMigrationTask(Integer id) { + return migrationTaskCheckProgressSummaryService.queryFullCheckSummaryOfMigrationTask(id); + } + + /** + * page check progress detail + * + * @param id task id + * @param status status + * @param pageSize page size + * @param pageNum page num + * @return page + */ + public IPage pageFullCheckDetailOfMigrationTask(Integer id, String status, + int pageSize, int pageNum) { + int pageSizeVal = Optional.of(pageSize).filter(size -> size >= DEFAULT_PAGE_SIZE).orElse(DEFAULT_PAGE_SIZE); + int pageNumVal = Optional.of(pageNum).filter(num -> num >= DEFAULT_PAGE_NUM).orElse(DEFAULT_PAGE_NUM); + LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery( + MigrationTaskCheckProgressDetail.class); + queryWrapper.eq(MigrationTaskCheckProgressDetail::getTaskId, id); + queryWrapper.eq(StrUtil.isNotEmpty(status), MigrationTaskCheckProgressDetail::getStatus, status); + queryWrapper.orderByDesc(MigrationTaskCheckProgressDetail::getCreateTime); + Page page = new Page<>(pageNumVal, pageSizeVal); + page.setCurrent(pageNumVal); + return migrationTaskCheckProgressDetailService.page(page, queryWrapper); + } + + /** + * stop migration task check monitor + * + * @param ids task ids + */ + public void stopByTaskIds(List ids) { + ids.forEach(id -> { + if (MONITOR_TASK.containsKey(id)) { + MigrationTaskCheckProgressMonitor monitor = MONITOR_TASK.get(id); + monitor.stop(); + if (MONITOR_TASK_FUTURE.containsKey(id)) { + Future future = MONITOR_TASK_FUTURE.get(id); + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + log.warn("stop monitor task has interrupted or execution id: {} {}", id, e.getMessage()); + } + MONITOR_TASK_FUTURE.remove(id); + } + MONITOR_TASK.remove(id); + } + }); + } + + /** + * delete migration task check summary and details + * + * @param ids ids + */ + public void deleteByTaskIds(List ids) { + if (CollUtil.isNotEmpty(ids)) { + stopByTaskIds(ids); + migrationTaskCheckProgressSummaryService.removeByTaskIds(ids); + migrationTaskCheckProgressDetailService.removeByTaskIds(ids); + } + } +} diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressSummaryServiceImpl.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressSummaryServiceImpl.java new file mode 100644 index 000000000..4ddf399df --- /dev/null +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressSummaryServiceImpl.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + * + * MigrationTaskCheckProgressSummaryServiceImpl.java + * + * IDENTIFICATION + * plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl + * /MigrationTaskCheckProgressSummaryServiceImpl.java + * + * ------------------------------------------------------------------------- + */ + +package org.opengauss.admin.plugin.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; + +import org.opengauss.admin.plugin.domain.MigrationTaskCheckProgressSummary; +import org.opengauss.admin.plugin.mapper.MigrationTaskCheckProgressSummaryMapper; +import org.opengauss.admin.plugin.service.MigrationTaskCheckProgressSummaryService; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * MigrationTaskCheckProgressSummaryService + * + * @author: wangchao + * @Date: 2024/12/30 09:53 + * @Description: MigrationTaskCheckProgressService + * @since 7.0.0 + **/ +@Service +public class MigrationTaskCheckProgressSummaryServiceImpl + extends ServiceImpl + implements MigrationTaskCheckProgressSummaryService { + @Override + public MigrationTaskCheckProgressSummary queryFullCheckSummaryOfMigrationTask(Integer taskId) { + LambdaQueryWrapper lambdaQuery = Wrappers.lambdaQuery( + MigrationTaskCheckProgressSummary.class); + lambdaQuery.eq(MigrationTaskCheckProgressSummary::getTaskId, taskId); + return getOne(lambdaQuery); + } + + @Override + public void removeByTaskIds(List ids) { + LambdaQueryWrapper query = Wrappers.lambdaQuery( + MigrationTaskCheckProgressSummary.class); + query.in(MigrationTaskCheckProgressSummary::getTaskId, ids); + remove(query); + } +} diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskServiceImpl.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskServiceImpl.java index 5ea28fbdf..1c0d2b8d3 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskServiceImpl.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskServiceImpl.java @@ -13,6 +13,7 @@ package org.opengauss.admin.plugin.service.impl; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.map.MapUtil; import com.alibaba.fastjson.JSON; @@ -27,8 +28,10 @@ import org.apache.commons.lang3.StringUtils; import org.opengauss.admin.common.core.domain.model.ops.JschResult; import org.opengauss.admin.common.core.domain.model.ops.OpsClusterNodeVO; import org.opengauss.admin.common.core.domain.model.ops.OpsClusterVO; -import org.opengauss.admin.common.utils.SecurityUtils; +import org.opengauss.admin.common.exception.ops.OpsException; +import org.opengauss.admin.common.utils.OpsAssert; import org.opengauss.admin.plugin.constants.TaskAlertConstants; +import org.opengauss.admin.plugin.context.MigrationTaskContext; import org.opengauss.admin.plugin.domain.*; import org.opengauss.admin.plugin.enums.FullMigrationDbObjEnum; import org.opengauss.admin.plugin.enums.MainTaskStatus; @@ -37,9 +40,14 @@ import org.opengauss.admin.plugin.enums.ProcessType; import org.opengauss.admin.plugin.enums.TaskOperate; import org.opengauss.admin.plugin.enums.TaskStatus; import org.opengauss.admin.plugin.enums.ToolsConfigEnum; +import org.opengauss.admin.plugin.handler.MigrationRecoveryHandler; import org.opengauss.admin.plugin.handler.PortalHandle; import org.opengauss.admin.plugin.mapper.MigrationTaskMapper; import org.opengauss.admin.plugin.service.*; +import org.opengauss.admin.plugin.utils.ShellUtil; +import org.opengauss.admin.plugin.vo.FullCheckParam; +import org.opengauss.admin.plugin.vo.ProcessStatus; +import org.opengauss.admin.plugin.vo.TaskProcessStatus; import org.opengauss.admin.system.plugin.facade.HostUserFacade; import org.opengauss.admin.system.plugin.facade.OpsFacade; import org.opengauss.admin.system.service.ops.impl.EncryptionUtils; @@ -51,11 +59,16 @@ import org.springframework.stereotype.Service; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.opengauss.admin.plugin.constants.ToolsParamsLog.NEW_DESC_PREFIX; import static org.opengauss.admin.plugin.constants.ToolsParamsLog.NEW_PARAM_PREFIX; +import javax.annotation.Resource; + /** * @author xielibo * @date 2023/01/14 09:01 @@ -74,6 +87,12 @@ public class MigrationTaskServiceImpl extends ServiceImpl INCREMENTAL_STATUS = Arrays.asList(TaskStatus.INCREMENTAL_START.getCode(), + TaskStatus.INCREMENTAL_RUNNING.getCode(), TaskStatus.INCREMENTAL_PAUSE.getCode(), + TaskStatus.INCREMENTAL_STOPPED.getCode()); + + private static final List REVERSE_STATUS = Arrays.asList(TaskStatus.REVERSE_START.getCode(), + TaskStatus.REVERSE_RUNNING.getCode(), TaskStatus.REVERSE_PAUSE.getCode(), TaskStatus.REVERSE_STOP.getCode()); @Autowired @AutowiredType(AutowiredType.Type.PLUGIN_MAIN) @@ -124,6 +143,51 @@ public class MigrationTaskServiceImpl extends ServiceImpl installMap = new ConcurrentHashMap<>(); + + @Override + public void initMigrationTaskCheckProgressMonitor() { + // 启动加载历史任务 + migrationTaskCheckProgressMonitor(list()); + // 定时任务监控启动后未完成任务状态 + scheduledExecutorService.scheduleAtFixedRate(() -> { + // 迁移任务阶段在 FULL_CHECK_START 和 FULL_CHECK_FINISH 之间的任务,需要检查进度 + List checkTaskList = list(); + for (MigrationTask migrationTask : checkTaskList) { + if (migrationTask.getExecStatus() > TaskStatus.FULL_START.getCode()) { + migrationTaskCheckProgressService.refreshCheckProgress(migrationTask.getId(), + migrationTask.getExecStatus()); + } + } + }, 5, 1, TimeUnit.SECONDS); + } + + private void migrationTaskCheckProgressMonitor(List list) { + if (CollUtil.isNotEmpty(list)) { + list.forEach(checking -> { + if (checking.getExecStatus() >= TaskStatus.FULL_START.getCode()) { + MigrationTaskContext context = new MigrationTaskContext(); + context.setId(checking.getId()); + if (installMap.containsKey(checking.getId())) { + context.setInstallHost(installMap.get(checking.getId())); + } else { + MigrationHostPortalInstall portalInstall = migrationHostPortalInstallHostService.getOneByHostId( + checking.getRunHostId()); + installMap.put(checking.getId(), portalInstall); + context.setInstallHost(portalInstall); + } + context.setMigrationTask(checking); + migrationTaskCheckProgressService.startCheckProgress(context); + } + }); + } + } /** * Query the sub task page list by mainTaskId @@ -149,6 +213,12 @@ public class MigrationTaskServiceImpl extends ServiceImpl { + if (installMap.containsKey(id)) { + installMap.remove(id); + } + }); } @Override @@ -161,29 +231,26 @@ public class MigrationTaskServiceImpl extends ServiceImpl getTaskDetailById(Integer taskId) { Map result = new HashMap<>(); - MigrationTask task = getTaskInfo(taskId); result.put("task", task); setProcessExecDetails(task, result); - MigrationTaskExecResultDetail fullProcess = null; Object fullProcessObj = result.get("fullProcess"); if (fullProcessObj instanceof MigrationTaskExecResultDetail) { fullProcess = (MigrationTaskExecResultDetail) fullProcessObj; } - if (fullProcess != null && StringUtils.isNotBlank(fullProcess.getExecResultDetail())) { Map processMap = JSON.parseObject(fullProcess.getExecResultDetail()); - result.put("tableCounts", new FullMigrationSubProcessCounter( - processMap, FullMigrationDbObjEnum.TABLE.getObjectType())); - result.put("viewCounts", new FullMigrationSubProcessCounter( - processMap, FullMigrationDbObjEnum.VIEW.getObjectType())); - result.put("funcCounts", new FullMigrationSubProcessCounter( - processMap, FullMigrationDbObjEnum.FUNCTION.getObjectType())); - result.put("triggerCounts", new FullMigrationSubProcessCounter( - processMap, FullMigrationDbObjEnum.TRIGGER.getObjectType())); - result.put("produceCounts", new FullMigrationSubProcessCounter( - processMap, FullMigrationDbObjEnum.PROCEDURE.getObjectType())); + result.put("tableCounts", + new FullMigrationSubProcessCounter(processMap, FullMigrationDbObjEnum.TABLE.getObjectType())); + result.put("viewCounts", + new FullMigrationSubProcessCounter(processMap, FullMigrationDbObjEnum.VIEW.getObjectType())); + result.put("funcCounts", + new FullMigrationSubProcessCounter(processMap, FullMigrationDbObjEnum.FUNCTION.getObjectType())); + result.put("triggerCounts", + new FullMigrationSubProcessCounter(processMap, FullMigrationDbObjEnum.TRIGGER.getObjectType())); + result.put("produceCounts", + new FullMigrationSubProcessCounter(processMap, FullMigrationDbObjEnum.PROCEDURE.getObjectType())); FullMigrationProcessCounter processCounter = new FullMigrationProcessCounter(processMap); result.put("totalWaitCount", processCounter.getTotalWaitCount()); result.put("totalRunningCount", processCounter.getTotalRunningCount()); @@ -192,8 +259,11 @@ public class MigrationTaskServiceImpl extends ServiceImpl logPaths = new ArrayList<>(); if (!task.getExecStatus().equals(TaskStatus.NOT_RUN.getCode())) { - MigrationHostPortalInstall installHost = migrationHostPortalInstallHostService.getOneByHostId(task.getRunHostId()); - logPaths = PortalHandle.getPortalLogPath(installHost.getHost(), installHost.getPort(), installHost.getRunUser(), encryptionUtils.decrypt(installHost.getRunPassword()), installHost.getInstallPath(), task); + MigrationHostPortalInstall installHost = migrationHostPortalInstallHostService.getOneByHostId( + task.getRunHostId()); + logPaths = PortalHandle.getPortalLogPath(installHost.getHost(), installHost.getPort(), + installHost.getRunUser(), encryptionUtils.decrypt(installHost.getRunPassword()), + installHost.getInstallPath(), task); } result.put("logs", logPaths); return result; @@ -213,9 +283,8 @@ public class MigrationTaskServiceImpl extends ServiceImpl getResult = getSingleTaskStatusAndProcessByProtal(task); fullProcess = generateProcessDetail(getResult, "fullProcess"); if (task.getMigrationModelId().equals(MigrationMode.ONLINE.getCode())) { @@ -224,50 +293,43 @@ public class MigrationTaskServiceImpl extends ServiceImpl migrationTaskStatusRecords = - migrationTaskStatusRecordService.selectByTaskId(task.getId()); - Map> recordMap = migrationTaskStatusRecords - .stream().collect(Collectors.groupingBy(MigrationTaskStatusRecord::getOperateType)); + List migrationTaskStatusRecords + = migrationTaskStatusRecordService.selectByTaskId(task.getId()); + // migrationTaskStatusRecords 存在可能为空情况,导致接口查询失败。添加过滤条件,过滤掉operateType为空的记录 + Map> recordMap = migrationTaskStatusRecords.stream() + .filter(record -> Objects.nonNull(record.getOperateType())) + .collect(Collectors.groupingBy(MigrationTaskStatusRecord::getOperateType)); result.put("statusRecords", recordMap); } } private MigrationTaskExecResultDetail generateProcessDetail(Map detailsMap, String processKey) { - return MigrationTaskExecResultDetail - .builder() - .execResultDetail(MapUtil.getStr(detailsMap, processKey)) - .build(); + return MigrationTaskExecResultDetail.builder().execResultDetail(MapUtil.getStr(detailsMap, processKey)).build(); } @Override @@ -275,56 +337,81 @@ public class MigrationTaskServiceImpl extends ServiceImpl result = new HashMap<>(); MigrationHostPortalInstall installHost = migrationHostPortalInstallHostService.getOneByHostId(t.getRunHostId()); String password = encryptionUtils.decrypt(installHost.getRunPassword()); - String portalStatus = PortalHandle.getPortalStatus(installHost.getHost(), installHost.getPort(), installHost.getRunUser(), password, installHost.getInstallPath(), t); + String portalStatus = PortalHandle.getPortalStatus(installHost.getHost(), installHost.getPort(), + installHost.getRunUser(), password, installHost.getInstallPath(), t); log.debug("get portal stauts content: {}, subTaskId: {}", portalStatus, t.getId()); if (org.opengauss.admin.common.utils.StringUtils.isNotEmpty(portalStatus)) { List> statusList = (List>) JSON.parse(portalStatus); - List> statusResultList = statusList.stream().sorted(Comparator.comparing(m -> MapUtil.getLong(m, "timestamp"))).collect(Collectors.toList()); + List> statusResultList = statusList.stream() + .sorted(Comparator.comparing(m -> MapUtil.getLong(m, "timestamp"))) + .collect(Collectors.toList()); Map lastStatus = statusResultList.get(statusResultList.size() - 1); Integer state = MapUtil.getInt(lastStatus, "status"); MigrationTask update = MigrationTask.builder().id(t.getId()).build(); migrationTaskStatusRecordService.saveTaskRecord(t.getId(), statusResultList); BigDecimal migrationProcess = new BigDecimal(0); - String portalFullProcess = PortalHandle.getPortalFullProcess(installHost.getHost(), installHost.getPort(), installHost.getRunUser(), password, installHost.getInstallPath(), t); + String portalFullProcess = PortalHandle.getPortalFullProcess(installHost.getHost(), installHost.getPort(), + installHost.getRunUser(), password, installHost.getInstallPath(), t); log.debug("get portal full process content: {}, subTaskId: {}", portalFullProcess, t.getId()); if (StringUtils.isNotBlank(portalFullProcess)) { - migrationTaskExecResultDetailService.saveOrUpdateByTaskId(t.getId(), portalFullProcess.trim(), ProcessType.FULL.getCode()); + migrationTaskExecResultDetailService.saveOrUpdateByTaskId(t.getId(), portalFullProcess.trim(), + ProcessType.FULL.getCode()); migrationProcess = calculateFullMigrationProgress(portalFullProcess); } result.put("fullProcess", portalFullProcess); - String portalDataCheckProcess = PortalHandle.getPortalDataCheckProcess(installHost.getHost(), installHost.getPort(), installHost.getRunUser(), password, installHost.getInstallPath(), t); + String portalDataCheckProcess = PortalHandle.getPortalDataCheckProcess(installHost.getHost(), + installHost.getPort(), installHost.getRunUser(), password, installHost.getInstallPath(), t); log.debug("get portal data check process content: {}, subTaskId: {}", portalDataCheckProcess, t.getId()); if (state >= TaskStatus.FULL_FINISH.getCode()) { migrationProcess = new BigDecimal(1); } update.setMigrationProcess(migrationProcess.setScale(2, RoundingMode.UP).toPlainString()); if (StringUtils.isNotBlank(portalDataCheckProcess)) { - migrationTaskExecResultDetailService.saveOrUpdateByTaskId(t.getId(), portalDataCheckProcess.trim(), ProcessType.DATA_CHECK.getCode()); + migrationTaskExecResultDetailService.saveOrUpdateByTaskId(t.getId(), portalDataCheckProcess.trim(), + ProcessType.DATA_CHECK.getCode()); result.put("dataCheckProcess", portalDataCheckProcess); } - if (TaskStatus.INCREMENTAL_START.getCode().equals(state) || TaskStatus.INCREMENTAL_RUNNING.getCode().equals(state)) { - String portalIncrementalProcess = PortalHandle.getPortalIncrementalProcess(installHost.getHost(), installHost.getPort(), installHost.getRunUser(), password, installHost.getInstallPath(), t); - log.debug("get portal incremental process content: {}, subTaskId: {}", portalIncrementalProcess, t.getId()); + if (TaskStatus.INCREMENTAL_START.getCode().equals(state) || TaskStatus.INCREMENTAL_RUNNING.getCode() + .equals(state) || TaskStatus.INCREMENTAL_PAUSE.getCode().equals(state)) { + String portalIncrementalProcess = PortalHandle.getPortalIncrementalProcess(installHost.getHost(), + installHost.getPort(), installHost.getRunUser(), password, installHost.getInstallPath(), t); + log.debug("get portal incremental process content: {}, subTaskId: {}", portalIncrementalProcess, + t.getId()); if (StringUtils.isNotBlank(portalIncrementalProcess)) { - migrationTaskExecResultDetailService.saveOrUpdateByTaskId(t.getId(), portalIncrementalProcess.trim(), ProcessType.INCREMENTAL.getCode()); + migrationTaskExecResultDetailService.saveOrUpdateByTaskId(t.getId(), + portalIncrementalProcess.trim(), ProcessType.INCREMENTAL.getCode()); } result.put("incrementalProcess", portalIncrementalProcess); - } else if (TaskStatus.INCREMENTAL_STOPPED.getCode().equals(state) || TaskStatus.INCREMENTAL_FINISHED.getCode().equals(state)) { - String portalIncrementalProcess = PortalHandle.getPortalIncrementalProcess(installHost.getHost(), installHost.getPort(), installHost.getRunUser(), password, installHost.getInstallPath(), t); + } else if (TaskStatus.INCREMENTAL_STOPPED.getCode().equals(state) + || TaskStatus.INCREMENTAL_FINISHED.getCode().equals(state)) { + String portalIncrementalProcess = PortalHandle.getPortalIncrementalProcess(installHost.getHost(), + installHost.getPort(), installHost.getRunUser(), password, installHost.getInstallPath(), t); result.put("incrementalProcess", portalIncrementalProcess); - } else if (TaskStatus.REVERSE_START.getCode().equals(state) || TaskStatus.REVERSE_RUNNING.getCode().equals(state)) { - String portaReverselProcess = PortalHandle.getPortalReverseProcess(installHost.getHost(), installHost.getPort(), installHost.getRunUser(), password, installHost.getInstallPath(), t); + } else if (TaskStatus.REVERSE_START.getCode().equals(state) || TaskStatus.REVERSE_RUNNING.getCode() + .equals(state)) { + String portaReverselProcess = PortalHandle.getPortalReverseProcess(installHost.getHost(), + installHost.getPort(), installHost.getRunUser(), password, installHost.getInstallPath(), t); log.debug("get portal reverse process content: {}, subTaskId: {}", portaReverselProcess, t.getId()); if (StringUtils.isNotBlank(portaReverselProcess)) { - migrationTaskExecResultDetailService.saveOrUpdateByTaskId(t.getId(), portaReverselProcess.trim(), ProcessType.REVERSE.getCode()); + migrationTaskExecResultDetailService.saveOrUpdateByTaskId(t.getId(), portaReverselProcess.trim(), + ProcessType.REVERSE.getCode()); } result.put("reverseProcess", portaReverselProcess); - String portalIncrementalProcess = PortalHandle.getPortalIncrementalProcess(installHost.getHost(), installHost.getPort(), installHost.getRunUser(), password, installHost.getInstallPath(), t); + String portalIncrementalProcess = PortalHandle.getPortalIncrementalProcess(installHost.getHost(), + installHost.getPort(), installHost.getRunUser(), password, installHost.getInstallPath(), t); result.put("incrementalProcess", portalIncrementalProcess); + } else { + log.debug("task status {} : {}", t.getId(), state); } if (state > t.getExecStatus()) { update.setExecStatus(state); + } else if (Objects.equals(t.getExecStatus(), TaskStatus.INCREMENTAL_PAUSE.getCode()) || Objects.equals( + t.getExecStatus(), TaskStatus.REVERSE_PAUSE.getCode())) { + update.setExecStatus(state); + } else { + log.debug("task status {} : {} no update", t.getId(), state); } + if (TaskStatus.FULL_CHECK_FINISH.getCode().equals(state) && t.getMigrationModelId().equals(1)) { update.setExecStatus(TaskStatus.MIGRATION_FINISH.getCode()); update.setFinishTime(new Date()); @@ -404,13 +491,7 @@ public class MigrationTaskServiceImpl extends ServiceImpl query = new LambdaQueryWrapper<>(); query.eq(MigrationTask::getRunHostId, hostId); - query.in(MigrationTask::getExecStatus, TaskStatus.FULL_START.getCode(), - TaskStatus.FULL_RUNNING.getCode(), TaskStatus.FULL_FINISH.getCode(), - TaskStatus.FULL_CHECK_START.getCode(), TaskStatus.FULL_CHECKING.getCode(), - TaskStatus.FULL_CHECK_FINISH.getCode(), TaskStatus.INCREMENTAL_START.getCode(), - TaskStatus.INCREMENTAL_RUNNING.getCode(), TaskStatus.REVERSE_START.getCode(), - TaskStatus.REVERSE_RUNNING.getCode() - ); + query.notIn(MigrationTask::getExecStatus, TaskStatus.NOT_RUN.getCode(), TaskStatus.MIGRATION_FINISH.getCode()); return Math.toIntExact(count(query)); } @@ -418,13 +499,7 @@ public class MigrationTaskServiceImpl extends ServiceImpl listRunningTaskByHostId(String hostId) { LambdaQueryWrapper query = new LambdaQueryWrapper<>(); query.eq(MigrationTask::getRunHostId, hostId); - query.in(MigrationTask::getExecStatus, TaskStatus.FULL_START.getCode(), - TaskStatus.FULL_RUNNING.getCode(), TaskStatus.FULL_FINISH.getCode(), - TaskStatus.FULL_CHECK_START.getCode(), TaskStatus.FULL_CHECKING.getCode(), - TaskStatus.FULL_CHECK_FINISH.getCode(), TaskStatus.INCREMENTAL_START.getCode(), - TaskStatus.INCREMENTAL_RUNNING.getCode(), TaskStatus.REVERSE_START.getCode(), - TaskStatus.REVERSE_RUNNING.getCode() - ); + query.notIn(MigrationTask::getExecStatus, TaskStatus.NOT_RUN.getCode(), TaskStatus.MIGRATION_FINISH.getCode()); return list(query); } @@ -456,31 +531,38 @@ public class MigrationTaskServiceImpl extends ServiceImpl globalParams) { + public void runTask(MigrationTaskHostRef h, MigrationTask t, List globalParams, + String operateUsername) { MigrationHostPortalInstall installHost = migrationHostPortalInstallHostService.getOneByHostId(h.getRunHostId()); installHost.setRunPassword(encryptionUtils.decrypt(installHost.getRunPassword())); t.setRunHostId(h.getRunHostId()); MigrationTask update = MigrationTask.builder() - .id(t.getId()) - .runHostId(h.getRunHostId()) - .runHost(h.getHost()) - .runHostname(h.getHostName()) - .runPort(h.getPort()) - .runUser(h.getUser()) - .runPass(h.getPassword()) - .build(); + .id(t.getId()) + .runHostId(h.getRunHostId()) + .runHost(h.getHost()) + .runHostname(h.getHostName()) + .runPort(h.getPort()) + .runUser(h.getUser()) + .runPass(h.getPassword()) + .build(); updateById(update); if (!execMigrationCheck(installHost, t, globalParams, "verify_pre_migration")) { return; } PortalHandle.startPortal(installHost, t, installHost.getJarName(), getTaskParam(installHost, globalParams, t)); update = MigrationTask.builder() - .id(t.getId()) - .execStatus(TaskStatus.FULL_START.getCode()) - .execTime(new Date()) - .build(); - migrationTaskOperateRecordService.saveRecord(t.getId(), TaskOperate.RUN, SecurityUtils.getUsername()); + .id(t.getId()) + .execStatus(TaskStatus.FULL_START.getCode()) + .execTime(new Date()) + .build(); + migrationTaskOperateRecordService.saveRecord(t.getId(), TaskOperate.RUN, operateUsername); updateById(update); + MigrationTaskContext context = new MigrationTaskContext(); + context.setId(t.getId()); + context.setMigrationTask(t); + context.setInstallHost(installHost); + installMap.put(t.getId(), installHost); + migrationTaskCheckProgressService.startCheckProgress(context); } /** @@ -516,6 +598,120 @@ public class MigrationTaskServiceImpl extends ServiceImpl processMap = migrationRecoveryHandler.fetchProcessStatusListByMigrationTask( + migrationTask, installPath); + if (!migrationRecoveryHandler.hasPortal(processMap)) { + taskProcessStatus.setType("error"); + String version = migrationRecoveryHandler.getJarVersion(installHost.getJarName()); + if (migrationRecoveryHandler.checkPortalVersion(version)) { + taskProcessStatus.setMessage("The current portal version" + version + + " does not support migration recovery. task has been error"); + } else { + taskProcessStatus.setMessage("The current portal process does not exist. task has been error"); + } + return taskProcessStatus; + } + if (INCREMENTAL_STATUS.contains(lastTaskStatus.getStatusId())) { + taskProcessStatus.setType("online"); + taskProcessStatus.setSource(migrationRecoveryHandler.hasIncrementalSource(processMap)); + taskProcessStatus.setSink(migrationRecoveryHandler.hasIncrementalSink(processMap)); + } else if (REVERSE_STATUS.contains(lastTaskStatus.getStatusId())) { + taskProcessStatus.setType("reverse"); + taskProcessStatus.setSource(migrationRecoveryHandler.hasReverseSource(processMap)); + taskProcessStatus.setSink(migrationRecoveryHandler.hasReverseSink(processMap)); + } else { + log.debug("The current portal process does not exist."); + } + } + return taskProcessStatus; + } + + @Override + public TaskProcessStatus startTaskOfIncrementalOrReverseMigrationProcess(Integer id, String name) { + MigrationTask migrationTask = getById(id); + OpsAssert.nonNull(migrationTask, "migration task not exist."); + MigrationTaskStatusRecord lastTaskStatus = migrationTaskStatusRecordService.getLastByTaskId(id); + OpsAssert.nonNull(lastTaskStatus, "migration task status not exist."); + MigrationHostPortalInstall installHost = migrationHostPortalInstallHostService.getOneByHostId( + migrationTask.getRunHostId()); + String installPath = installHost.getInstallRootPath(); + Map processMap = migrationRecoveryHandler.fetchProcessStatusListByMigrationTask( + migrationTask, installPath); + if (!migrationRecoveryHandler.hasPortal(processMap)) { + String version = migrationRecoveryHandler.getJarVersion(installHost.getJarName()); + if (migrationRecoveryHandler.checkPortalVersion(version)) { + throw new OpsException("The current portal version" + version + + " does not support migration recovery. task has been error"); + } else { + throw new OpsException("The current portal process does not exist. task has been error"); + } + } + // 未流转到增量迁移阶段,或者迁移任务已结束,则直接返回 + if (INCREMENTAL_STATUS.contains(lastTaskStatus.getStatusId())) { + migrationRecoveryHandler.startProcessOfIncrementalMigration(migrationTask, installHost, name); + updateStatus(migrationTask.getId(), TaskStatus.INCREMENTAL_RUNNING); + } else if (REVERSE_STATUS.contains(lastTaskStatus.getStatusId())) { + migrationRecoveryHandler.startProcessOfReverseMigration(migrationTask, installHost, name); + updateStatus(migrationTask.getId(), TaskStatus.REVERSE_RUNNING); + } else { + log.debug("The current portal process does not exist."); + } + return checkStatusOfIncrementalOrReverseMigrationTask(id); + } + + @Override + public MigrationTaskCheckProgressSummary queryFullCheckSummaryOfMigrationTask(Integer id) { + return migrationTaskCheckProgressService.queryFullCheckSummaryOfMigrationTask(id); + } + + @Override + public IPage queryFullCheckDetailOfMigrationTask(FullCheckParam fullCheckParam) { + Integer id = fullCheckParam.getId(); + OpsAssert.nonNull(id, "migration task id can't be null"); + MigrationTask migrationTask = getById(id); + OpsAssert.nonNull(migrationTask, "migration task not exist"); + String status = fullCheckParam.getStatus(); + int pageNum = fullCheckParam.getPageNum(); + int pageSize = fullCheckParam.getPageSize(); + return migrationTaskCheckProgressService.pageFullCheckDetailOfMigrationTask(id, status, pageSize, pageNum); + } + + @Override + public String downloadRepairFile(Integer id, String repairFileName) { + MigrationTask task = getById(id); + MigrationHostPortalInstall installHost = migrationHostPortalInstallHostService.getOneByHostId( + task.getRunHostId()); + String catFailedRepairFile = "cat " + installHost.getInstallPath() + "portal/workspace/" + id + + "/checkResult/result/" + repairFileName; + JschResult result = ShellUtil.execCommandGetResult(task.getRunHost(), task.getRunPort(), task.getRunUser(), + encryptionUtils.decrypt(task.getRunPass()), catFailedRepairFile); + return result.isOk() ? result.getResult() : ""; + } + /** * special character handling * @@ -759,13 +955,17 @@ public class MigrationTaskServiceImpl extends ServiceImpl 0) { log.info("waiting for the resource to execute,task count : {}", waitRunCount); List migrationTasks = listTaskByStatus(TaskStatus.WAIT_RESOURCE); - migrationTasks.stream().forEach((t -> { + migrationTasks.forEach((t -> { List hosts = migrationTaskHostRefService.listByMainTaskId(t.getMainTaskId()); - MigrationTaskHostRef selectHost = hosts.stream().filter(h -> h.getRunnableCount() > 0).findFirst().orElse(null); + MigrationTaskHostRef selectHost = hosts.stream() + .filter(h -> h.getRunnableCount() > 0) + .findFirst() + .orElse(null); if (selectHost != null) { - log.info("Offline scheduling successfully assigns tasks to host for execution. taskId : {}, HostId : {}, HostIP : {}", - t.getId(), selectHost.getRunHostId(), selectHost.getHost()); - runTask(selectHost, t, null); + log.info( + "Offline scheduling successfully assigns tasks to host for execution. taskId : {}, HostId" + + " : {}, HostIP : {}", t.getId(), selectHost.getRunHostId(), selectHost.getHost()); + runTask(selectHost, t, null, null); } })); } @@ -776,5 +976,4 @@ public class MigrationTaskServiceImpl extends ServiceImpl implements MigrationTaskStatusRecordService { - - +public class MigrationTaskStatusRecordServiceImpl + extends ServiceImpl + implements MigrationTaskStatusRecordService { @Autowired private MigrationTaskOperateRecordService migrationTaskOperateRecordService; - @Autowired private MigrationTaskStatusRecordMapper migrationTaskStatusRecordMapper; - @Override @Transactional(rollbackFor = Exception.class) public void saveTaskRecord(Integer taskId, List> statusRecord) { LambdaQueryWrapper query = new LambdaQueryWrapper<>(); query.eq(MigrationTaskStatusRecord::getTaskId, taskId); this.remove(query); - Map>> status = statusRecord.stream().collect(Collectors.groupingBy(r -> MapUtil.getInt(r, "status"))); - List recordList = new ArrayList<>(); - status.entrySet().stream().forEach(m -> { - Integer operateType = TaskConstant.TASK_STATUS_OPERATE_MAPPING.get(m.getKey()); - if (operateType != null) { - MigrationTaskStatusRecord record = new MigrationTaskStatusRecord(); - MigrationTaskOperateRecord lastOperateRecord = migrationTaskOperateRecordService.getRecordByTaskIdAndOperType(taskId, operateType); - if (lastOperateRecord != null) { - record.setOperateId(lastOperateRecord.getId()); - } - record.setStatusId(m.getKey()); - record.setTaskId(taskId); - if (m.getValue().size() > 0) { - Long timestamp = MapUtil.getLong(m.getValue().get(0), "timestamp"); - record.setCreateTime(DateUtil.date(timestamp)); - } - MigrationTaskStatusRecord sameStatusRecord = CollUtil.findOne(recordList, item -> item.getStatusId().equals(record.getStatusId())); - if (sameStatusRecord == null) { - recordList.add(record); - } + LinkedList recordList = new LinkedList<>(); + for (Map statusMap : statusRecord) { + Integer status = MapUtil.getInt(statusMap, "status"); + if (isEqualLastStatus(recordList, status) || Objects.equals(status, TaskStatus.MIGRATION_ERROR.getCode())) { + continue; } - }); - if (recordList.size() > 0) { + deleteOlderRecordOfRepeatStatus(recordList, status); + if (isEqualLastStatus(recordList, TaskStatus.INCREMENTAL_PAUSE.getCode()) || isEqualLastStatus(recordList, + TaskStatus.REVERSE_PAUSE.getCode())) { + recordList.removeLast(); + continue; + } + addMigrationTaskStatusRecord(recordList, taskId, status, statusMap); + } + if (!recordList.isEmpty()) { this.saveBatch(recordList); } } + private void deleteOlderRecordOfRepeatStatus(LinkedList recordList, Integer status) { + if (recordList.isEmpty()) { + return; + } + List delRes = recordList.stream() + .filter(record -> Objects.equals(record.getStatusId(), status)) + .collect(Collectors.toList()); + if (!delRes.isEmpty()) { + recordList.removeAll(delRes); + } + } + + private void addMigrationTaskStatusRecord(LinkedList recordList, Integer taskId, + Integer status, Map statusMap) { + Integer operateType = TaskConstant.TASK_STATUS_OPERATE_MAPPING.get(status); + if (!TaskConstant.TASK_STATUS_OPERATE_MAPPING.containsKey(status)) { + log.warn("status record of operate type is invalid,taskId={} status={}", taskId, status); + return; + } + MigrationTaskStatusRecord record = new MigrationTaskStatusRecord(); + MigrationTaskOperateRecord lastRecord = migrationTaskOperateRecordService.getRecordByTaskIdAndOperType(taskId, + operateType); + if (lastRecord != null) { + record.setOperateId(lastRecord.getId()); + } + record.setStatusId(status); + record.setTaskId(taskId); + Long timestamp = MapUtil.getLong(statusMap, "timestamp"); + record.setCreateTime(DateUtil.date(timestamp)); + recordList.addLast(record); + } + + private boolean isEqualLastStatus(LinkedList recordList, Integer status) { + return !recordList.isEmpty() && Objects.equals(recordList.getLast().getStatusId(), status); + } + @Override public void saveRecord(Integer taskId, Integer statusId, String title, Date time) { MigrationTaskStatusRecord lastRecord = getLastByTaskId(taskId); diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/vo/FullCheckParam.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/vo/FullCheckParam.java new file mode 100644 index 000000000..baaa1395d --- /dev/null +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/vo/FullCheckParam.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + * + * FullCheckParam.java + * + * IDENTIFICATION + * plugins/data-migration/src/main/java/org/opengauss/admin/plugin/vo/FullCheckParam.java + * + * ------------------------------------------------------------------------- + */ + +package org.opengauss.admin.plugin.vo; + +import lombok.Data; + +/** + * FullCheckParam + * + * @author: wangchao + * @Date: 2024/12/30 11:55 + * @Description: FullCheckDto + * @since 7.0.0 + **/ +@Data +public class FullCheckParam { + Integer id; + String status; + int pageSize; + int pageNum; +} diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/vo/ProcessStatus.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/vo/ProcessStatus.java new file mode 100644 index 000000000..f5cfa3891 --- /dev/null +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/vo/ProcessStatus.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + * + * ProcessStatus.java + * + * IDENTIFICATION + * plugins/data-migration/src/main/java/org/opengauss/admin/plugin/vo/ProcessStatus.java + * + * ------------------------------------------------------------------------- + */ + +package org.opengauss.admin.plugin.vo; + +import lombok.Data; + +/** + * The ps command in Linux is used to provide information about the currently running processes, + * including their process identification numbers (PIDs), the terminal associated with them, + * and the current state. Here are some commonly used options with the ps command: + * + * ps: This command alone lists the processes running for the current shell. + * + * @author: wangchao + * @Date: 2024/12/30 11:55 + * @since 7.0.0 + **/ +@Data +public class ProcessStatus { + private String name; + private String uid; + private String pid; + private String cmd; + + public ProcessStatus(String uid, String pid, String cmd) { + this.uid = uid; + this.pid = pid; + this.cmd = cmd; + } +} \ No newline at end of file diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/vo/TaskProcessStatus.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/vo/TaskProcessStatus.java new file mode 100644 index 000000000..b283ffbb6 --- /dev/null +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/vo/TaskProcessStatus.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + * + * TaskProcessStatus.java + * + * IDENTIFICATION + * plugins/data-migration/src/main/java/org/opengauss/admin/plugin/vo/TaskProcessStatus.java + * + * ------------------------------------------------------------------------- + */ + +package org.opengauss.admin.plugin.vo; + +import lombok.Data; + +/** + * TaskProcessStatus + * + * @author: wangchao + * @Date: 2024/12/27 11:10 + * @Description: TaskProcessStatus + * @since 7.0.0 + **/ +@Data +public class TaskProcessStatus { + private Integer id; + private Integer status; + private String type; + private boolean source; + private boolean sink; + private String message; +} diff --git a/plugins/data-migration/src/main/resources/opengauss-schema.sql b/plugins/data-migration/src/main/resources/opengauss-schema.sql index 9fe1e6792..bddbd2007 100644 --- a/plugins/data-migration/src/main/resources/opengauss-schema.sql +++ b/plugins/data-migration/src/main/resources/opengauss-schema.sql @@ -153,6 +153,18 @@ START 1 CACHE 1; END IF; + +IF NOT EXISTS (SELECT 1 FROM information_schema.sequences WHERE sequence_schema=''public'' AND sequence_name=''sq_tb_migration_task_check_progress_summary_id'' ) +THEN +CREATE SEQUENCE "public"."sq_tb_migration_task_check_progress_summary_id" +INCREMENT 1 +MINVALUE 1 +MAXVALUE 9223372036854775807 +START 1 +CACHE 1; +END IF; + + RETURN 0; END;' LANGUAGE plpgsql; @@ -1070,7 +1082,7 @@ VALUES(25, 'openEuler', '22.03', 'aarch64', 'https://opengauss.obs.cn-south-1.my -- ---------------------------- CREATE TABLE IF NOT EXISTS "public"."tb_migration_task_alert" ( - "id" int8 NOT NULL DEFAULT nextval('sq_tb_migration_tool_portal_download_info_id'::regclass), + "id" int8 NOT NULL DEFAULT nextval('sq_tb_migration_task_alert_id'::regclass), "task_id" int8 NOT NULL, "migration_phase" int8 NOT NULL, "date_time" varchar(24) NOT NULL, @@ -1111,4 +1123,66 @@ CREATE TABLE IF NOT EXISTS "public"."tb_migration_task_alert_detail" ( ); COMMENT ON COLUMN "public"."tb_migration_task_alert_detail"."alert_id" IS '告警信息主键ID'; -COMMENT ON COLUMN "public"."tb_migration_task_alert_detail"."detail" IS '告警对应的日志文本'; \ No newline at end of file +COMMENT ON COLUMN "public"."tb_migration_task_alert_detail"."detail" IS '告警对应的日志文本'; + + +-- ---------------------------- +-- Table structure for tb_migration_task_check_progress_detail +-- ---------------------------- + +CREATE TABLE IF NOT EXISTS "public"."tb_migration_task_check_progress_detail" ( + "id" varchar(255) NOT NULL, + "task_id" int8 NOT NULL, + "schema_name" varchar(255), + "source_name" varchar(255), + "sink_name" varchar(255), + "status" varchar(16), + "failed_rows" int8, + "repair_file_name" varchar(255), + "message" text, + "create_time" timestamp DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "tb_migration_task_check_progress_detail_pkey" PRIMARY KEY ("id") + ); +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_detail"."id" IS 'ID'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_detail"."task_id" IS '迁移任务ID'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_detail"."schema_name" IS '迁移任务源端Schema名称'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_detail"."source_name" IS '迁移任务源端表名称'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_detail"."sink_name" IS '迁移任务目标端表名称'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_detail"."status" IS '迁移任务表校验结果'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_detail"."failed_rows" IS '迁移任务表校验失败行数'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_detail"."repair_file_name" IS '迁移任务表校验失败修复脚本名称'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_detail"."message" IS '迁移任务表校验信息'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_detail"."create_time" IS '迁移任务表记录创建时间'; + + + +-- ---------------------------- +-- Table structure for tb_migration_task_check_progress_summary +-- ---------------------------- + +CREATE TABLE IF NOT EXISTS "public"."tb_migration_task_check_progress_summary" ( + "id" int8 NOT NULL DEFAULT nextval('sq_tb_migration_task_check_progress_summary_id'::regclass), + "task_id" int8 NOT NULL, + "source_db" varchar(255), + "sink_db" varchar(255), + "total" int8, + "avg_speed" int4, + "completed" int4, + "table_count" int4, + "start_time" timestamp, + "end_time" timestamp, + "status" varchar(16), + CONSTRAINT "tb_migration_task_check_progress_summary_pkey" PRIMARY KEY ("id") + ); + +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_summary"."id" IS '迁移全量校验进度表详情主键ID'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_summary"."task_id" IS '迁移任务ID'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_summary"."source_db" IS '迁移任务源端表名称'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_summary"."sink_db" IS '迁移任务目标端表名称'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_summary"."total" IS '迁移任务校验总记录数'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_summary"."avg_speed" IS '迁移任务平均校验速率'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_summary"."completed" IS '迁移任务完成表数量'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_summary"."table_count" IS '迁移任务校验表数量'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_summary"."start_time" IS '迁移任务开始时间'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_summary"."end_time" IS '迁移任务截止时间'; +COMMENT ON COLUMN "public"."tb_migration_task_check_progress_summary"."status" IS '迁移任务是否完成'; -- Gitee From 03ac2705fd34e654ee1277577a014d0fb0225de3 Mon Sep 17 00:00:00 2001 From: mystarry-sky Date: Tue, 21 Jan 2025 12:53:20 +0800 Subject: [PATCH 2/4] =?UTF-8?q?DataKit=E6=96=AD=E7=82=B9=E7=BB=AD=E4=BC=A0?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index b2da9a6ef..f202a53f5 100644 --- a/pom.xml +++ b/pom.xml @@ -225,7 +225,7 @@ plugins/compatibility-assessment plugins/intelligent-parameter-tuning plugins/openGauss-tools-monitor - plugins/oauth-login + plugins/container-management-plugin pom -- Gitee From 066697f63866535571404f6869c6db6bb3df20ac Mon Sep 17 00:00:00 2001 From: mystarry-sky Date: Tue, 21 Jan 2025 13:09:44 +0800 Subject: [PATCH 3/4] =?UTF-8?q?DataKit=E6=96=AD=E7=82=B9=E7=BB=AD=E4=BC=A0?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../opengauss/admin/system/service/HostMonitorCacheService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/openGauss-datakit/visualtool-service/src/main/java/org/opengauss/admin/system/service/HostMonitorCacheService.java b/openGauss-datakit/visualtool-service/src/main/java/org/opengauss/admin/system/service/HostMonitorCacheService.java index cb0974449..f8ba0f680 100644 --- a/openGauss-datakit/visualtool-service/src/main/java/org/opengauss/admin/system/service/HostMonitorCacheService.java +++ b/openGauss-datakit/visualtool-service/src/main/java/org/opengauss/admin/system/service/HostMonitorCacheService.java @@ -79,7 +79,6 @@ public class HostMonitorCacheService { /** * init host cache scheduled */ - @Async public void initHostMonitorCacheService() { lastedFetch.set(System.currentTimeMillis() / 1000); List list = hostService.list(); -- Gitee From b2e7fc878f91689c685760ab307cab443726c8d3 Mon Sep 17 00:00:00 2001 From: mystarry-sky Date: Tue, 21 Jan 2025 14:34:53 +0800 Subject: [PATCH 4/4] =?UTF-8?q?DataKit=E6=96=AD=E7=82=B9=E7=BB=AD=E4=BC=A0?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../admin/system/service/HostMonitorCacheService.java | 1 - .../plugin/domain/MigrationTaskCheckProgressDetail.java | 4 ++-- .../plugin/domain/MigrationTaskCheckProgressSummary.java | 6 +++--- .../admin/plugin/handler/MigrationRecoveryHandler.java | 1 + .../mapper/MigrationTaskCheckProgressDetailMapper.java | 5 +++-- .../mapper/MigrationTaskCheckProgressSummaryMapper.java | 5 +++-- .../service/MigrationTaskCheckProgressDetailService.java | 2 +- .../service/MigrationTaskCheckProgressSummaryService.java | 4 ++-- .../service/impl/MigrationTaskCheckProgressService.java | 7 ++----- .../impl/MigrationTaskCheckProgressSummaryServiceImpl.java | 3 +-- 10 files changed, 18 insertions(+), 20 deletions(-) diff --git a/openGauss-datakit/visualtool-service/src/main/java/org/opengauss/admin/system/service/HostMonitorCacheService.java b/openGauss-datakit/visualtool-service/src/main/java/org/opengauss/admin/system/service/HostMonitorCacheService.java index f8ba0f680..55810dd86 100644 --- a/openGauss-datakit/visualtool-service/src/main/java/org/opengauss/admin/system/service/HostMonitorCacheService.java +++ b/openGauss-datakit/visualtool-service/src/main/java/org/opengauss/admin/system/service/HostMonitorCacheService.java @@ -37,7 +37,6 @@ import org.opengauss.admin.system.plugin.beans.SshLogin; import org.opengauss.admin.system.service.ops.IHostService; import org.opengauss.admin.system.service.ops.IHostUserService; import org.opengauss.admin.system.service.ops.impl.EncryptionUtils; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.Arrays; diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressDetail.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressDetail.java index 6c2914e41..1eb2a2895 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressDetail.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressDetail.java @@ -36,8 +36,8 @@ import java.util.Date; * MigrationTaskCheckProgressDetail * * @author wangchao - * @date 2025/01/14 09:01 - **/ + * @since 2024/10/29 09:26 + */ @Data @Builder @TableName("tb_migration_task_check_progress_detail") diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressSummary.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressSummary.java index a040cba90..8f28583d2 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressSummary.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/domain/MigrationTaskCheckProgressSummary.java @@ -36,11 +36,11 @@ import lombok.experimental.Tolerate; import java.time.LocalDateTime; /** - * MigrationTaskCheckProgressSummary + * MigrationTaskCheckProgressDetail * * @author wangchao - * @date 2025/01/14 09:01 - **/ + * @since 2025/01/14 + */ @Data @Builder @TableName("tb_migration_task_check_progress_summary") diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/handler/MigrationRecoveryHandler.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/handler/MigrationRecoveryHandler.java index 450d8995f..d9514c749 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/handler/MigrationRecoveryHandler.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/handler/MigrationRecoveryHandler.java @@ -78,6 +78,7 @@ public class MigrationRecoveryHandler { private static final String NAME_MIGRATION_CHECK = "check"; private static final String NAME_MIGRATION_CHECK_SOURCE = "check-source"; private static final String NAME_MIGRATION_CHECK_SINK = "check-sink"; + private static final Map PROCESS_CMD_KEYWORD_FOR_NAME = new HashMap<>(); static { diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressDetailMapper.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressDetailMapper.java index f4e1bd645..5fb5caa78 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressDetailMapper.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressDetailMapper.java @@ -32,7 +32,8 @@ import org.opengauss.admin.plugin.domain.MigrationTaskCheckProgressDetail; * MigrationTaskCheckProgressDetailMapper * * @author wangchao - * @date 2025/01/14 09:01 + * @since 2025/1/15 09:26 */ @Mapper -public interface MigrationTaskCheckProgressDetailMapper extends BaseMapper { } \ No newline at end of file +public interface MigrationTaskCheckProgressDetailMapper extends BaseMapper { +} \ No newline at end of file diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressSummaryMapper.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressSummaryMapper.java index 30a6104ae..a1c41532a 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressSummaryMapper.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/mapper/MigrationTaskCheckProgressSummaryMapper.java @@ -32,7 +32,8 @@ import org.opengauss.admin.plugin.domain.MigrationTaskCheckProgressSummary; * MigrationTaskCheckProgressSummaryMapper * * @author wangchao - * @date 2025/01/14 09:01 + * @since 2025/01/14 09:01 */ @Mapper -public interface MigrationTaskCheckProgressSummaryMapper extends BaseMapper { } \ No newline at end of file +public interface MigrationTaskCheckProgressSummaryMapper extends BaseMapper { +} \ No newline at end of file diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressDetailService.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressDetailService.java index 0d4bdf6f7..018e00c5c 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressDetailService.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressDetailService.java @@ -33,7 +33,7 @@ import java.util.List; * MigrationTaskCheckProgressDetailService * * @author wangchao - * @date 2025/01/14 09:01 + * @since 2025/01/14 09:01 */ public interface MigrationTaskCheckProgressDetailService extends IService { /** diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressSummaryService.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressSummaryService.java index 645da3d2f..918364b2f 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressSummaryService.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/MigrationTaskCheckProgressSummaryService.java @@ -33,7 +33,7 @@ import java.util.List; * MigrationTaskCheckProgressSummaryService * * @author wangchao - * @date 2025/01/14 09:01 + * @since 2025/01/14 09:01 */ public interface MigrationTaskCheckProgressSummaryService extends IService { /** @@ -46,7 +46,7 @@ public interface MigrationTaskCheckProgressSummaryService extends IService ids); diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressService.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressService.java index 52f7bd414..de6c9aa77 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressService.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressService.java @@ -21,7 +21,6 @@ * ------------------------------------------------------------------------- */ - package org.opengauss.admin.plugin.service.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -54,11 +53,9 @@ import javax.annotation.Resource; /** * MigrationTaskCheckProgressService - * @author: wangchao - * @Date: 2024/12/30 09:53 - * @Description: MigrationTaskCheckProgressService * - * @since 7.0.0 + * @author: wangchao + * @since: 2024/12/30 09:53 **/ @Service @Slf4j diff --git a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressSummaryServiceImpl.java b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressSummaryServiceImpl.java index 4ddf399df..342df1132 100644 --- a/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressSummaryServiceImpl.java +++ b/plugins/data-migration/src/main/java/org/opengauss/admin/plugin/service/impl/MigrationTaskCheckProgressSummaryServiceImpl.java @@ -39,9 +39,8 @@ import java.util.List; * MigrationTaskCheckProgressSummaryService * * @author: wangchao - * @Date: 2024/12/30 09:53 * @Description: MigrationTaskCheckProgressService - * @since 7.0.0 + * @since 2025/01/14 **/ @Service public class MigrationTaskCheckProgressSummaryServiceImpl -- Gitee