From e5a11b3972e4e14c3c75123f605381d55d67b41e Mon Sep 17 00:00:00 2001 From: xue_meng_en <1836611252@qq.com> Date: Tue, 20 Feb 2024 16:52:11 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B6=88=E6=81=AF=E6=8E=A8=E9=80=81=E5=A4=B1?= =?UTF-8?q?=E8=B4=A5=E5=90=8E=E9=87=8D=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/opengauss/cmrestapi/CMRestAPI.java | 8 +++++ .../opengauss/cmrestapi/CMRestAPIClient.java | 17 +++++++--- .../opengauss/cmrestapi/InfoPushThread.java | 33 +++++++++++++++++-- 3 files changed, 50 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/opengauss/cmrestapi/CMRestAPI.java b/src/main/java/org/opengauss/cmrestapi/CMRestAPI.java index 3f0f105..c9c09e7 100644 --- a/src/main/java/org/opengauss/cmrestapi/CMRestAPI.java +++ b/src/main/java/org/opengauss/cmrestapi/CMRestAPI.java @@ -51,6 +51,8 @@ public class CMRestAPI { public static String appWhiteListFile = null; public static HashSet appWhiteList = null; public static Long lastModified = 0l; + public static int maxSendTimes = 1000; + public static int sendInterval = 100; private static Logger logger = LoggerFactory.getLogger(CMRestAPI.class); private static final String CHECK_GAUSSDB_PROCESS_CMD = "ps ux | grep -v grep | grep \"bin/gaussdb -D \" | awk '{print $2}'"; @@ -213,6 +215,12 @@ public class CMRestAPI { case "-w": appWhiteListFile = args[++i]; break; + case "-m": + maxSendTimes = Integer.parseInt(args[++i]); + break; + case "-I": + sendInterval = Integer.parseInt(args[++i]); + break; default: System.out.println("option " + args[i] + " is not support"); System.exit(ErrorCode.EINVAL.getCode()); diff --git a/src/main/java/org/opengauss/cmrestapi/CMRestAPIClient.java b/src/main/java/org/opengauss/cmrestapi/CMRestAPIClient.java index d92d95a..3ce23c1 100644 --- a/src/main/java/org/opengauss/cmrestapi/CMRestAPIClient.java +++ b/src/main/java/org/opengauss/cmrestapi/CMRestAPIClient.java @@ -18,6 +18,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpEntity; import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.http.client.SimpleClientHttpRequestFactory; import org.springframework.web.client.HttpClientErrorException; @@ -52,28 +53,32 @@ public class CMRestAPIClient { * @Description: * Push newest master info (ip and port) to url. * @param masterIpPort - * void + * boolean */ - public void pushMasterInfo(String masterInfo) { + public boolean pushMasterInfo(String masterInfo) { logger.info("Sendind newest master info({}) to {}", masterInfo, url); try { HttpEntity entity = new HttpEntity<>(masterInfo); ResponseEntity response = restTemplate.exchange(url + "/MasterInfo", HttpMethod.PUT, entity, String.class); logger.info("StatusCode: {}", response.getStatusCode()); logger.info("Msg: {}", response.getBody()); + if (response.getStatusCode() == HttpStatus.OK) { + logger.info("Send newest master info successfully."); + return true; + } } catch (ResourceAccessException | HttpClientErrorException e) { logger.error("Failed to send newest master info.\nDetail:{}", url, e.getMessage()); } - logger.info("Send newest master info successfully."); + return false; } /** * @Title: pushStandbysInfo * @Description: * Push current standbys' info(ip:port) to url. - * void + * boolean */ - public void pushStandbysInfo(String standbyInfo) { + public boolean pushStandbysInfo(String standbyInfo) { logger.info("Sendind newest standby info({}) to {}", standbyInfo, url); try { HttpEntity entity = new HttpEntity<>(standbyInfo); @@ -82,7 +87,9 @@ public class CMRestAPIClient { logger.info("Response msg: {}", response.getBody()); } catch (ResourceAccessException | HttpClientErrorException e) { logger.error("Failed to send newest standby info.\nDetail:{}", url, e.getMessage()); + return false; } logger.info("Send newest standby info successfully."); + return true; } } diff --git a/src/main/java/org/opengauss/cmrestapi/InfoPushThread.java b/src/main/java/org/opengauss/cmrestapi/InfoPushThread.java index b1fc2b9..e1e5782 100644 --- a/src/main/java/org/opengauss/cmrestapi/InfoPushThread.java +++ b/src/main/java/org/opengauss/cmrestapi/InfoPushThread.java @@ -29,7 +29,9 @@ public class InfoPushThread implements Runnable { private String recvAddrUrl; private String masterIpPort; private Logger logger = LoggerFactory.getLogger(InfoPushThread.class); - + private int maxSendTimes = CMRestAPI.maxSendTimes; + private int sendInterval = CMRestAPI.sendInterval; + InfoPushThread(int threadNo, String recvAddrUrl, String masterIpPort) { this.THREAD_NAME = "InfoPushThread-" + threadNo; this.masterIpPort = masterIpPort; @@ -39,8 +41,33 @@ public class InfoPushThread implements Runnable { @Override public void run() { CMRestAPIClient client = new CMRestAPIClient(recvAddrUrl); - client.pushMasterInfo(masterIpPort); - client.pushStandbysInfo(CMRestAPI.peerIpPorts); + int[] sendFailTimes = {0, 0}; + while (true) { + if (sendFailTimes[0] >= 0 && sendFailTimes[0] < maxSendTimes) { + if (client.pushMasterInfo(masterIpPort)) { + sendFailTimes[0] = -1; + } else { + ++sendFailTimes[0]; + } + } + + if (sendFailTimes[1] >= 0 && sendFailTimes[1] < maxSendTimes) { + if (client.pushStandbysInfo(CMRestAPI.peerIpPorts)) { + sendFailTimes[1] = -1; + } else { + ++sendFailTimes[1]; + } + } + if ((sendFailTimes[0] == -1 || sendFailTimes[0] >= maxSendTimes) && + (sendFailTimes[1] == -1 || sendFailTimes[1] >= maxSendTimes)) { + break; + } + try { + Thread.sleep(sendInterval); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } } /** -- Gitee