From 1b7689a2574b71a5b7e607851b4f1f827e2105d9 Mon Sep 17 00:00:00 2001 From: hurricane618 Date: Thu, 16 Nov 2023 21:14:07 +0800 Subject: [PATCH] add mutex lock in sub/unsub and distinguish different topic bits add new lock and check topic bit Signed-off-by: hurricane618 --- observer_agent/grpc_comm/server.cpp | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp index 9e0e76c..ccfcb5c 100644 --- a/observer_agent/grpc_comm/server.cpp +++ b/observer_agent/grpc_comm/server.cpp @@ -39,7 +39,7 @@ class PubSubServiceImpl final : public SubManager::Service for (auto iter = suber_topic_[cli_name].begin(); iter != suber_topic_[cli_name].end(); iter++) { - if (*iter == cli_topic) + if ((*iter & cli_topic) != 0) { msg.set_text("this client name already subscribe the topic"); if (!writer->Write(msg)) @@ -51,9 +51,13 @@ class PubSubServiceImpl final : public SubManager::Service } } + sub_mutex.lock(); + suber_topic_[cli_name].push_back(cli_topic); suber_writer_[cli_name].push_back(writer); + sub_mutex.unlock(); + msg.set_text("topic: " + std::to_string(cli_topic) + " Subscribe success!"); if (!writer->Write(msg)) { @@ -81,7 +85,7 @@ class PubSubServiceImpl final : public SubManager::Service i = 0; for (auto topic_item : iter->second) { - if (topic_item == cli_topic) + if ((topic_item & cli_topic) != 0) { auto &subscriber = suber_writer_[iter->first][i]; if (!subscriber->Write(msg)) @@ -103,6 +107,9 @@ class PubSubServiceImpl final : public SubManager::Service int cli_topic = request->topic(); std::string cli_name = request->sub_name(); int i = 0; + int unsub_flag = 0; + + std::lock_guard lock(sub_mutex); for (auto topic_item : suber_topic_[cli_name]) { @@ -110,10 +117,17 @@ class PubSubServiceImpl final : public SubManager::Service { suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i); suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i); + unsub_flag = 1; break; } i++; } + + if (!unsub_flag) + { + response->set_text("don't exist the topic: " + std::to_string(cli_topic)); + return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to UnSubscribe topic!"); + } response->set_text("topic: " + std::to_string(cli_topic) + " UnSubscribe success!"); return grpc::Status::OK; } @@ -121,6 +135,7 @@ class PubSubServiceImpl final : public SubManager::Service private: std::unordered_map> suber_topic_; std::unordered_map *>> suber_writer_; + std::mutex sub_mutex; }; std::unique_ptr server; -- Gitee