AND sdt_standard_quatity.standard_quality_quality_center_id = #{standardQuatityFilter.standardQualityQualityCenterId}
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dto/CustomizeRouteDto.java b/application-webadmin/src/main/java/supie/webadmin/app/dto/CustomizeRouteDto.java
index 0d518feb453aad4f7ac8cc84f4af86bcf9ecbc64..a539f06126eff81dea43e9995c30cfaa46258dbd 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/dto/CustomizeRouteDto.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/dto/CustomizeRouteDto.java
@@ -98,6 +98,12 @@ public class CustomizeRouteDto {
@ApiModelProperty(value = "业务规程ID")
private Long processId;
+ /**
+ * 指标ID。
+ */
+ @ApiModelProperty(value = "指标ID")
+ private Long definitionIndexId;
+
/**
* updateTime 范围过滤起始值(>=)。
*/
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dto/DefinitionIndexDto.java b/application-webadmin/src/main/java/supie/webadmin/app/dto/DefinitionIndexDto.java
index 983f91d8e33741193ff0181525614911241518b1..6afce37493e21e0faf701abeaf5e092c92886e9e 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/dto/DefinitionIndexDto.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/dto/DefinitionIndexDto.java
@@ -92,10 +92,10 @@ public class DefinitionIndexDto {
private String indexDescription;
/**
- * 关联明细表id。
+ * 动态路由id。
*/
- @ApiModelProperty(value = "关联明细表id")
- private Long modelLogicalId;
+ @ApiModelProperty(value = "动态路由id")
+ private Long customizeRouteId;
/**
* 关联字段。
@@ -115,30 +115,6 @@ public class DefinitionIndexDto {
@ApiModelProperty(value = "生产周期")
private String productPeriod;
- /**
- * 计算函数。
- */
- @ApiModelProperty(value = "计算函数")
- private String caliberCalculateFunction;
-
- /**
- * 度量单位。
- */
- @ApiModelProperty(value = "度量单位")
- private String caliberMeasureUnit;
-
- /**
- * 度量精度。
- */
- @ApiModelProperty(value = "度量精度")
- private String caliberPrecision;
-
- /**
- * 口径说明。
- */
- @ApiModelProperty(value = "口径说明")
- private String caliberDescription;
-
/**
* updateTime 范围过滤起始值(>=)。
*/
@@ -164,7 +140,7 @@ public class DefinitionIndexDto {
private String createTimeEnd;
/**
- * index_type / index_name / index_en_name / index_code / index_level / index_description / data_type / product_period / caliber_calculate_function / caliber_measure_unit / caliber_precision / caliber_description LIKE搜索字符串。
+ * index_type / index_name / index_en_name / index_code / index_level / index_description / data_type / product_period / LIKE搜索字符串。
*/
@ApiModelProperty(value = "LIKE模糊搜索字符串")
private String searchString;
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dto/ModelDesginFieldDto.java b/application-webadmin/src/main/java/supie/webadmin/app/dto/ModelDesginFieldDto.java
index a26b65a713785053dd972b5877478c9ed964ef46..c40975c7a1f18e1479ecc6e43109addc5b8b2689 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/dto/ModelDesginFieldDto.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/dto/ModelDesginFieldDto.java
@@ -173,6 +173,12 @@ public class ModelDesginFieldDto {
@ApiModelProperty(value = "业务过程id")
private Long processId;
+ /**
+ * 数据标准主表ID standard_main_id。
+ */
+ @ApiModelProperty(value = "数据标准主表ID")
+ private Long standardMainId;
+
/**
* updateTime 范围过滤起始值(>=)。
*/
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardMainDto.java b/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardMainDto.java
index 862cc4c7b23aaf8fe20788e3d47d1fe0cdc79fb8..ccace30d0b4688c614fc5e2d40976fa5ac40b272 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardMainDto.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardMainDto.java
@@ -97,6 +97,12 @@ public class StandardMainDto {
@ApiModelProperty(value = "标准状态")
private String standardStatus;
+ /**
+ * 正则表达式。
+ */
+ @ApiModelProperty(value = "正则表达式")
+ private String standardRegular;
+
/**
* updateTime 范围过滤起始值(>=)。
*/
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardQuatityDto.java b/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardQuatityDto.java
index 65b70ad7926a38155151575ea57146ab5931001e..6e46c3a87afe817716bb6d83816fd6fc9252eebf 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardQuatityDto.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardQuatityDto.java
@@ -68,16 +68,22 @@ public class StandardQuatityDto {
private String standardQaulityRe;
/**
- * 质量校验长度限制。
+ * 质量校验长度限制(正数->大等于。负数->小等于)。
*/
- @ApiModelProperty(value = "质量校验长度限制")
- private String standardQualityLang;
+ @ApiModelProperty(value = "质量校验长度限制(正数->大等于。负数->小等于)")
+ private Integer standardQualityLang;
/**
- * 质量校验不为空。
+ * 质量校验不为空(1:不为空。0:可为空)。
*/
- @ApiModelProperty(value = "质量校验不为空")
- private String standardQualityNotNull;
+ @ApiModelProperty(value = "质量校验不为空(1:不为空。0:可为空)")
+ private Integer standardQualityNotNull;
+
+ /**
+ * SQL条件。
+ */
+ @ApiModelProperty(value = "SQL条件")
+ private String customJudgment;
/**
* 质量校验中心关联规则。
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/DataTransferModel.java b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/DataTransferModel.java
index 2dc26f8f5590dadeed56aa288e92690d2171d2e2..e02247ec547232be4569d051187ea4bf35f9ad10 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/DataTransferModel.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/DataTransferModel.java
@@ -2,6 +2,8 @@ package supie.webadmin.app.liteFlow.model;
import lombok.Data;
+import java.util.Map;
+
/**
* 描述:DataTransfer组件的相关信息
*
@@ -15,7 +17,7 @@ public class DataTransferModel {
/**
* jobId
*/
- private String jobId;
+ private Long jobId;
/**
* jobName
*/
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/LiteFlowNodeLogModel.java b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/LiteFlowNodeLogModel.java
index cc7806aed817f96d2151018b498bcebf4527adac..238176e995af4e27cc92768654c700e62af31c8e 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/LiteFlowNodeLogModel.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/LiteFlowNodeLogModel.java
@@ -36,15 +36,15 @@ public class LiteFlowNodeLogModel {
private String logMessage;
public static LiteFlowNodeLogModel info(String nodeId, String nodeTag, String logMessage) {
- return new LiteFlowNodeLogModel("INFO", nodeId, nodeTag, new Date(),logMessage);
+ return new LiteFlowNodeLogModel("INFO", nodeId, nodeTag, new Date(), logMessage);
}
public static LiteFlowNodeLogModel warn(String nodeId, String nodeTag, String logMessage) {
- return new LiteFlowNodeLogModel("WARN", nodeId, nodeTag, new Date(),logMessage);
+ return new LiteFlowNodeLogModel("WARN", nodeId, nodeTag, new Date(), logMessage);
}
public static LiteFlowNodeLogModel error(String nodeId, String nodeTag, String logMessage) {
- return new LiteFlowNodeLogModel("ERROR", nodeId, nodeTag, new Date(),logMessage);
+ return new LiteFlowNodeLogModel("ERROR", nodeId, nodeTag, new Date(), logMessage);
}
public LiteFlowNodeLogModel(String nodeId, String nodeTag, String logMessage) {
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java
index f1b609035f6ee253292caef076a2cd8166787251..7e0640697f3e84ee24112a3a2432cbc330943757 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java
@@ -1,12 +1,16 @@
package supie.webadmin.app.liteFlow.node;
import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.util.URLUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.json.JSONUtil;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -46,16 +50,15 @@ public class DataTransferNode extends BaseNode {
* Seatunnel 配置信息
*/
private SeatunnelConfig seatunnelConfigModel;
- /**
- * 临时文件路径
- */
- private String tempFilePath;
+
+ String envJobModelValue = null;
@Autowired
private SeatunnelConfigMapper seatunnelConfigMapper;
@Autowired
private RemoteHostMapper remoteHostMapper;
+
@Override
public void beforeProcess() {
dataTransferModel = JSONUtil.toBean(devLiteflowNode.getFieldJsonData(), DataTransferModel.class);
@@ -71,23 +74,127 @@ public class DataTransferNode extends BaseNode {
@Override
public void process() throws Exception {
+ try {
+ JsonNode jsonNode = new JsonMapper().readTree(dataTransferModel.getSeaTunnelConfig());
+ envJobModelValue = jsonNode.get("env").get("job.mode").asText();
+ } catch (JsonProcessingException e) {
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, e.getMessage()));
+ throw new MyLiteFlowException(new ErrorMessageModel(getClass(), e.getMessage()));
+ }
//判断什么方式调用 Seatunnel
if (this.seatunnelConfigModel.getCallMode() == 1) {
// 通过接口的方式调用 Seatunnel
- nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "使用RestApi方式调用Seatunnel。"));
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "使用RestApi方式调用Seatunnel."));
restApiSubmitJob();
} else if (this.seatunnelConfigModel.getCallMode() == 2) {
+ // 判断是否需要等待 Seatunnel 任务执行完毕
// 通过 SSH 方式调用 Seatunnel
- nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "使用SSH方式调用Seatunnel。"));
- sshSubmitJob();
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "使用SSH方式执行Seatunnel任务."));
+ if ("BATCH".equals(envJobModelValue)) {
+ // BATCH:需要等待执行完再执行下一个节点
+ sshSubmitJob();
+ } else {
+ String logFileName = "RunSeatunnelTask_" + DateUtil.format(new Date(), "yyyy-MM-dd-HH-mm-ss-SSS") + ".log";
+ String message = "env节点下的job.mode节点值为[" + envJobModelValue +
+ "], 使用异步方式执行, Seatunnel任务执行日志[" + logFileName + "]将保存在Seatunnel安装位置下的 taskLog 目录下.";
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, message));
+// String a = "nohup ./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local > ./taskLog/RunSeatunnelTask.log 2>&1 &";
+// String b = "./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local | tee ./taskLog/seatunnel.log && sync";
+ // 开启一个线程
+ // 执行远程命令,并保存日志至
+ new Thread(() -> {
+ sshSubmitJobAndUploadLogFil(logFileName);
+ }).start();
+ }
}
}
+ /**
+ * seatunnel任务已经调用并开始执行.判断是否需要等待任务执行完成再执行下一个节点.
+ * 根据(seaTunnelConfig -> env-> job.mode)的值判断是否需要等待执行完再执行下一节点.
+ * BATCH:需要等待执行完再执行下一个节点,其他值不用管,每次执行也都执行一次.
+ *
+ * @author 王立宏
+ * @date 2023/11/27 11:17
+ */
+ @Override
+ public void afterProcess() {
+ if ("BATCH".equals(envJobModelValue)) {
+ // 判断该任务是否执行完毕(Api调用判断、SSH执行判断).
+ if (this.seatunnelConfigModel.getCallMode() == 2) {
+ // SSH方式执行Seatunnel任务, 当前只能够等待任务执行完成才可以执行下一组件
+ // 非等待任务则开启一个线程来执行 SSH 命令,log日志则保存在 Seatunnel 路径的log文件夹中
+ return;
+ }
+ // API调用判断
+ long startTime = System.currentTimeMillis();
+ int sleepTime = 1000; //休眠时间
+ while (true) {
+ StringBuilder url = new StringBuilder(seatunnelConfigModel.getLocalhostUri());
+ url.append("/hazelcast/rest/maps/running-job/").append(dataTransferModel.getJobId());
+ HttpResponse execute;
+ try {
+ execute = HttpRequest.get(url.toString()).execute();
+ } catch (Exception e) {
+ String errorMessage = "SeatunnelRestApi(" + url.toString() + ")调用报错: " + e.getMessage();
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败: " + errorMessage));
+ throw new MyLiteFlowException(new ErrorMessageModel(getClass(), errorMessage));
+ }
+ if (!execute.isOk()) {
+ nodeLog.add(LiteFlowNodeLogModel.warn(nodeId, nodeTag,
+ "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行信息获取失败,失败信息为:" + execute.body()));
+ if ((System.currentTimeMillis() - startTime) > 60 * 60 * 1000) {
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "任务执行超时(1小时),请检查!"));
+ throw new MyLiteFlowException(new ErrorMessageModel(getClass(), "任务执行超时(1小时),请检查!"));
+ }
+ continue;
+ }
+ String body = URLUtil.decode(execute.body());
+ // bodyJsonNode => {} 或 {"jobId":"","jobName":"","jobStatus":"","envOptions":{},"createTime":"","jobDag":{"vertices":[],"edges":[]},"pluginJarsUrls":[],"isStartWithSavePoint":false,"metrics":{"sourceReceivedCount":"","sinkWriteCount":""}}
+ JsonNode bodyJsonNode;
+ try {
+ bodyJsonNode = new JsonMapper().readTree(body);
+ } catch (JsonProcessingException e) {
+ throw new MyLiteFlowException(new ErrorMessageModel(getClass(), e.getMessage()));
+ }
+ if (bodyJsonNode.size() == 0) {
+ // 未查询到该jobId所对应的任务信息{},认为该任务已经执行完毕。
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
+ "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行完毕!"));
+ return;
+ }
+ if ("RUNNING".equals(bodyJsonNode.get("jobStatus").asText())) {
+ // 任务运行中,休眠5秒后继续判断是否完成.
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
+ "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行中..."));
+ if ((System.currentTimeMillis() - startTime) > 60 * 60 * 1000) {
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "任务执行超时(1小时),请检查!"));
+ throw new MyLiteFlowException(new ErrorMessageModel(getClass(), "任务执行超时(1小时),请检查!"));
+ }
+ ThreadUtil.sleep(sleepTime);
+ //每次休眠后都追加3秒的时间,直至休眠时间大于1分钟。
+ if (sleepTime < 60000) sleepTime = sleepTime + 3000;
+ } else {
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
+ "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行完毕!"));
+ return;
+ }
+ }
+ } else if ("STREAMING".equals(envJobModelValue)) {
+ // 无需等待Seatunnel任务执行完成,直接执行下一个节点.
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
+ "env节点下的job.mode节点值为\"STREAMING\", 不关心执行结果, 直接执行下一组件节点."));
+ return;
+ }
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
+ "'env'节点下的'job.mode'节点值为'" + envJobModelValue + "', 不关心该数据传输组件中的 Seatunnel 执行结果, 直接执行下一组件节点."));
+ }
+
private void restApiSubmitJob() {
if (seatunnelConfigModel.getSubmitJobUrl() == null) {
seatunnelConfigModel.setSubmitJobUrl(new SeatunnelConfig().getSubmitJobUrl());
nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
- "未配置Seatunnel提交Job的接口地址,使用默认地址:" + seatunnelConfigModel.getSubmitJobUrl()));
+ "未配置Seatunnel提交Job的接口地址,使用默认地址:" + seatunnelConfigModel.getSubmitJobUrl()));
}
StringBuilder url = new StringBuilder(seatunnelConfigModel.getLocalhostUri());
// 判断字符串第一个字符是否为"/"
@@ -108,14 +215,14 @@ public class DataTransferNode extends BaseNode {
url.append("isStartWithSavePoint=").append(dataTransferModel.getIsStartWithSavePoint());
}
nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "提交Job:" + url.toString()));
- HttpResponse execute = null;
+ HttpResponse execute;
try {
execute = HttpRequest.post(url.toString())
.body(dataTransferModel.getSeaTunnelConfig())
.execute();
} catch (Exception e) {
- String errorMessage = "RestApi(" + url.toString() + ")调用报错:" + e.getMessage();
- nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败:" + errorMessage));
+ String errorMessage = "SeatunnelRestApi(" + url.toString() + ")调用报错: " + e.getMessage();
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败: " + errorMessage));
throw new MyLiteFlowException(new ErrorMessageModel(getClass(), errorMessage));
}
String body = URLUtil.decode(execute.body());
@@ -124,10 +231,21 @@ public class DataTransferNode extends BaseNode {
devLiteflowNodeMapper.setExecutionMessage(this.rulerId, this.nodeId, this.nodeTag, body);
if (!execute.isOk()) {
// 失败
- nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败:" + body));
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败: " + body));
throw new MyLiteFlowException(new ErrorMessageModel(getClass(), body));
} else {
- nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "执行成功:" + body));
+ // body => {"jobId":733584788375666689,"jobName":"rest_api_test"}
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "任务调用成功,返回值为: " + body));
+ try {
+ // 成功,设置jobId,jobId将在该 DataTransferNode 组件的 afterProcess() 方法使用.
+ JsonNode jsonNode = new JsonMapper().readTree(JSONUtil.toJsonStr(body));
+ Long jobId = jsonNode.get("jobId").asLong();
+ dataTransferModel.setJobId(jobId);
+ } catch (Exception e) {
+ String errorMessage = "设置jobId失败,失败学习为: " + e.getMessage();
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, errorMessage));
+ throw new MyLiteFlowException(new ErrorMessageModel(getClass(), errorMessage));
+ }
}
}
@@ -137,14 +255,16 @@ public class DataTransferNode extends BaseNode {
RemoteShell remoteShell = new RemoteShellSshjImpl(
remoteHost.getHostIp(), remoteHost.getHostPort(),
remoteHost.getLoginName(), remoteHost.getPassword(), null);
-
- tempFilePath = "./tempFolder/" + DateUtil.format(new Date(), "yyyy-MM-dd-HH-mm-ss-SSS-")
+ /**
+ * 临时文件路径
+ */
+ String tempFilePath = "./tempFolder/" + DateUtil.format(new Date(), "yyyy-MM-dd-HH-mm-ss-SSS-")
+ RandomUtil.randomString(5) + "-config.json";
contentWriteToFile(tempFilePath, JSONUtil.toJsonStr(dataTransferModel.getSeaTunnelConfig()));
// 上传配置文件(v2.batch.config.template)至 seatunnel 的 ./config/ 中
- String remoteConfigName = "v2.supie.config.json";
+ String remoteConfigName = "v2-supie-config" + DateUtil.format(new Date(), "-yyyy-MM-dd-HH-mm-ss-SSS") + ".json";
String remoteFilePath = seatunnelConfigModel.getSeatunnelPath() + "/config/" + remoteConfigName;
- nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "上传Seatunnel配置文件,remoteFilePath:" + remoteFilePath + "。"));
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "上传Seatunnel配置文件,remoteFilePath:" + remoteFilePath + "."));
remoteShell.uploadFile(tempFilePath, remoteFilePath);
// 执行命令
nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
@@ -155,7 +275,7 @@ public class DataTransferNode extends BaseNode {
"sh bin/seatunnel.sh --config config/" + remoteConfigName + " -e local");
remoteShell.close();
// 存储执行结果信息
- if (resultMsg == null) resultMsg = "无回执结果信息!";
+ if (resultMsg == null) resultMsg = "无回执结果信息, 请检查!";
nodeLog.add(LiteFlowNodeLogModel.warn(nodeId, nodeTag, resultMsg));
devLiteflowNodeMapper.setExecutionMessage(this.rulerId, this.nodeId, this.nodeTag, resultMsg);
// 删除创建的临时文件
@@ -163,6 +283,42 @@ public class DataTransferNode extends BaseNode {
file.delete();
}
+ private void sshSubmitJobAndUploadLogFil(String logFileName) {
+ // 根据项目ID 获取到该项目的远程服务器的配置
+ RemoteHost remoteHost = remoteHostMapper.selectById(seatunnelConfigModel.getRemoteHostId());
+ RemoteShell remoteShell = new RemoteShellSshjImpl(
+ remoteHost.getHostIp(), remoteHost.getHostPort(),
+ remoteHost.getLoginName(), remoteHost.getPassword(), null);
+ /**
+ * 临时文件路径
+ */
+ String tempFilePathOfConfig = "./tempFolder/" + DateUtil.format(new Date(), "yyyy-MM-dd-HH-mm-ss-SSS-")
+ + RandomUtil.randomString(5) + "-config.json";
+ contentWriteToFile(tempFilePathOfConfig, JSONUtil.toJsonStr(dataTransferModel.getSeaTunnelConfig()));
+ // 上传配置文件(v2.batch.config.template)至 seatunnel 的 ./config/ 中
+ String remoteConfigName = "v2-supie-config" + DateUtil.format(new Date(), "-yyyy-MM-dd-HH-mm-ss-SSS") + ".json";
+ String remoteFilePathOfConfig = seatunnelConfigModel.getSeatunnelPath() + "/config/" + remoteConfigName;
+ remoteShell.uploadFile(tempFilePathOfConfig, remoteFilePathOfConfig);
+ // 执行命令
+ String resultMsg = remoteShell.execCommands(
+ "cd " + seatunnelConfigModel.getSeatunnelPath(),
+ "sh bin/seatunnel.sh --config config/" + remoteConfigName + " -e local");
+ // 日志保存及上传
+ String tempFilePathOfLog = "./tempFolder/" + logFileName;
+ String remoteFilePathOfLog = seatunnelConfigModel.getSeatunnelPath() + "/taskLog/" + logFileName;
+ contentWriteToFile(tempFilePathOfLog, resultMsg);
+ remoteShell.uploadFile(tempFilePathOfLog, remoteFilePathOfLog);
+ remoteShell.close();
+ // 存储执行结果信息
+ if (resultMsg == null) resultMsg = "无回执结果信息, 请检查!";
+ devLiteflowNodeMapper.setExecutionMessage(this.rulerId, this.nodeId, this.nodeTag, resultMsg);
+ // 删除创建的临时文件
+ File configFile = new File(tempFilePathOfConfig);
+ configFile.delete();
+ File logFile = new File(tempFilePathOfLog);
+ logFile.delete();
+ }
+
/**
* 写入文件输入流
*
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/model/CustomizeRoute.java b/application-webadmin/src/main/java/supie/webadmin/app/model/CustomizeRoute.java
index bf8e336a98f0ddbb04979d714789dbdbee2b2ef7..0cd02308c96b8f581fe8fc844b79098e20f5842b 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/model/CustomizeRoute.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/model/CustomizeRoute.java
@@ -96,6 +96,11 @@ public class CustomizeRoute extends BaseModel {
*/
private Long processId;
+ /**
+ * 指标ID。
+ */
+ private Long definitionIndexId;
+
/**
* updateTime 范围过滤起始值(>=)。
*/
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/model/DefinitionIndex.java b/application-webadmin/src/main/java/supie/webadmin/app/model/DefinitionIndex.java
index b2a8f9cc780983c7d659d15badb55d92249a0ca8..c09070190ac441d8cbeee189a9651a40abce45a1 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/model/DefinitionIndex.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/model/DefinitionIndex.java
@@ -92,9 +92,9 @@ public class DefinitionIndex extends BaseModel {
private String indexDescription;
/**
- * 关联明细表id。
+ * 动态路由id。
*/
- private Long modelLogicalId;
+ private Long customizeRouteId;
/**
* 关联字段。
@@ -111,26 +111,6 @@ public class DefinitionIndex extends BaseModel {
*/
private String productPeriod;
- /**
- * 计算函数。
- */
- private String caliberCalculateFunction;
-
- /**
- * 度量单位。
- */
- private String caliberMeasureUnit;
-
- /**
- * 度量精度。
- */
- private String caliberPrecision;
-
- /**
- * 口径说明。
- */
- private String caliberDescription;
-
/**
* updateTime 范围过滤起始值(>=)。
*/
@@ -156,7 +136,7 @@ public class DefinitionIndex extends BaseModel {
private String createTimeEnd;
/**
- * index_type / index_name / index_en_name / index_code / index_level / index_description / data_type / product_period / caliber_calculate_function / caliber_measure_unit / caliber_precision / caliber_description LIKE搜索字符串。
+ * index_type / index_name / index_en_name / index_code / index_level / index_description / data_type / product_period / LIKE搜索字符串。
*/
@TableField(exist = false)
private String searchString;
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/model/ModelDesginField.java b/application-webadmin/src/main/java/supie/webadmin/app/model/ModelDesginField.java
index 262686a218f8f5bdf490b816777b810c17c2ee0a..1169fbf4638f0d83ec95836fa97434caa351b2af 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/model/ModelDesginField.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/model/ModelDesginField.java
@@ -171,6 +171,11 @@ public class ModelDesginField extends BaseModel {
*/
private Long processId;
+ /**
+ * 数据标准主表ID standard_main_id。
+ */
+ private Long standardMainId;
+
/**
* updateTime 范围过滤起始值(>=)。
*/
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/model/StandardMain.java b/application-webadmin/src/main/java/supie/webadmin/app/model/StandardMain.java
index b2c5c625e786d2faad7fa9f763786999f37e8c4c..584561eae8e523d3d35b25769f6b3f45e89b7660 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/model/StandardMain.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/model/StandardMain.java
@@ -96,6 +96,11 @@ public class StandardMain extends BaseModel {
*/
private String standardStatus;
+ /**
+ * 正则表达式。
+ */
+ private String standardRegular;
+
/**
* updateTime 范围过滤起始值(>=)。
*/
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/model/StandardQuatity.java b/application-webadmin/src/main/java/supie/webadmin/app/model/StandardQuatity.java
index b12395094daf08193fb1b69dea510a8984492ed6..d2cb9e4228b8ac5c0d0229d0e33592169c0e70c1 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/model/StandardQuatity.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/model/StandardQuatity.java
@@ -72,14 +72,19 @@ public class StandardQuatity extends BaseModel {
private String standardQaulityRe;
/**
- * 质量校验长度限制。
+ * 质量校验长度限制(正数->大等于。负数->小等于)。
*/
- private String standardQualityLang;
+ private Integer standardQualityLang;
/**
- * 质量校验不为空。
+ * 质量校验不为空(1:不为空。0:可为空)。
*/
- private String standardQualityNotNull;
+ private Integer standardQualityNotNull;
+
+ /**
+ * SQL条件。
+ */
+ private String customJudgment;
/**
* 质量校验中心关联规则。
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/BaseDataSource.java b/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/BaseDataSource.java
index 2ea93e0e5710d7d15d759a17e2158703d10edb83..ba1e9c829e6c2f32b60ff5e9abcf4d5d4867dbc5 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/BaseDataSource.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/BaseDataSource.java
@@ -335,21 +335,29 @@ public class BaseDataSource {
tableName = sqlList.get(1);
}
ResultSet resultSet = metaData.getColumns(databaseName, schemaPattern, tableName, null);
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ int columnCount = resultSetMetaData.getColumnCount();
resultData = new ArrayList<>();
while (resultSet.next()) {
HashMap dataTypeMap = new HashMap<>();
- // 字段名
- String columnName = resultSet.getString("COLUMN_NAME");
- // 字段类型
- String dataType = resultSet.getString("TYPE_NAME");
- // 字段大小
- int columnSize = resultSet.getInt("COLUMN_SIZE");
- // 字段注释
- String columnComment = resultSet.getString("REMARKS");
- dataTypeMap.put("fieldName",columnName);
- dataTypeMap.put("typeName",dataType);
- dataTypeMap.put("columnSize",columnSize);
- dataTypeMap.put("remarks",columnComment);
+ for (int i = 1; i <= columnCount; i++) {
+ String tableFieldName = resultSetMetaData.getColumnLabel(i);
+ dataTypeMap.put(tableFieldName, resultSet.getObject(tableFieldName));
+ }
+// // 字段名
+// String columnName = resultSet.getString("COLUMN_NAME");
+// // 字段类型
+// String dataType = resultSet.getString("TYPE_NAME");
+// // 字段大小
+// int columnSize = resultSet.getInt("COLUMN_SIZE");
+// // 字段注释
+// String columnComment = resultSet.getString("REMARKS");
+// String nullable = resultSet.getString("NULLABLE");
+// dataTypeMap.put("fieldName", columnName);
+// dataTypeMap.put("typeName", dataType);
+// dataTypeMap.put("columnSize", columnSize);
+// dataTypeMap.put("remarks", columnComment);
+// dataTypeMap.put("nullable", nullable);
resultData.add(dataTypeMap);
}
resultSet.close();
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/DataSourceDoris.java b/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/DataSourceDoris.java
index 067ca1200eb5d0c40e0a270ba3d8619a328b1465..b966455be8e7224fa2a6570621cd75d6533f1db2 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/DataSourceDoris.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/DataSourceDoris.java
@@ -1,5 +1,6 @@
package supie.webadmin.app.service.databasemanagement.strategyImpl;
+import cn.hutool.core.text.StrSplitter;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -9,6 +10,11 @@ import supie.webadmin.app.service.databasemanagement.Strategy;
import supie.webadmin.app.service.databasemanagement.StrategyFactory;
import javax.annotation.PostConstruct;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
* 描述:
@@ -58,4 +64,52 @@ public class DataSourceDoris extends BaseDataSource implements Strategy {
initConnection();
}
+ /**
+ * 查询数据库数据的字段名及类型
+ *
+ * @param databaseName
+ * @param tableName
+ * @return
+ */
+ @Override
+ public List