From f531f56ee36aecd3bb9eae527551eb8eff8c9457 Mon Sep 17 00:00:00 2001 From: chenjingwen Date: Mon, 11 Dec 2023 19:52:42 +0800 Subject: [PATCH] secDetectord: fix a grpc hang bug break connection loop before shutdown so that shutdown won't hang. Signed-off-by: chenjingwen --- observer_agent/grpc_comm/grpc_api.h | 1 + observer_agent/grpc_comm/server.cpp | 30 ++++++++++++++++++++++++++--- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h index 4bde109..27a9139 100644 --- a/observer_agent/grpc_comm/grpc_api.h +++ b/observer_agent/grpc_comm/grpc_api.h @@ -26,6 +26,7 @@ class PubSubServiceImpl final : public SubManager::Service grpc::Status Subscribe(ServerContext *context, const SubscribeRequest *request, ServerWriter *writer); grpc::Status Publish(ServerContext *context, const PublishRequest *request, Message *response); grpc::Status UnSubscribe(ServerContext *context, const UnSubscribeRequest *request, Message *response); + void CloseAllConnection(void); private: std::unordered_map> suber_topic_; diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp index cce5131..b47b1aa 100644 --- a/observer_agent/grpc_comm/server.cpp +++ b/observer_agent/grpc_comm/server.cpp @@ -16,6 +16,7 @@ #include "comm_api.grpc.pb.h" #include #include +#include using data_comm::Message; using data_comm::PublishRequest; @@ -30,9 +31,23 @@ using grpc::ServerWriter; #define MAX_CONNECTION 5 #define CHECK_TIME 60 +static bool killed = false; + class PubSubServiceImpl final : public SubManager::Service { public: + void CloseAllConnection(void) + { + std::lock_guard lk(wait_mutex); + + for (int i = 0; i < MAX_CONNECTION; i++) { + connect_status[i] = false; + } + + killed = true; + cv.notify_all(); + } + grpc::Status Subscribe(ServerContext *context, const SubscribeRequest *request, ServerWriter *writer) override { @@ -124,7 +139,7 @@ class PubSubServiceImpl final : public SubManager::Service } return grpc::Status(grpc::StatusCode::INTERNAL, "writer is lose!"); } - sleep(CHECK_TIME); + WaitKeeplive(); } return grpc::Status::OK; } @@ -203,21 +218,30 @@ class PubSubServiceImpl final : public SubManager::Service std::unordered_map *>> suber_writer_; std::unordered_map> suber_connection_; std::mutex sub_mutex; + std::mutex wait_mutex; + std::condition_variable cv; int connection_num = 0; bool connect_status[MAX_CONNECTION] = {false}; + + void WaitKeeplive(void) + { + std::unique_lock lk(wait_mutex); + cv.wait_for(lk, std::chrono::seconds(CHECK_TIME), []{ return killed; }); + } }; -std::unique_ptr server; +static std::unique_ptr server; +static PubSubServiceImpl service; void StopServer() { + service.CloseAllConnection(); server->Shutdown(); } void RunServer() { std::string server_address("unix:///var/run/secDetector.sock"); - PubSubServiceImpl service; ServerBuilder builder; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); -- Gitee