diff --git a/examples/python/client.py b/examples/python/client.py index d6dd7aa6f49b253faed34d709bcf30b7f33f4272..312384d8cc72c1de30eddb0dc79d7f8641b329db 100644 --- a/examples/python/client.py +++ b/examples/python/client.py @@ -36,9 +36,11 @@ secDetectorsdklib.secUnsub.restype = None secDetectorsdklib.secReadFrom.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int] secDetectorsdklib.secReadFrom.restype = None +g_read_flag = True def thread_func_sub_and_read(num=0): global g_cli_reader + global g_read_flag cli_reader = secDetectorsdklib.secSub(1) g_cli_reader_lock.acquire() @@ -50,10 +52,7 @@ def thread_func_sub_and_read(num=0): secDetectorsdklib.secReadFrom(cli_reader, data, data_len) print("client read data:{}".format(data.value.decode())) - while True: - if data.value.decode() == 'end': - print("client received end") - break + while g_read_flag: time.sleep(3) secDetectorsdklib.secReadFrom(cli_reader, data, data_len) print("client while read data:{}".format(data.value.decode())) @@ -62,8 +61,11 @@ def thread_func_sub_and_read(num=0): def thread_func_unsub(num=0): global g_cli_reader + global g_read_flag + g_cli_reader_lock.acquire() try: + g_read_flag = False secDetectorsdklib.secUnsub(1, g_cli_reader) finally: g_cli_reader_lock.release() diff --git a/lib/secDetector_sdk.cpp b/lib/secDetector_sdk.cpp index ee760794508fb5dd8e12b63bf19ba3d008078b37..847dd2f85f6e0caf3252d847fe0bc9a17331f496 100644 --- a/lib/secDetector_sdk.cpp +++ b/lib/secDetector_sdk.cpp @@ -55,14 +55,14 @@ void secUnsub(const int topic, void *reader) } if (!reader) - return; + return; - g_client.Publish(topic, "end"); g_client.UnSubscribe(topic); Readmap::iterator iter = g_reader_map.find(reader); if (iter != g_reader_map.end()) { g_reader_map.erase(iter); + reader = NULL; } } @@ -70,13 +70,20 @@ void secReadFrom(void *reader, char *data, int data_len) { string msg(""); - if (!reader || !data || data_len <= 1) - return; + if (!data || data_len <= 1) + return + + memset(data, 0, data_len); + + if (!reader) + return; Readmap::iterator iter = g_reader_map.find(reader); - if (iter != g_reader_map.end()) { + if (iter != g_reader_map.end()) { msg = g_client.ReadFrom(iter->second); - } + if (msg == "keepalive") + return; + } strncpy(data, msg.c_str(), data_len - 1); } diff --git a/observer_agent/grpc_comm/server.cpp b/observer_agent/grpc_comm/server.cpp index 54ae66fe1db95ebbfddd18b8a7b3f34f5bd59c0a..3340dfa054fa6c269a85cbeed94d6c1cf065e803 100644 --- a/observer_agent/grpc_comm/server.cpp +++ b/observer_agent/grpc_comm/server.cpp @@ -104,7 +104,6 @@ class PubSubServiceImpl final : public SubManager::Service keepalive_msg.set_text("keepalive"); while (connect_status[tmp_index]) { - sleep(CHECK_TIME); if (!writer->Write(keepalive_msg)) { for (auto topic_item : suber_topic_[cli_name]) @@ -124,6 +123,7 @@ class PubSubServiceImpl final : public SubManager::Service } return grpc::Status(grpc::StatusCode::INTERNAL, "writer is lose!"); } + sleep(CHECK_TIME); } return grpc::Status::OK; }