diff --git a/observer_agent/grpc_comm/client.cpp b/observer_agent/grpc_comm/client.cpp index ecb54aedaf5e1d72323105bf15fbc4ff05c3c5da..84b5c96ccd38493f9702eb748aac24e4990ae569 100644 --- a/observer_agent/grpc_comm/client.cpp +++ b/observer_agent/grpc_comm/client.cpp @@ -43,16 +43,22 @@ std::unique_ptr> PubSubClient::Subscribe(const int topic) SubscribeRequest request; request.set_topic(topic); request.set_sub_name(uuid_str); + std::string ret_info; Message msg; SubFlag = true; std::unique_ptr> reader = stub_->Subscribe(&context, request); - if (reader == nullptr) + ret_info = ReadFrom(reader); + + if (ret_info.substr(0, 6) == "topic:") + { + std::cout << "Success subscribe." << std::endl; + return reader; + } else { std::cerr << "Failed to subscribe." << std::endl; return nullptr; } - return reader; } void PubSubClient::Publish(const int topic, const std::string &content) diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp index 54ae66fe1db95ebbfddd18b8a7b3f34f5bd59c0a..d53866fe0c22303e755f972169fc7a265379f34e 100644 --- a/observer_agent/grpc_comm/server.cpp +++ b/observer_agent/grpc_comm/server.cpp @@ -86,6 +86,14 @@ class PubSubServiceImpl final : public SubManager::Service return grpc::Status(grpc::StatusCode::INTERNAL, "multi-process max connection number, Failed to Subscribe the topic"); } + msg.set_text("topic: " + std::to_string(cli_topic) + " Subscribe success!"); + if (!writer->Write(msg)) + { + std::cerr << "Failed to write the initial message" << std::endl; + sub_mutex.unlock(); + return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message"); + } + suber_topic_[cli_name].push_back(cli_topic); suber_writer_[cli_name].push_back(writer); suber_connection_[cli_name].push_back(tmp_index); @@ -94,13 +102,6 @@ class PubSubServiceImpl final : public SubManager::Service sub_mutex.unlock(); - msg.set_text("topic: " + std::to_string(cli_topic) + " Subscribe success!"); - if (!writer->Write(msg)) - { - std::cerr << "Failed to write the initial message" << std::endl; - return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to write the message"); - } - keepalive_msg.set_text("keepalive"); while (connect_status[tmp_index]) {