diff --git a/lib/secDetector_sdk.cpp b/lib/secDetector_sdk.cpp index 847dd2f85f6e0caf3252d847fe0bc9a17331f496..4c1266dec4858ae5300dc7663ead3868f0cd030b 100644 --- a/lib/secDetector_sdk.cpp +++ b/lib/secDetector_sdk.cpp @@ -16,15 +16,20 @@ #include #include +#include #include "../observer_agent/grpc_comm/grpc_api.h" #define ALLTOPIC 0x00FFFFFF +#define CONNECT_NUM_MAX 5 using namespace std; static string server_address("unix:///var/run/secDetector.sock"); static PubSubClient g_client(grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials())); -using Readmap = map>>; +using Readmap = map>, unsigned int>>; static Readmap g_reader_map; +static unsigned int g_connect_num; +static mutex g_connect_mtx; +static PubSubClient *g_all_client[CONNECT_NUM_MAX]; #ifdef __cplusplus extern "C" { @@ -33,59 +38,79 @@ extern "C" { void *secSub(const int topic) { if (topic <= 0 || topic > ALLTOPIC) { - printf("secSub failed, topic:%d is error\n", topic); + printf("lib secSub failed, topic:%d is error\n", topic); return NULL; } - - unique_ptr> reader = g_client.Subscribe(topic); - - if (!reader) + g_connect_mtx.lock(); + if (g_connect_num >= CONNECT_NUM_MAX) { + printf("lib secSub faild, connect num:%d error\n", g_connect_num); + g_connect_mtx.lock(); return NULL; + } + std::shared_ptr channel = grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()); + g_all_client[g_connect_num] = new(PubSubClient); + g_all_client[g_connect_num].init(channel); + unique_ptr> reader = g_all_client[g_connect_num].Subscribe(topic); + + if (!reader) { + printf("lib secSub failed, get reader null\n"); + delete g_all_client[g_connect_num]; + g_connect_mtx.unlock(); + return NULL; + } void * ret_reader = static_cast(reader.get()); - g_reader_map.insert(Readmap::value_type(ret_reader, move(reader))); + g_reader_map.insert(Readmap::value_type(ret_reader, std::make_pair(move(reader), g_connect_num))); + g_connect_num++; + g_connect_mtx.unlock(); return ret_reader; } void secUnsub(const int topic, void *reader) { + unsigned int current_num; if (topic <= 0 || topic > ALLTOPIC) { - printf("secUnsub failed, topic:%d is error\n", topic); + printf("lib secUnsub failed, topic:%d is error\n", topic); return; } if (!reader) return; - g_client.UnSubscribe(topic); - + g_connect_mtx.lock(); Readmap::iterator iter = g_reader_map.find(reader); if (iter != g_reader_map.end()) { + current_num = iter->second.second; + g_all_client[current_num].UnSubscribe(topic); g_reader_map.erase(iter); reader = NULL; + delete g_all_client[g_connect_num]; + g_connect_num--; } + g_connect_mtx.unlock(); } void secReadFrom(void *reader, char *data, int data_len) { string msg(""); + unsigned int current_num; if (!data || data_len <= 1) return - memset(data, 0, data_len); + (void)memset(data, 0, data_len); if (!reader) return; Readmap::iterator iter = g_reader_map.find(reader); if (iter != g_reader_map.end()) { - msg = g_client.ReadFrom(iter->second); + current_num = iter->second.second; + msg = g_all_client[current_num].ReadFrom(iter->second.first); if (msg == "keepalive") return; + strncpy(data, msg.c_str(), data_len - 1); } - - strncpy(data, msg.c_str(), data_len - 1); } #ifdef __cplusplus diff --git a/observer_agent/grpc_comm/client.cpp b/observer_agent/grpc_comm/client.cpp index ecb54aedaf5e1d72323105bf15fbc4ff05c3c5da..d4b09480d114e36b6c29f44b451d0c87a4aacea9 100644 --- a/observer_agent/grpc_comm/client.cpp +++ b/observer_agent/grpc_comm/client.cpp @@ -29,6 +29,8 @@ using grpc::ClientReader; #define BUF_NUM 1024 +PubSubClient::PubSubClient() {} + PubSubClient::PubSubClient(std::shared_ptr channel) : stub_(SubManager::NewStub(channel)) { uuid_t uuid; @@ -38,6 +40,16 @@ PubSubClient::PubSubClient(std::shared_ptr channel) : stub_(SubManager: uuid_str = std::string(uuid_temp); } +void PubSubClient::init(std::shared_ptr channel) +{ + uuid_t uuid; + char uuid_temp[37]; + uuid_generate(uuid); + uuid_unparse(uuid, uuid_temp); + uuid_str = std::string(uuid_temp); + stub_ = SubManager::NewStub(channel); +} + std::unique_ptr> PubSubClient::Subscribe(const int topic) { SubscribeRequest request; diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h index 44d4aa985ab466c2ab9e878ba72e0b5c022167e3..4bde109e81c849c1b2dd4193bbbec194b00a420b 100644 --- a/observer_agent/grpc_comm/grpc_api.h +++ b/observer_agent/grpc_comm/grpc_api.h @@ -42,7 +42,9 @@ void RunServer(); class PubSubClient { public: + PubSubClient(); PubSubClient(std::shared_ptr channel); + void init(std::shared_ptr channel); std::unique_ptr> Subscribe(const int topic); void Publish(const int topic, const std::string &content); void UnSubscribe(const int topic);