From d9a4c1cf011ab3d26b88229b5072ebbf6017893a Mon Sep 17 00:00:00 2001 From: zgzxx Date: Fri, 1 Dec 2023 17:36:11 +0800 Subject: [PATCH] lib modify for unsub --- examples/python/client.py | 10 ++++++---- lib/secDetector_sdk.cpp | 19 +++++++++++++------ observer_agent/grpc_comm/server.cpp | 2 +- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/examples/python/client.py b/examples/python/client.py index d6dd7aa..312384d 100644 --- a/examples/python/client.py +++ b/examples/python/client.py @@ -36,9 +36,11 @@ secDetectorsdklib.secUnsub.restype = None secDetectorsdklib.secReadFrom.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int] secDetectorsdklib.secReadFrom.restype = None +g_read_flag = True def thread_func_sub_and_read(num=0): global g_cli_reader + global g_read_flag cli_reader = secDetectorsdklib.secSub(1) g_cli_reader_lock.acquire() @@ -50,10 +52,7 @@ def thread_func_sub_and_read(num=0): secDetectorsdklib.secReadFrom(cli_reader, data, data_len) print("client read data:{}".format(data.value.decode())) - while True: - if data.value.decode() == 'end': - print("client received end") - break + while g_read_flag: time.sleep(3) secDetectorsdklib.secReadFrom(cli_reader, data, data_len) print("client while read data:{}".format(data.value.decode())) @@ -62,8 +61,11 @@ def thread_func_sub_and_read(num=0): def thread_func_unsub(num=0): global g_cli_reader + global g_read_flag + g_cli_reader_lock.acquire() try: + g_read_flag = False secDetectorsdklib.secUnsub(1, g_cli_reader) finally: g_cli_reader_lock.release() diff --git a/lib/secDetector_sdk.cpp b/lib/secDetector_sdk.cpp index ee76079..847dd2f 100644 --- a/lib/secDetector_sdk.cpp +++ b/lib/secDetector_sdk.cpp @@ -55,14 +55,14 @@ void secUnsub(const int topic, void *reader) } if (!reader) - return; + return; - g_client.Publish(topic, "end"); g_client.UnSubscribe(topic); Readmap::iterator iter = g_reader_map.find(reader); if (iter != g_reader_map.end()) { g_reader_map.erase(iter); + reader = NULL; } } @@ -70,13 +70,20 @@ void secReadFrom(void *reader, char *data, int data_len) { string msg(""); - if (!reader || !data || data_len <= 1) - return; + if (!data || data_len <= 1) + return + + memset(data, 0, data_len); + + if (!reader) + return; Readmap::iterator iter = g_reader_map.find(reader); - if (iter != g_reader_map.end()) { + if (iter != g_reader_map.end()) { msg = g_client.ReadFrom(iter->second); - } + if (msg == "keepalive") + return; + } strncpy(data, msg.c_str(), data_len - 1); } diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp index 54ae66f..3340dfa 100644 --- a/observer_agent/grpc_comm/server.cpp +++ b/observer_agent/grpc_comm/server.cpp @@ -104,7 +104,6 @@ class PubSubServiceImpl final : public SubManager::Service keepalive_msg.set_text("keepalive"); while (connect_status[tmp_index]) { - sleep(CHECK_TIME); if (!writer->Write(keepalive_msg)) { for (auto topic_item : suber_topic_[cli_name]) @@ -124,6 +123,7 @@ class PubSubServiceImpl final : public SubManager::Service } return grpc::Status(grpc::StatusCode::INTERNAL, "writer is lose!"); } + sleep(CHECK_TIME); } return grpc::Status::OK; } -- Gitee