From 4a58573122e2c677c427cb43bb02e6f055fdf391 Mon Sep 17 00:00:00 2001 From: zgzxx Date: Mon, 11 Dec 2023 20:57:58 +0800 Subject: [PATCH] secUnsub del topic --- examples/python/client.py | 4 +-- lib/secDetector_sdk.cpp | 8 ++---- lib/secDetector_sdk.h | 2 +- observer_agent/grpc_comm/client.cpp | 3 +-- observer_agent/grpc_comm/client_sub_demo.cpp | 2 +- observer_agent/grpc_comm/grpc_api.h | 2 +- .../grpc_comm/protos/comm_api.proto | 3 +-- observer_agent/grpc_comm/server.cpp | 27 ++++++++----------- 8 files changed, 20 insertions(+), 31 deletions(-) diff --git a/examples/python/client.py b/examples/python/client.py index 312384d..3fb95b4 100644 --- a/examples/python/client.py +++ b/examples/python/client.py @@ -31,7 +31,7 @@ g_cli_reader_lock = threading.Lock() secDetectorsdklib.secSub.argtypes = [ctypes.c_int] secDetectorsdklib.secSub.restype = ctypes.c_void_p -secDetectorsdklib.secUnsub.argtypes = [ctypes.c_int, ctypes.c_void_p] +secDetectorsdklib.secUnsub.argtypes = [ctypes.c_void_p] secDetectorsdklib.secUnsub.restype = None secDetectorsdklib.secReadFrom.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int] secDetectorsdklib.secReadFrom.restype = None @@ -66,7 +66,7 @@ def thread_func_unsub(num=0): g_cli_reader_lock.acquire() try: g_read_flag = False - secDetectorsdklib.secUnsub(1, g_cli_reader) + secDetectorsdklib.secUnsub(g_cli_reader) finally: g_cli_reader_lock.release() print("client thread_func_unsub end") diff --git a/lib/secDetector_sdk.cpp b/lib/secDetector_sdk.cpp index 6f47f41..6b00953 100644 --- a/lib/secDetector_sdk.cpp +++ b/lib/secDetector_sdk.cpp @@ -62,13 +62,9 @@ void *secSub(const int topic) return ret_reader; } -void secUnsub(const int topic, void *reader) +void secUnsub(void *reader) { PubSubClient *cur_client; - if (topic <= 0 || topic > ALLTOPIC) { - printf("lib secUnsub failed, topic:%d is error\n", topic); - return; - } if (!reader) return; @@ -77,7 +73,7 @@ void secUnsub(const int topic, void *reader) Readmap::iterator iter = g_reader_map.find(reader); if (iter != g_reader_map.end()) { cur_client = iter->second.second; - cur_client->UnSubscribe(topic); + cur_client->UnSubscribe(); g_reader_map.erase(iter); reader = NULL; delete cur_client; diff --git a/lib/secDetector_sdk.h b/lib/secDetector_sdk.h index abf112b..92ef5b4 100644 --- a/lib/secDetector_sdk.h +++ b/lib/secDetector_sdk.h @@ -18,7 +18,7 @@ #define SECDETECTOR_SDK_H void *secSub(const int topic); -void secUnsub(const int topic, void *reader); +void secUnsub(void *reader); void secReadFrom(void *reader, char *data, int data_len); #endif diff --git a/observer_agent/grpc_comm/client.cpp b/observer_agent/grpc_comm/client.cpp index 0dd02f9..5cf8cf2 100644 --- a/observer_agent/grpc_comm/client.cpp +++ b/observer_agent/grpc_comm/client.cpp @@ -87,10 +87,9 @@ void PubSubClient::Publish(const int topic, const std::string &content) } } -void PubSubClient::UnSubscribe(const int topic) +void PubSubClient::UnSubscribe(void) { UnSubscribeRequest request; - request.set_topic(topic); request.set_sub_name(uuid_str); ClientContext unsub_context; diff --git a/observer_agent/grpc_comm/client_sub_demo.cpp b/observer_agent/grpc_comm/client_sub_demo.cpp index fbf27ad..550b503 100644 --- a/observer_agent/grpc_comm/client_sub_demo.cpp +++ b/observer_agent/grpc_comm/client_sub_demo.cpp @@ -34,7 +34,7 @@ int main(int argc, char **argv) some_data = client.ReadFrom(cli_reader); std::cout << "loop whz: " << some_data << std::endl; } - client.UnSubscribe(std::stoi(argv[1])); + client.UnSubscribe(); return 0; } diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h index 27a9139..c5b43cc 100644 --- a/observer_agent/grpc_comm/grpc_api.h +++ b/observer_agent/grpc_comm/grpc_api.h @@ -48,7 +48,7 @@ class PubSubClient void init(std::shared_ptr channel); std::unique_ptr> Subscribe(const int topic); void Publish(const int topic, const std::string &content); - void UnSubscribe(const int topic); + void UnSubscribe(void); std::string ReadFrom(std::unique_ptr> &reader); private: diff --git a/observer_agent/grpc_comm/protos/comm_api.proto b/observer_agent/grpc_comm/protos/comm_api.proto index 6c84865..cf1e445 100644 --- a/observer_agent/grpc_comm/protos/comm_api.proto +++ b/observer_agent/grpc_comm/protos/comm_api.proto @@ -13,8 +13,7 @@ message SubscribeRequest { } message UnSubscribeRequest { - int32 topic = 1; - string sub_name = 2; + string sub_name = 1; } message PublishRequest { diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp index b47b1aa..938d09c 100644 --- a/observer_agent/grpc_comm/server.cpp +++ b/observer_agent/grpc_comm/server.cpp @@ -176,7 +176,6 @@ class PubSubServiceImpl final : public SubManager::Service grpc::Status UnSubscribe(ServerContext *context, const UnSubscribeRequest *request, Message *response) override { - int cli_topic = request->topic(); std::string cli_name = request->sub_name(); int i = 0; int unsub_flag = 0; @@ -189,27 +188,23 @@ class PubSubServiceImpl final : public SubManager::Service std::lock_guard lock(sub_mutex); - for (auto topic_item : suber_topic_[cli_name]) + std::unordered_map>::iterator iter = suber_topic_.find(cli_name); + if (iter != suber_topic_.end()) { - if (topic_item == cli_topic) - { - suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i); - suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i); - connect_status[suber_connection_[cli_name].at(i)] = false; - suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i); - connection_num--; - unsub_flag = 1; - break; - } - i++; + suber_topic_[cli_name].erase(suber_topic_[cli_name].begin() + i); + suber_writer_[cli_name].erase(suber_writer_[cli_name].begin() + i); + connect_status[suber_connection_[cli_name].at(i)] = false; + suber_connection_[cli_name].erase(suber_connection_[cli_name].begin() + i); + connection_num--; + unsub_flag = 1; } if (!unsub_flag) { - response->set_text("don't exist the topic: " + std::to_string(cli_topic)); - return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to UnSubscribe topic!"); + response->set_text("don't exist the reader"); + return grpc::Status(grpc::StatusCode::INTERNAL, "Failed to UnSubscribe reader!"); } - response->set_text("topic: " + std::to_string(cli_topic) + " UnSubscribe success!"); + response->set_text("UnSubscribe success!"); return grpc::Status::OK; } -- Gitee