diff --git a/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/service/impl/RegisterServiceImpl.java b/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/service/impl/RegisterServiceImpl.java index 54eb18fea6d88c9208abd06099331900bc31f2d2..be5d502614592b8e9b86add7ceea2d2dc8b4e7b7 100644 --- a/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/service/impl/RegisterServiceImpl.java +++ b/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/service/impl/RegisterServiceImpl.java @@ -8,6 +8,7 @@ import cn.icanci.rec.admin.dal.mongodb.daointerface.RegisterDAO; import cn.icanci.rec.admin.dal.mongodb.dateobject.RegisterDO; import cn.icanci.rec.common.model.config.RegisterVO; +import java.util.Date; import java.util.List; import javax.annotation.Resource; @@ -60,6 +61,15 @@ public class RegisterServiceImpl implements RegisterService, InitializingBean { @Override public void doRegister(List registers) { // 查询,如果有,则更新时间,没有则插入 + for (RegisterVO register : registers) { + RegisterDO dbRegister = registerDAO.queryUnionOne(register.getDomain(), register.getClientAddress(), register.getClientPort()); + if (dbRegister == null) { + save(register); + } else { + dbRegister.setLastUpdateTime(new Date()); + registerDAO.update(dbRegister); + } + } } @Override diff --git a/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/thread/TriggerThread.java b/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/thread/TriggerThread.java index 61ee21aed56596af842315d0e955296e2ea45510..c6e9f44932bc939ff8c4696619c0a39a04b961f3 100644 --- a/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/thread/TriggerThread.java +++ b/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/thread/TriggerThread.java @@ -22,7 +22,6 @@ import java.util.concurrent.locks.LockSupport; import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.InitializingBean; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -34,7 +33,7 @@ import com.google.common.collect.Sets; * @since 1.0 Created in 2022/11/22 22:11 */ @SuppressWarnings("all") -public class TriggerThread implements InitializingBean { +public class TriggerThread { private static final Logger logger = LoggerFactory.getLogger(TriggerThread.class); @@ -71,6 +70,8 @@ public class TriggerThread implements InitializingBean { public static void setRegisterService(RegisterService registerService) { TriggerThread.registerService = registerService; + + start(); } /** @@ -96,33 +97,35 @@ public class TriggerThread implements InitializingBean { // 进行消息通知触达线程 Thread triggerQueueThread = new Thread(() -> { try { - LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3)); + while (true) { + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3)); - while (triggerQueue.size() > 0) { + while (triggerQueue.size() > 0) { - final RegisterVO register = triggerQueue.poll(); + final RegisterVO register = triggerQueue.poll(); - commonPool.execute(new Runnable() { - @Override - public void run() { - String reqUrl = String.format(REFRESH_REQUEST_FORMAT, register.getClientAddress(), register.getClientPort()); + commonPool.execute(new Runnable() { + @Override + public void run() { + String reqUrl = String.format(REFRESH_REQUEST_FORMAT, register.getClientAddress(), register.getClientPort()); - PublishDTO publishDTO = new PublishDTO(); + PublishDTO publishDTO = new PublishDTO(); - publishDTO.setDomainCodes(Sets.newHashSet(register.getDomain())); + publishDTO.setDomainCodes(Sets.newHashSet(register.getDomain())); - Client.RpcRequest rpcRequest = new Client.RpcRequest(publishDTO, DEFAULT_APPLICATION_JSON_VALUE, Maps.newHashMap(), reqUrl, HttpMethod.POST.name(), - false, 3, TimeUnit.SECONDS, 0, SerializerEnum.FASTJSON, SerializerEnum.FASTJSON); + Client.RpcRequest rpcRequest = new Client.RpcRequest(publishDTO, DEFAULT_APPLICATION_JSON_VALUE, Maps.newHashMap(), reqUrl, HttpMethod.POST.name(), + false, 3, TimeUnit.SECONDS, 0, SerializerEnum.FASTJSON, SerializerEnum.FASTJSON); - SocketMessage call = CLIENT.call(rpcRequest, SocketMessage.class); - if (call.isSuccess()) { - register.setLastUpdateTime(new Date()); - registerService.save(register); - } else { - logger.warn("[TriggerThread][start][refresh] error message:{}", call.getContent()); + SocketMessage call = CLIENT.call(rpcRequest, SocketMessage.class); + if (call.isSuccess()) { + register.setLastUpdateTime(new Date()); + registerService.save(register); + } else { + logger.warn("[TriggerThread][start][refresh] error message:{}", call.getContent()); + } } - } - }); + }); + } } } catch (Throwable e) { logger.warn("[TriggerThread][start][Throwable] error message:{}", e.getMessage()); @@ -132,11 +135,6 @@ public class TriggerThread implements InitializingBean { triggerQueueThread.start(); } - @Override - public void afterPropertiesSet() throws Exception { - start(); - } - /** 任务执行器 */ private static class JobRunner implements Runnable { @@ -209,7 +207,8 @@ public class TriggerThread implements InitializingBean { try { boolean isDelay = (System.currentTimeMillis() - register.getLastUpdateTime().getTime()) / 1000 > REGISTER_TIME_OUT; if (isDelay) { - registerService.deleteById(register.getId()); + register.setIsDelete(1); + registerService.save(register); } } catch (Throwable ex) { // no op diff --git a/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/daointerface/RegisterDAO.java b/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/daointerface/RegisterDAO.java index 42936f284a8e266dd83d40a2d3c72985aef6419a..771c2e1a6b3d1ac810d8afc612d4c1a0c937c641 100644 --- a/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/daointerface/RegisterDAO.java +++ b/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/daointerface/RegisterDAO.java @@ -24,6 +24,8 @@ public interface RegisterDAO extends BaseDAO { List queryByDomainCode(String domainCode); + RegisterDO queryUnionOne(String domain, String clientAddress, int clientPort); + interface RegisterColumn extends BaseColumn { String clientAddress = "clientAddress"; String clientPort = "clientPort"; diff --git a/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/mongo/MongoRegisterDAO.java b/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/mongo/MongoRegisterDAO.java index 60801c5a43f94d6f83291e0fd4d034ab7fdabf62..cd04454277340cf12a15393467415edf88002f55 100644 --- a/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/mongo/MongoRegisterDAO.java +++ b/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/mongo/MongoRegisterDAO.java @@ -1,5 +1,9 @@ package cn.icanci.rec.admin.dal.mongodb.mongo; +import cn.icanci.rec.admin.dal.mongodb.common.PageList; +import cn.icanci.rec.admin.dal.mongodb.daointerface.RegisterDAO; +import cn.icanci.rec.admin.dal.mongodb.dateobject.RegisterDO; + import java.util.List; import org.apache.commons.lang3.StringUtils; @@ -8,10 +12,6 @@ import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.stereotype.Service; -import cn.icanci.rec.admin.dal.mongodb.common.PageList; -import cn.icanci.rec.admin.dal.mongodb.daointerface.RegisterDAO; -import cn.icanci.rec.admin.dal.mongodb.dateobject.RegisterDO; - /** * @author icanci * @since 1.0 Created in 2022/11/22 22:00 @@ -73,4 +73,14 @@ public class MongoRegisterDAO extends AbstractBaseDAO implements Reg Query query = new Query(criteria); return mongoTemplate.find(query, COLLECTION_CLASS, COLLECTION_NAME); } + + @Override + public RegisterDO queryUnionOne(String domain, String clientAddress, int clientPort) { + Criteria criteria = Criteria.where(RegisterColumn.domain).is(domain); + criteria.and(RegisterColumn.env).is(DEFAULT_ENV); + criteria.and(RegisterColumn.clientAddress).is(clientAddress); + criteria.and(RegisterColumn.clientPort).is(clientPort); + Query query = new Query(criteria); + return mongoTemplate.findOne(query, COLLECTION_CLASS, COLLECTION_NAME); + } }