From dfb2b2b9c5c73c00a26b5fe3542216cf6eca15ee Mon Sep 17 00:00:00 2001 From: hurricane618 Date: Tue, 17 Oct 2023 20:58:06 +0800 Subject: [PATCH] grpc code move into service MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将grpc代码移动到serive中 Signed-off-by: hurricane618 --- observer_agent/CMakeLists.txt | 8 +- observer_agent/grpc_comm/Makefile | 8 +- observer_agent/grpc_comm/client.cpp | 102 +++++++++++++++++++++++ observer_agent/grpc_comm/comm_client.cpp | 4 +- observer_agent/grpc_comm/comm_server.cpp | 8 +- observer_agent/service/main.cpp | 26 +++++- observer_agent/service/ringbuffer.h | 2 +- 7 files changed, 147 insertions(+), 11 deletions(-) create mode 100644 observer_agent/grpc_comm/client.cpp diff --git a/observer_agent/CMakeLists.txt b/observer_agent/CMakeLists.txt index c80783c..2b7d647 100644 --- a/observer_agent/CMakeLists.txt +++ b/observer_agent/CMakeLists.txt @@ -5,5 +5,9 @@ project(observer_agent VERSION 1.0 LANGUAGES CXX) set(GRPC_PATH ${CMAKE_CURRENT_SOURCE_DIR}/grpc_comm) add_custom_target(grpc_demo ALL COMMAND make -C ${GRPC_PATH}) -add_executable(secDetectord service/main.cpp service/ringbuffer.cpp) -target_include_directories(secDetectord PUBLIC service) + +add_executable(secDetectord grpc_comm/client.cpp grpc_comm/comm_server.cpp service/main.cpp service/ringbuffer.cpp grpc_comm/comm_api.pb.o grpc_comm/comm_api.grpc.pb.o) +target_include_directories(secDetectord PUBLIC service grpc_comm) +# target_link_libraries(secDetectord protobuf grpc++ grpc) +set(SDK_LINK_FLAGS " -lprotobuf -lpthread -lgrpc++ -lgrpc -laddress_sorting -lre2 -lupb -lcares -lz -lgpr -lssl -lcrypto -labsl_hash -labsl_city -labsl_low_level_hash -labsl_raw_hash_set -labsl_hashtablez_sampler -labsl_statusor -labsl_status -labsl_cord -labsl_cordz_info -labsl_cord_internal -labsl_cordz_functions -labsl_exponential_biased -labsl_cordz_handle -labsl_bad_optional_access -labsl_strerror -labsl_str_format_internal -labsl_synchronization -labsl_graphcycles_internal -labsl_stacktrace -labsl_symbolize -labsl_debugging_internal -labsl_demangle_internal -labsl_malloc_internal -labsl_time -labsl_civil_time -labsl_strings -labsl_strings_internal -lrt -labsl_base -labsl_spinlock_wait -labsl_int128 -labsl_throw_delegate -labsl_time_zone -labsl_bad_variant_access -labsl_raw_logging_internal -labsl_log_severity") +set_target_properties(secDetectord PROPERTIES LINK_FLAGS "${SDK_LINK_FLAGS}") diff --git a/observer_agent/grpc_comm/Makefile b/observer_agent/grpc_comm/Makefile index 9c28240..2947dd5 100644 --- a/observer_agent/grpc_comm/Makefile +++ b/observer_agent/grpc_comm/Makefile @@ -38,7 +38,13 @@ PROTOS_PATH = ./protos vpath %.proto $(PROTOS_PATH) -all: system-check comm_client comm_server +all: system-check client server comm_client + +client: comm_api.pb.o comm_api.grpc.pb.o comm_client.o + @echo "only compile client don't link" + +server: comm_api.pb.o comm_api.grpc.pb.o comm_server.o + @echo "only compile server don't link" comm_client: comm_api.pb.o comm_api.grpc.pb.o comm_client.o $(CXX) $^ $(LDFLAGS) -o $@ diff --git a/observer_agent/grpc_comm/client.cpp b/observer_agent/grpc_comm/client.cpp new file mode 100644 index 0000000..15d204f --- /dev/null +++ b/observer_agent/grpc_comm/client.cpp @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2023 Huawei Technologies Co., Ltd. All rights reserved. + * secDetector is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * + * Author: hurricane618 + * Create: 2023-09-26 + * Description: secDetector grpc client + */ +#include +#include "comm_api.grpc.pb.h" +#include "grpc_api.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::ClientReader; +using data_comm::Message; +using data_comm::SubManager; +using data_comm::SubscribeRequest; +using data_comm::UnSubscribeRequest; +using data_comm::PublishRequest; + +#define BUF_NUM 1024 + +PubSubClient::PubSubClient(std::shared_ptr channel) + : stub_(SubManager::NewStub(channel)) {} + +std::unique_ptr> PubSubClient::Subscribe(const int topic) { + SubscribeRequest request; + request.set_topic(topic); + //ClientContext context; + // fork / copy_thread to read date + //std::unique_ptr> reader( + // stub_->Subscribe(&context, request)); + Message msg; + SubFlag = true; + return stub_->Subscribe(&context, request); +} + +void PubSubClient::Publish(const int topic, const std::string& content) { + PublishRequest request; + request.set_topic(topic); + request.set_data(content); + ClientContext pub_context; + Message msg; + grpc::Status status = stub_->Publish(&pub_context, request, &msg); + if (!status.ok()) { + std::cerr << "Error: " << status.error_code() << ": " << status.error_message() << std::endl; + } +} + +void PubSubClient::UnSubscribe(const int topic) { + UnSubscribeRequest request; + request.set_topic(topic); + + //ClientContext context; + Message msg; + grpc::Status status = stub_->UnSubscribe(&context, request, &msg); + SubFlag = false; + + if (!status.ok()) { + std::cerr << "Error: " << status.error_code() << ": " << status.error_message() << std::endl; + } + + std::cout << "Received: " << msg.text() << std::endl; + + return; +} + +std::string PubSubClient::ReadFrom(std::unique_ptr> &reader) { + Message msg; + reader->Read(&msg); + std::cout << "Received: " << msg.text() << std::endl; + return msg.text(); +} + +// int main(int argc, char** argv) { +// std::string server_address("unix:///var/run/secDetector.sock"); +// PubSubClient client(grpc::CreateChannel( +// server_address, grpc::InsecureChannelCredentials())); + +// std::unique_ptr> cli_reader = client.Subscribe(1); +// std::string some_data = client.ReadFrom(cli_reader); +// std::cout << "whz: " << some_data << std::endl; +// while (some_data != "end") { +// some_data = client.ReadFrom(cli_reader); +// std::cout << "loop whz: " << some_data << std::endl; +// } + // client.Publish(1, "ahahahah"); + // client.Publish(1, "end"); + //sleep(5); + //client.UnSubscribe(1); + +// return 0; +// } + diff --git a/observer_agent/grpc_comm/comm_client.cpp b/observer_agent/grpc_comm/comm_client.cpp index 28d0589..b330391 100644 --- a/observer_agent/grpc_comm/comm_client.cpp +++ b/observer_agent/grpc_comm/comm_client.cpp @@ -105,8 +105,8 @@ int main(int argc, char** argv) { } // client.Publish(1, "ahahahah"); // client.Publish(1, "end"); - //sleep(5); - //client.UnSubscribe(1); + // sleep(5); + // client.UnSubscribe(1); return 0; } diff --git a/observer_agent/grpc_comm/comm_server.cpp b/observer_agent/grpc_comm/comm_server.cpp index 3f64fbc..240cc93 100644 --- a/observer_agent/grpc_comm/comm_server.cpp +++ b/observer_agent/grpc_comm/comm_server.cpp @@ -92,8 +92,8 @@ void RunServer() { server->Wait(); } -int main(int argc, char** argv) { - RunServer(); - return 0; -} +// int main(int argc, char** argv) { +// RunServer(); +// return 0; +// } diff --git a/observer_agent/service/main.cpp b/observer_agent/service/main.cpp index be9ba3c..74bb952 100644 --- a/observer_agent/service/main.cpp +++ b/observer_agent/service/main.cpp @@ -14,6 +14,7 @@ * Description: secDetector main entry */ #include "ringbuffer.h" +#include "../grpc_comm/grpc_api.h" #include #include #include @@ -23,6 +24,21 @@ #include #include #include +#include +#include + +using grpc::Server; +using grpc::ServerBuilder; +using grpc::ServerContext; +using grpc::ServerWriter; +using grpc::Channel; +using grpc::ClientContext; +using grpc::ClientReader; +using data_comm::Message; +using data_comm::SubManager; +using data_comm::SubscribeRequest; +using data_comm::UnSubscribeRequest; +using data_comm::PublishRequest; static volatile bool exiting = false; static void sig_handler(int sig) { exiting = true; } @@ -33,6 +49,11 @@ static int ringbuf_cb(struct response_rb_entry *entry, size_t entry_size) { syslog(LOG_INFO, "type:%d, text:%s\n", entry->type, entry->text); /* TODO: you can add function there */ + std::string server_address("unix:///var/run/secDetector.sock"); + PubSubClient client(grpc::CreateChannel( + server_address, grpc::InsecureChannelCredentials())); + // topic need extra args + client.Publish(1, entry->text); return 0; } @@ -54,10 +75,13 @@ int main() { exit(EXIT_FAILURE); } + std::thread t = std::thread(RunServer); + while (!exiting) { secDetector_ringbuf_poll((poll_cb)ringbuf_cb); } secDetector_ringbuf_detach(); + t.join(); return 0; -} \ No newline at end of file +} diff --git a/observer_agent/service/ringbuffer.h b/observer_agent/service/ringbuffer.h index 46575ca..efb6b9d 100644 --- a/observer_agent/service/ringbuffer.h +++ b/observer_agent/service/ringbuffer.h @@ -26,4 +26,4 @@ struct response_rb_entry { extern int secDetector_ringbuf_attach(void); extern void secDetector_ringbuf_detach(void); extern int secDetector_ringbuf_poll(poll_cb cb); -#endif \ No newline at end of file +#endif -- Gitee