diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp index 9e0e76cdc09e983370d40e7e1c2d3bd53d0bcd60..ccfcb5c03123d4476a8a4f2ee21d0cecc4605f01 100644 --- a/observer_agent/grpc_comm/server.cpp +++ b/observer_agent/grpc_comm/server.cpp @@ -39,7 +39,7 @@ class PubSubServiceImpl final : public SubManager::Service for (auto iter = suber_topic_[cli_name].begin(); iter != suber_topic_[cli_name].end(); iter++) { - if (*iter == cli_topic) + if ((*iter & cli_topic) != 0) { msg.set_text("this client name already subscribe the topic"); if (!writer->Write(msg)) @@ -51,9 +51,13 @@ class PubSubServiceImpl final : public SubManager::Service } } + sub_mutex.lock(); + suber_topic_[cli_name].push_back(cli_topic); suber_writer_[cli_name].push_back(writer); + sub_mutex.unlock(); + msg.set_text("topic: " + std::to_string(cli_topic) + " Subscribe success!"); if (!writer->Write(msg)) { @@ -81,7 +85,7 @@ class PubSubServiceImpl final : public SubManager::Service i = 0; for (auto topic_item : iter->second) { - if (topic_item == cli_topic) + if ((topic_item & cli_topic) != 0) { auto &subscriber = suber_writer_[iter->first][i]; if (!subscriber->Write(msg)) @@ -103,6 +107,9 @@ class PubSubServiceImpl final : public SubManager::Service int cli_topic = request->topic(); std::string cli_name = request->sub_name(); int i = 0; + int unsub_flag = 0; + + std::lock_guard lock(sub_mutex); for (auto topic_item : suber_topic_[cli_name]) { @@ -110,10 +117,17 @@ class PubSubServiceImpl final : public SubManager::Service { suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i); suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i); + unsub_flag = 1; break; } i++; } + + if (!unsub_flag) + { + response->set_text("don't exist the topic: " + std::to_string(cli_topic)); + return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to UnSubscribe topic!"); + } response->set_text("topic: " + std::to_string(cli_topic) + " UnSubscribe success!"); return grpc::Status::OK; } @@ -121,6 +135,7 @@ class PubSubServiceImpl final : public SubManager::Service private: std::unordered_map> suber_topic_; std::unordered_map *>> suber_writer_; + std::mutex sub_mutex; }; std::unique_ptr server;