From 8dd0f6984ef002e30f3d7aa133a1a439fc5d0f95 Mon Sep 17 00:00:00 2001 From: chenjingwen Date: Thu, 14 Dec 2023 21:55:10 +0800 Subject: [PATCH] grpc: fix coredump in Publish fix coredump in Publish Signed-off-by: chenjingwen --- observer_agent/grpc_comm/server.cpp | 165 +++++++++++----------------- 1 file changed, 62 insertions(+), 103 deletions(-) diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp index 938d09c..b858853 100644 --- a/observer_agent/grpc_comm/server.cpp +++ b/observer_agent/grpc_comm/server.cpp @@ -33,16 +33,21 @@ using grpc::ServerWriter; static bool killed = false; +class Subscribers { +public: + int topic; + ServerWriter *writer; + + Subscribers(int t, ServerWriter *w) : topic(t), writer(w) {} + Subscribers() : topic(0), writer(nullptr) {} +}; + class PubSubServiceImpl final : public SubManager::Service { public: void CloseAllConnection(void) { - std::lock_guard lk(wait_mutex); - - for (int i = 0; i < MAX_CONNECTION; i++) { - connect_status[i] = false; - } + std::lock_guard lk(wait_mutex); killed = true; cv.notify_all(); @@ -55,50 +60,21 @@ class PubSubServiceImpl final : public SubManager::Service std::string cli_name = request->sub_name(); Message msg; Message keepalive_msg; - int i = 0, tmp_index; + sub_mutex.lock(); if (connection_num >= MAX_CONNECTION) { msg.set_text("over max connection number!"); - if (!writer->Write(msg)) - { - std::cerr << "Failed to write the initial message" << std::endl; - return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message"); - } - return grpc::Status(grpc::StatusCode::INTERNAL, "over max connection number, Failed to Subscribe the topic"); - } - - for (auto iter = suber_topic_[cli_name].begin(); iter != suber_topic_[cli_name].end(); iter++) - { - if ((*iter & cli_topic) != 0) - { - msg.set_text("this client name already subscribe the topic"); - if (!writer->Write(msg)) - { - std::cerr << "Failed to write the initial message" << std::endl; - return 
grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message"); - } - return grpc::Status(grpc::StatusCode::INTERNAL, "this client name already subscribe the topic"); - } - } - - sub_mutex.lock(); - - for (tmp_index = 0; tmp_index < MAX_CONNECTION; tmp_index++) - { - if (!connect_status[tmp_index]) - break; + writer->Write(msg); + sub_mutex.unlock(); + return grpc::Status(grpc::StatusCode::INTERNAL, "over max connection number"); } - if (tmp_index == MAX_CONNECTION) - { + auto iter = suber_topic_.find(cli_name); + if (iter != suber_topic_.end()) { + msg.set_text("this client name already subscribe the topic"); + writer->Write(msg); sub_mutex.unlock(); - msg.set_text("multi-process max connection number!"); - if (!writer->Write(msg)) - { - std::cerr << "Failed to write the initial message" << std::endl; - return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message"); - } - return grpc::Status(grpc::StatusCode::INTERNAL, "multi-process max connection number, Failed to Subscribe the topic"); + return grpc::Status(grpc::StatusCode::INTERNAL, "this client name already subscribe the topic"); } msg.set_text("topic: " + std::to_string(cli_topic) + " Subscribe success!"); @@ -109,65 +85,50 @@ class PubSubServiceImpl final : public SubManager::Service return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message"); } - suber_topic_[cli_name].push_back(cli_topic); - suber_writer_[cli_name].push_back(writer); - suber_connection_[cli_name].push_back(tmp_index); - connect_status[tmp_index] = true; + std::cout << "Subscribe " << cli_name << " ok" << std::endl; + suber_topic_[cli_name] = Subscribers(cli_topic, writer); connection_num++; - sub_mutex.unlock(); - keepalive_msg.set_text("keepalive"); - while (connect_status[tmp_index]) + /* loop until cannot write */ + while (!killed) { - if (!writer->Write(keepalive_msg)) - { - for (auto topic_item : suber_topic_[cli_name]) - { - if (topic_item == cli_topic) - { - sub_mutex.lock(); - 
suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i); - suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i); - connect_status[suber_connection_[cli_name].at(i)] = false; - suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i); - connection_num--; - sub_mutex.unlock(); - break; - } - i++; - } + sub_mutex.lock(); + if (suber_topic_.count(cli_name) == 0) { + sub_mutex.unlock(); + return grpc::Status::OK; + } + + keepalive_msg.set_text("keepalive"); + if (!writer->Write(keepalive_msg)) { + DeleteSubscriberByCliName(cli_name); + sub_mutex.unlock(); return grpc::Status(grpc::StatusCode::INTERNAL, "writer is lose!"); } + sub_mutex.unlock(); WaitKeeplive(); } + + std::cout << cli_name << " is dead" << std::endl; return grpc::Status::OK; } grpc::Status Publish(ServerContext *context, const PublishRequest *request, Message *response) override { + std::lock_guard lock(sub_mutex); int cli_topic = request->topic(); std::string cli_data = request->data(); - int i = 0; Message msg; msg.set_text(cli_data); for (auto iter = suber_topic_.begin(); iter != suber_topic_.end(); iter++) { - i = 0; - for (auto topic_item : iter->second) - { - if ((topic_item & cli_topic) != 0) - { - auto &subscriber = suber_writer_[iter->first][i]; - if (!subscriber->Write(msg)) - { - std::cerr << "Failed to write to a subscriber" << std::endl; - return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message"); - } - break; + Subscribers subscriber = iter->second; + if ((subscriber.topic & cli_topic) != 0) { + if (!subscriber.writer->Write(msg)) { + std::cerr << "Failed to write to a subscriber: " << iter->first << std::endl; + return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message"); } - i++; } } @@ -177,8 +138,7 @@ class PubSubServiceImpl final : public SubManager::Service grpc::Status UnSubscribe(ServerContext *context, const UnSubscribeRequest *request, Message *response) override { std::string cli_name = 
request->sub_name(); - int i = 0; - int unsub_flag = 0; + std::lock_guard lock(sub_mutex); if (connection_num <= 0) { response->set_text("connection_num <= 0, don't UnSubscribe!"); @@ -186,20 +146,7 @@ class PubSubServiceImpl final : public SubManager::Service return grpc::Status(grpc::StatusCode::INTERNAL, "connection_num <= 0, Failed to UnSubscribe topic!"); } - std::lock_guard lock(sub_mutex); - - std::unordered_map>::iterator iter = suber_topic_.find(cli_name); - if (iter != suber_topic_.end()) - { - suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i); - suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i); - connect_status[suber_connection_[cli_name].at(i)] = false; - suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i); - connection_num--; - unsub_flag = 1; - } - - if (!unsub_flag) + if (!DeleteSubscriberByCliName(cli_name)) { response->set_text("don't exist the reader"); return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to UnSubscribe reader!"); @@ -209,19 +156,31 @@ class PubSubServiceImpl final : public SubManager::Service } private: - std::unordered_map> suber_topic_; - std::unordered_map *>> suber_writer_; - std::unordered_map> suber_connection_; + std::unordered_map suber_topic_; std::mutex sub_mutex; std::mutex wait_mutex; std::condition_variable cv; int connection_num = 0; - bool connect_status[MAX_CONNECTION] = {false}; void WaitKeeplive(void) { - std::unique_lock lk(wait_mutex); - cv.wait_for(lk, std::chrono::seconds(CHECK_TIME), []{ return killed; }); + std::unique_lock lk(wait_mutex); + cv.wait_for(lk, std::chrono::seconds(CHECK_TIME), []{ return killed; }); + } + + /* Must be called with sub_mutex held */ + bool DeleteSubscriberByCliName(std::string &cli_name) + { + bool exist = false; + std::cout << "UnSubscribe " << cli_name << " ok" << std::endl; + + auto it = suber_topic_.find(cli_name); + if (it != suber_topic_.end()) { + suber_topic_.erase(it); + connection_num--; + exist = true; + } + return 
exist; } }; -- Gitee