diff --git a/observer_agent/grpc_comm/client.cpp b/observer_agent/grpc_comm/client.cpp index a0d3586fffe11d4b1abedf8612f4b020b2263648..4ff5c297e4774bf1e8d903569b40b00846376710 100644 --- a/observer_agent/grpc_comm/client.cpp +++ b/observer_agent/grpc_comm/client.cpp @@ -31,12 +31,19 @@ using data_comm::PublishRequest; PubSubClient::PubSubClient(std::shared_ptr channel) : stub_(SubManager::NewStub(channel)) {} -std::unique_ptr> PubSubClient::Subscribe(const int topic) { - SubscribeRequest request; - request.set_topic(topic); - Message msg; - SubFlag = true; - return stub_->Subscribe(&context, request); +std::unique_ptr> PubSubClient::Subscribe(const int topic, const std::string& name) { + SubscribeRequest request; + request.set_topic(topic); + request.set_sub_name(name); + + Message msg; + SubFlag = true; + std::unique_ptr> reader = stub_->Subscribe(&context, request); + if (reader == nullptr) { + std::cerr << "Failed to subscribe." << std::endl; + return nullptr; + } + return reader; } void PubSubClient::Publish(const int topic, const std::string& content) { @@ -51,9 +58,10 @@ void PubSubClient::Publish(const int topic, const std::string& content) { } } -void PubSubClient::UnSubscribe(const int topic) { +void PubSubClient::UnSubscribe(const int topic, const std::string& name) { UnSubscribeRequest request; request.set_topic(topic); + request.set_sub_name(name); ClientContext unsub_context; Message msg; @@ -62,16 +70,18 @@ void PubSubClient::UnSubscribe(const int topic) { if (!status.ok()) { std::cerr << "Error: " << status.error_code() << ": " << status.error_message() << std::endl; + } else { + std::cout << "Received: " << msg.text() << std::endl; } - - std::cout << "Received: " << msg.text() << std::endl; - - return; } std::string PubSubClient::ReadFrom(std::unique_ptr> &reader) { Message msg; - reader->Read(&msg); - std::cout << "Received: " << msg.text() << std::endl; - return msg.text(); + if (reader->Read(&msg)) { + std::cout << "Received: " << msg.text() << std::endl; + return msg.text(); + } else { + std::cerr << "Failed to read from the server." << std::endl; + return ""; // Handle read error + } } diff --git a/observer_agent/grpc_comm/client_pub_demo.cpp b/observer_agent/grpc_comm/client_pub_demo.cpp index 713573897746f834f52af07861c0fb37f251404e..273dce3921ca143d610267c61adbbb417fc5880c 100644 --- a/observer_agent/grpc_comm/client_pub_demo.cpp +++ b/observer_agent/grpc_comm/client_pub_demo.cpp @@ -32,9 +32,10 @@ class PubSubClient { PubSubClient(std::shared_ptr channel) : stub_(SubManager::NewStub(channel)) {} - std::unique_ptr> Subscribe(const int topic) { + std::unique_ptr> Subscribe(const int topic, const std::string& name) { SubscribeRequest request; request.set_topic(topic); + request.set_sub_name(name); Message msg; SubFlag = true; @@ -58,9 +59,10 @@ class PubSubClient { } } - void UnSubscribe(const int topic) { + void UnSubscribe(const int topic, const std::string& name) { UnSubscribeRequest request; request.set_topic(topic); + request.set_sub_name(name); ClientContext unsub_context; Message msg; @@ -96,11 +98,14 @@ int main(int argc, char** argv) { PubSubClient client(grpc::CreateChannel( server_address, grpc::InsecureChannelCredentials())); - client.Publish(1, "ahahahah"); + if (argc != 3) { + std::cout << "[Usage] ./client_pub_demo topic_num publish_data" << std::endl; + } + client.Publish(std::stoi(argv[1]), argv[2]); sleep(3); - client.Publish(1, "hello,world!"); + client.Publish(std::stoi(argv[1]), "hello,world!"); sleep(3); - client.Publish(1, "end"); + client.Publish(std::stoi(argv[1]), "end"); return 0; } diff --git a/observer_agent/grpc_comm/client_sub_demo.cpp b/observer_agent/grpc_comm/client_sub_demo.cpp index 88243e3b344576f83e8ad58c346a26fff43f3d55..a54eec2871f6d4757341a1ca0ac8103ffb020000 100644 --- a/observer_agent/grpc_comm/client_sub_demo.cpp +++ b/observer_agent/grpc_comm/client_sub_demo.cpp @@ -32,9 +32,10 @@ class PubSubClient { PubSubClient(std::shared_ptr channel) : stub_(SubManager::NewStub(channel)) {} - std::unique_ptr> Subscribe(const int topic) { + std::unique_ptr> Subscribe(const int topic, const std::string& name) { SubscribeRequest request; request.set_topic(topic); + request.set_sub_name(name); Message msg; SubFlag = true; @@ -58,9 +59,10 @@ class PubSubClient { } } - void UnSubscribe(const int topic) { + void UnSubscribe(const int topic, const std::string& name) { UnSubscribeRequest request; request.set_topic(topic); + request.set_sub_name(name); ClientContext unsub_context; Message msg; @@ -96,13 +98,18 @@ int main(int argc, char** argv) { PubSubClient client(grpc::CreateChannel( server_address, grpc::InsecureChannelCredentials())); - std::unique_ptr> cli_reader = client.Subscribe(1); + if (argc != 3) { + std::cout << "[Usage] ./client_sub_demo topic_num suber_name" << std::endl; + } + + std::unique_ptr> cli_reader = client.Subscribe(std::stoi(argv[1]), argv[2]); std::string some_data = client.ReadFrom(cli_reader); std::cout << "whz: " << some_data << std::endl; while (some_data != "" && some_data != "end") { some_data = client.ReadFrom(cli_reader); std::cout << "loop whz: " << some_data << std::endl; } + client.UnSubscribe(std::stoi(argv[1]), argv[2]); return 0; } diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h index dfffebfc3e97b5d62cbd5da40724d5d89e4ba9c7..624da97c6f8b12258287fda9c9098c1344969f60 100644 --- a/observer_agent/grpc_comm/grpc_api.h +++ b/observer_agent/grpc_comm/grpc_api.h @@ -34,9 +34,9 @@ void RunServer(); class PubSubClient { public: PubSubClient(std::shared_ptr channel); - std::unique_ptr> Subscribe(const int topic); + std::unique_ptr> Subscribe(const int topic, const std::string& name); void Publish(const int topic, const std::string& content); - void UnSubscribe(const int topic); + void UnSubscribe(const int topic, const std::string& name); std::string ReadFrom(std::unique_ptr> &reader); private: diff --git a/observer_agent/grpc_comm/protos/comm_api.proto b/observer_agent/grpc_comm/protos/comm_api.proto index 37e870ba85395d8003b925a58d5ab07d6330d17a..6c84865a9009a23b51e32f94b3d96c275be06229 100644 --- a/observer_agent/grpc_comm/protos/comm_api.proto +++ b/observer_agent/grpc_comm/protos/comm_api.proto @@ -9,10 +9,12 @@ message Message { message SubscribeRequest { int32 topic = 1; + string sub_name = 2; } message UnSubscribeRequest { int32 topic = 1; + string sub_name = 2; } message PublishRequest { diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp index 074243abdf6b2b1166c8ec842308f148191a70dc..69c8270afacb9d8d436a502ee7ade10863b5df18 100644 --- a/observer_agent/grpc_comm/server.cpp +++ b/observer_agent/grpc_comm/server.cpp @@ -31,11 +31,23 @@ class PubSubServiceImpl final : public SubManager::Service { grpc::Status Subscribe(ServerContext* context, const SubscribeRequest* request, ServerWriter* writer) override { int cli_topic = request->topic(); - // ToDo: somebody topic - subscribers_[cli_topic].push_back(writer); - - // ToDo: add extra check or feature code + std::string cli_name = request->sub_name(); Message msg; + + for (auto iter = suber_topic_[cli_name].begin(); iter != suber_topic_[cli_name].end(); iter++) { + if (*iter == cli_topic) { + msg.set_text("this client name already subscribe the topic"); + if (!writer->Write(msg)) { + std::cerr << "Failed to write the initial message" << std::endl; + return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message"); + } + return grpc::Status(grpc::StatusCode::INTERNAL, "this client name already subscribe the topic"); + } + } + + suber_topic_[cli_name].push_back(cli_topic); + suber_writer_[cli_name].push_back(writer); + msg.set_text("topic: " + std::to_string(cli_topic) + " Subscribe success!"); if (!writer->Write(msg)) { std::cerr << "Failed to write the initial message" << std::endl; @@ -49,32 +61,50 @@ class PubSubServiceImpl final : public SubManager::Service { grpc::Status Publish(ServerContext* context, const PublishRequest* request, Message* response) override { - int cli_topic = request->topic(); - std::string cli_data = request->data(); + int cli_topic = request->topic(); + std::string cli_data = request->data(); + int i = 0; + Message msg; + msg.set_text(cli_data); - if (subscribers_.find(cli_topic) != subscribers_.end()) { - for (auto& subscriber : subscribers_[cli_topic]) { - Message msg; - msg.set_text(cli_data); - if (!subscriber->Write(msg)) { - std::cerr << "Failed to write to a subscriber" << std::endl; - return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message"); + for (auto iter = suber_topic_.begin(); iter != suber_topic_.end(); iter++) { + i = 0; + for (auto topic_item : iter->second) { + if (topic_item == cli_topic) { + auto& subscriber = suber_writer_[iter->first][i]; + if (!subscriber->Write(msg)) { + std::cerr << "Failed to write to a subscriber" << std::endl; + return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message"); + } + break; + } + i++; } - } - } - return grpc::Status::OK; + } + + return grpc::Status::OK; } grpc::Status UnSubscribe(ServerContext* context, const UnSubscribeRequest* request, Message* response) override { int cli_topic = request->topic(); + std::string cli_name = request->sub_name(); + int i = 0; - // ToDo: add extra check or feature code + for (auto topic_item : suber_topic_[cli_name]) { + if (topic_item == cli_topic) { + suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i); + suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i); + break; + } + i++; + } response->set_text("topic: " + std::to_string(cli_topic) + " UnSubscribe success!"); return grpc::Status::OK; } private: - std::unordered_map*>> subscribers_; + std::unordered_map> suber_topic_; + std::unordered_map*>> suber_writer_; }; void RunServer() { diff --git a/observer_agent/grpc_comm/server_demo.cpp b/observer_agent/grpc_comm/server_demo.cpp index 760a52787ab660c9775b57c9400c4126d6adc100..5358620ef806cb6e6fe4601d37ecbf8a61493718 100644 --- a/observer_agent/grpc_comm/server_demo.cpp +++ b/observer_agent/grpc_comm/server_demo.cpp @@ -27,15 +27,27 @@ using data_comm::UnSubscribeRequest; using data_comm::PublishRequest; class PubSubServiceImpl final : public SubManager::Service { - public: - grpc::Status Subscribe(ServerContext* context, const SubscribeRequest* request, + public: + grpc::Status Subscribe(ServerContext* context, const SubscribeRequest* request, ServerWriter* writer) override { int cli_topic = request->topic(); - // ToDo: somebody topic - subscribers_[cli_topic].push_back(writer); - - // ToDo: add extra check or feature code + std::string cli_name = request->sub_name(); Message msg; + + for (auto iter = suber_topic_[cli_name].begin(); iter != suber_topic_[cli_name].end(); iter++) { + if (*iter == cli_topic) { + msg.set_text("this client name already subscribe the topic"); + if (!writer->Write(msg)) { + std::cerr << "Failed to write the initial message" << std::endl; + return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message"); + } + return grpc::Status(grpc::StatusCode::INTERNAL, "this client name already subscribe the topic"); + } + } + + suber_topic_[cli_name].push_back(cli_topic); + suber_writer_[cli_name].push_back(writer); + msg.set_text("topic: " + std::to_string(cli_topic) + " Subscribe success!"); if (!writer->Write(msg)) { std::cerr << "Failed to write the initial message" << std::endl; @@ -47,49 +59,67 @@ class PubSubServiceImpl final : public SubManager::Service { return grpc::Status::OK; } - grpc::Status Publish(ServerContext* context, const PublishRequest* request, + grpc::Status Publish(ServerContext* context, const PublishRequest* request, Message* response) override { - int cli_topic = request->topic(); - std::string cli_data = request->data(); + int cli_topic = request->topic(); + std::string cli_data = request->data(); + int i = 0; + Message msg; + msg.set_text(cli_data); - if (subscribers_.find(cli_topic) != subscribers_.end()) { - for (auto& subscriber : subscribers_[cli_topic]) { - Message msg; - msg.set_text(cli_data); - if (!subscriber->Write(msg)) { - std::cerr << "Failed to write to a subscriber" << std::endl; - return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message"); + for (auto iter = suber_topic_.begin(); iter != suber_topic_.end(); iter++) { + i = 0; + for (auto topic_item : iter->second) { + if (topic_item == cli_topic) { + auto& subscriber = suber_writer_[iter->first][i]; + if (!subscriber->Write(msg)) { + std::cerr << "Failed to write to a subscriber" << std::endl; + return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message"); + } + break; + } + i++; } - } - } - return grpc::Status::OK; + } + + return grpc::Status::OK; } grpc::Status UnSubscribe(ServerContext* context, const UnSubscribeRequest* request, Message* response) override { int cli_topic = request->topic(); + std::string cli_name = request->sub_name(); + int i = 0; - // ToDo: add extra check or feature code + for (auto topic_item : suber_topic_[cli_name]) { + if (topic_item == cli_topic) { + suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i); + suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i); + break; + } + i++; + } response->set_text("topic: " + std::to_string(cli_topic) + " UnSubscribe success!"); return grpc::Status::OK; } - private: - std::unordered_map*>> subscribers_; +private: + std::unordered_map> suber_topic_; + std::unordered_map*>> suber_writer_; }; void RunServer() { - std::string server_address("unix:///var/run/secDetector.sock"); - PubSubServiceImpl service; + std::string server_address("unix:///var/run/secDetector.sock"); + PubSubServiceImpl service; - ServerBuilder builder; - builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); - builder.RegisterService(&service); + ServerBuilder builder; + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + builder.RegisterService(&service); - std::unique_ptr server(builder.BuildAndStart()); - server->Wait(); + std::unique_ptr server(builder.BuildAndStart()); + server->Wait(); } int main(int argc, char** argv) { - RunServer(); - return 0; + RunServer(); + return 0; }