diff --git a/lib/secDetector_sdk.cpp b/lib/secDetector_sdk.cpp index 847dd2f85f6e0caf3252d847fe0bc9a17331f496..6f47f4156a8756685e09a41f05b805096170eb22 100644 --- a/lib/secDetector_sdk.cpp +++ b/lib/secDetector_sdk.cpp @@ -16,15 +16,17 @@ #include #include +#include +#include #include "../observer_agent/grpc_comm/grpc_api.h" #define ALLTOPIC 0x00FFFFFF using namespace std; static string server_address("unix:///var/run/secDetector.sock"); -static PubSubClient g_client(grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials())); -using Readmap = map>>; +using Readmap = map>, PubSubClient *>>; static Readmap g_reader_map; +static mutex g_connect_mtx; #ifdef __cplusplus extern "C" { @@ -32,60 +34,82 @@ extern "C" { void *secSub(const int topic) { + PubSubClient *cur_client; if (topic <= 0 || topic > ALLTOPIC) { - printf("secSub failed, topic:%d is error\n", topic); + printf("lib secSub failed, topic:%d is error\n", topic); return NULL; } + g_connect_mtx.lock(); + std::shared_ptr channel = grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()); + cur_client = new(PubSubClient); + if (cur_client == nullptr) { + g_connect_mtx.unlock(); + return NULL; + } + cur_client->init(channel); + unique_ptr> reader = cur_client->Subscribe(topic); - unique_ptr> reader = g_client.Subscribe(topic); - - if (!reader) + if (!reader) { + printf("lib secSub failed, get reader null\n"); + delete cur_client; + g_connect_mtx.unlock(); return NULL; + } void * ret_reader = static_cast(reader.get()); - g_reader_map.insert(Readmap::value_type(ret_reader, move(reader))); + g_reader_map.insert(Readmap::value_type(ret_reader, std::make_pair(move(reader), cur_client))); + g_connect_mtx.unlock(); return ret_reader; } void secUnsub(const int topic, void *reader) { + PubSubClient *cur_client; if (topic <= 0 || topic > ALLTOPIC) { - printf("secUnsub failed, topic:%d is error\n", topic); + printf("lib secUnsub failed, topic:%d is error\n", topic); return; } if (!reader) return; - g_client.UnSubscribe(topic); - + g_connect_mtx.lock(); Readmap::iterator iter = g_reader_map.find(reader); if (iter != g_reader_map.end()) { + cur_client = iter->second.second; + cur_client->UnSubscribe(topic); g_reader_map.erase(iter); reader = NULL; + delete cur_client; } + g_connect_mtx.unlock(); } void secReadFrom(void *reader, char *data, int data_len) { string msg(""); + PubSubClient *cur_client; if (!data || data_len <= 1) return - memset(data, 0, data_len); + (void)memset(data, 0, data_len); if (!reader) return; + g_connect_mtx.lock(); Readmap::iterator iter = g_reader_map.find(reader); if (iter != g_reader_map.end()) { - msg = g_client.ReadFrom(iter->second); - if (msg == "keepalive") + cur_client = iter->second.second; + msg = cur_client->ReadFrom(iter->second.first); + if (msg == "keepalive") { + g_connect_mtx.unlock(); return; + } + strncpy(data, msg.c_str(), data_len - 1); } - - strncpy(data, msg.c_str(), data_len - 1); + g_connect_mtx.unlock(); } #ifdef __cplusplus diff --git a/observer_agent/grpc_comm/client.cpp b/observer_agent/grpc_comm/client.cpp index ecb54aedaf5e1d72323105bf15fbc4ff05c3c5da..d4b09480d114e36b6c29f44b451d0c87a4aacea9 100644 --- a/observer_agent/grpc_comm/client.cpp +++ b/observer_agent/grpc_comm/client.cpp @@ -29,6 +29,8 @@ using grpc::ClientReader; #define BUF_NUM 1024 +PubSubClient::PubSubClient() {} + PubSubClient::PubSubClient(std::shared_ptr channel) : stub_(SubManager::NewStub(channel)) { uuid_t uuid; @@ -38,6 +40,16 @@ PubSubClient::PubSubClient(std::shared_ptr channel) : stub_(SubManager: uuid_str = std::string(uuid_temp); } +void PubSubClient::init(std::shared_ptr channel) +{ + uuid_t uuid; + char uuid_temp[37]; + uuid_generate(uuid); + uuid_unparse(uuid, uuid_temp); + uuid_str = std::string(uuid_temp); + stub_ = SubManager::NewStub(channel); +} + std::unique_ptr> PubSubClient::Subscribe(const int topic) { SubscribeRequest request; diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h index 44d4aa985ab466c2ab9e878ba72e0b5c022167e3..4bde109e81c849c1b2dd4193bbbec194b00a420b 100644 --- a/observer_agent/grpc_comm/grpc_api.h +++ b/observer_agent/grpc_comm/grpc_api.h @@ -42,7 +42,9 @@ void RunServer(); class PubSubClient { public: + PubSubClient(); PubSubClient(std::shared_ptr channel); + void init(std::shared_ptr channel); std::unique_ptr> Subscribe(const int topic); void Publish(const int topic, const std::string &content); void UnSubscribe(const int topic);