From ed0f040a77b4f7dd39e4375bc740cf96c3ef4e2d Mon Sep 17 00:00:00 2001 From: hurricane618 Date: Tue, 26 Sep 2023 20:25:52 +0800 Subject: [PATCH 1/2] add gRPC demo client and server code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增gRPC框架写的客户端和服务端通信程序,目前使用 unix socket通信。 Signed-off-by: hurricane618 --- grpc/Makefile | 110 +++++++++++++++++++++++++++++++++++++ grpc/comm_client.cc | 64 +++++++++++++++++++++ grpc/comm_server.cc | 61 ++++++++++++++++++++ grpc/protos/comm_api.proto | 17 ++++++ 4 files changed, 252 insertions(+) create mode 100644 grpc/Makefile create mode 100644 grpc/comm_client.cc create mode 100644 grpc/comm_server.cc create mode 100644 grpc/protos/comm_api.proto diff --git a/grpc/Makefile b/grpc/Makefile new file mode 100644 index 0000000..9c28240 --- /dev/null +++ b/grpc/Makefile @@ -0,0 +1,110 @@ +# +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +HOST_SYSTEM = $(shell uname | cut -f 1 -d_) +SYSTEM ?= $(HOST_SYSTEM) +CXX = g++ +CPPFLAGS += `pkg-config --cflags protobuf grpc` +CXXFLAGS += -std=c++11 +ifeq ($(SYSTEM),Darwin) +LDFLAGS += -L/usr/local/lib `pkg-config --libs protobuf grpc++`\ + -pthread\ + -lgrpc++_reflection\ + -ldl +else +LDFLAGS += -L/usr/local/lib `pkg-config --libs protobuf grpc++`\ + -pthread\ + -Wl,--no-as-needed -lgrpc++_reflection -Wl,--as-needed\ + -ldl +endif +PROTOC = protoc +GRPC_CPP_PLUGIN = grpc_cpp_plugin +GRPC_CPP_PLUGIN_PATH ?= `which $(GRPC_CPP_PLUGIN)` + +PROTOS_PATH = ./protos + +vpath %.proto $(PROTOS_PATH) + +all: system-check comm_client comm_server + +comm_client: comm_api.pb.o comm_api.grpc.pb.o comm_client.o + $(CXX) $^ $(LDFLAGS) -o $@ + +comm_server: comm_api.pb.o comm_api.grpc.pb.o comm_server.o + $(CXX) $^ $(LDFLAGS) -o $@ + +%.grpc.pb.cc: %.proto + $(PROTOC) -I $(PROTOS_PATH) --grpc_out=. --plugin=protoc-gen-grpc=$(GRPC_CPP_PLUGIN_PATH) $< + +%.pb.cc: %.proto + $(PROTOC) -I $(PROTOS_PATH) --cpp_out=. $< + +clean: + rm -f *.o *.pb.cc *.pb.h comm_client comm_server + + +# The following is to test your system and ensure a smoother experience. +# They are by no means necessary to actually compile a grpc-enabled software. + +PROTOC_CMD = which $(PROTOC) +PROTOC_CHECK_CMD = $(PROTOC) --version | grep -q libprotoc.3 +PLUGIN_CHECK_CMD = which $(GRPC_CPP_PLUGIN) +HAS_PROTOC = $(shell $(PROTOC_CMD) > /dev/null && echo true || echo false) +ifeq ($(HAS_PROTOC),true) +HAS_VALID_PROTOC = $(shell $(PROTOC_CHECK_CMD) 2> /dev/null && echo true || echo false) +endif +HAS_PLUGIN = $(shell $(PLUGIN_CHECK_CMD) > /dev/null && echo true || echo false) + +SYSTEM_OK = false +ifeq ($(HAS_VALID_PROTOC),true) +ifeq ($(HAS_PLUGIN),true) +SYSTEM_OK = true +endif +endif + +system-check: +ifneq ($(HAS_VALID_PROTOC),true) + @echo " DEPENDENCY ERROR" + @echo + @echo "You don't have protoc 3.0.0 installed in your path." + @echo "Please install Google protocol buffers 3.0.0 and its compiler." + @echo "You can find it here:" + @echo + @echo " https://github.com/protocolbuffers/protobuf/releases/tag/v3.0.0" + @echo + @echo "Here is what I get when trying to evaluate your version of protoc:" + @echo + -$(PROTOC) --version + @echo + @echo +endif +ifneq ($(HAS_PLUGIN),true) + @echo " DEPENDENCY ERROR" + @echo + @echo "You don't have the grpc c++ protobuf plugin installed in your path." + @echo "Please install grpc. You can find it here:" + @echo + @echo " https://github.com/grpc/grpc" + @echo + @echo "Here is what I get when trying to detect if you have the plugin:" + @echo + -which $(GRPC_CPP_PLUGIN) + @echo + @echo +endif +ifneq ($(SYSTEM_OK),true) + @false +endif diff --git a/grpc/comm_client.cc b/grpc/comm_client.cc new file mode 100644 index 0000000..285234f --- /dev/null +++ b/grpc/comm_client.cc @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2023 Huawei Technologies Co., Ltd. All rights reserved. + * secDetector is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * + * Author: hurricane618 + * Create: 2023-09-26 + * Description: secDetector grpc client + */ +#include +#include "comm_api.grpc.pb.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::ClientReader; +using pubsub::Message; +using pubsub::PubSub; +using pubsub::SubscribeRequest; + +class PubSubClient { + public: + PubSubClient(std::shared_ptr channel) + : stub_(PubSub::NewStub(channel)) {} + + void Subscribe(const std::string& topic) { + SubscribeRequest request; + request.set_topic(topic); + + ClientContext context; + + std::unique_ptr> reader( + stub_->Subscribe(&context, request)); + + Message msg; + while (reader->Read(&msg)) { + std::cout << "Received: " << msg.text() << std::endl; + } + + grpc::Status status = reader->Finish(); + if (!status.ok()) { + std::cout << "Subscribe failed: " << status.error_message() << std::endl; + } +} + + private: + std::unique_ptr stub_; +}; + +int main(int argc, char** argv) { + std::string server_address("unix:///var/run/secDetector.sock"); + PubSubClient client(grpc::CreateChannel( + server_address, grpc::InsecureChannelCredentials())); + + client.Subscribe("chkmal"); + + return 0; +} + diff --git a/grpc/comm_server.cc b/grpc/comm_server.cc new file mode 100644 index 0000000..8414e4b --- /dev/null +++ b/grpc/comm_server.cc @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2023 Huawei Technologies Co., Ltd. All rights reserved. + * secDetector is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * + * Author: hurricane618 + * Create: 2023-09-26 + * Description: secDetector grpc server + */ +#include +#include "comm_api.grpc.pb.h" + +using grpc::Server; +using grpc::ServerBuilder; +using grpc::ServerContext; +using grpc::ServerWriter; +using pubsub::Message; +using pubsub::PubSub; +using pubsub::SubscribeRequest; + +class PubSubServiceImpl final : public PubSub::Service { + public: + grpc::Status Subscribe(ServerContext* context, const SubscribeRequest* request, + ServerWriter* writer) override { + // Here you would normally check the topic and write the appropriate messages. + // For simplicity, we're just writing a single message here. + std::string cli_topic = request->topic(); + Message msg; + if (cli_topic == "chkmal") { + msg.set_text("our probe data"); + } else { + msg.set_text("Hello, world!"); + } + writer->Write(msg); + return grpc::Status::OK; + } +}; + +void RunServer() { + std::string server_address("unix:///var/run/secDetector.sock"); + PubSubServiceImpl service; + + ServerBuilder builder; + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + builder.RegisterService(&service); + + std::unique_ptr server(builder.BuildAndStart()); + server->Wait(); +} + +int main(int argc, char** argv) { + RunServer(); + return 0; +} + diff --git a/grpc/protos/comm_api.proto b/grpc/protos/comm_api.proto new file mode 100644 index 0000000..119955c --- /dev/null +++ b/grpc/protos/comm_api.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package pubsub; + +// The message type +message Message { + string text = 1; +} + +// The service definition +service PubSub { + rpc Subscribe (SubscribeRequest) returns (stream Message) {} +} + +message SubscribeRequest { + string topic = 1; +} -- Gitee From 96c0d28bf71a84e252358bb61ee1d26004819f75 Mon Sep 17 00:00:00 2001 From: hurricane618 Date: Thu, 12 Oct 2023 21:52:06 +0800 Subject: [PATCH 2/2] grpc demo update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 更新grpc的demo相关代码 Signed-off-by: hurricane618 --- observer_agent/CMakeLists.txt | 5 +- {grpc => observer_agent/grpc_comm}/Makefile | 0 .../grpc_comm/comm_client.cpp | 79 +++++++++++++++---- .../grpc_comm/comm_server.cpp | 52 +++++++++--- observer_agent/grpc_comm/grpc_api.h | 48 +++++++++++ .../grpc_comm}/protos/comm_api.proto | 21 +++-- 6 files changed, 174 insertions(+), 31 deletions(-) rename {grpc => observer_agent/grpc_comm}/Makefile (100%) rename grpc/comm_client.cc => observer_agent/grpc_comm/comm_client.cpp (36%) rename grpc/comm_server.cc => observer_agent/grpc_comm/comm_server.cpp (52%) create mode 100644 observer_agent/grpc_comm/grpc_api.h rename {grpc => observer_agent/grpc_comm}/protos/comm_api.proto (41%) diff --git a/observer_agent/CMakeLists.txt b/observer_agent/CMakeLists.txt index 2510eb2..c80783c 100644 --- a/observer_agent/CMakeLists.txt +++ b/observer_agent/CMakeLists.txt @@ -2,5 +2,8 @@ cmake_minimum_required(VERSION 3.1) set(CMAKE_CXX_STANDARD 11) project(observer_agent VERSION 1.0 LANGUAGES CXX) +set(GRPC_PATH ${CMAKE_CURRENT_SOURCE_DIR}/grpc_comm) +add_custom_target(grpc_demo ALL + COMMAND make -C ${GRPC_PATH}) add_executable(secDetectord service/main.cpp service/ringbuffer.cpp) -target_include_directories(secDetectord PUBLIC service) \ No newline at end of file +target_include_directories(secDetectord PUBLIC service) diff --git a/grpc/Makefile b/observer_agent/grpc_comm/Makefile similarity index 100% rename from grpc/Makefile rename to observer_agent/grpc_comm/Makefile diff --git a/grpc/comm_client.cc b/observer_agent/grpc_comm/comm_client.cpp similarity index 36% rename from grpc/comm_client.cc rename to observer_agent/grpc_comm/comm_client.cpp index 285234f..28d0589 100644 --- a/grpc/comm_client.cc +++ b/observer_agent/grpc_comm/comm_client.cpp @@ -19,37 +19,76 @@ using grpc::Channel; using grpc::ClientContext; using grpc::ClientReader; -using pubsub::Message; -using pubsub::PubSub; -using pubsub::SubscribeRequest; +using data_comm::Message; +using data_comm::SubManager; +using data_comm::SubscribeRequest; +using data_comm::UnSubscribeRequest; +using data_comm::PublishRequest; + +#define BUF_NUM 1024 class PubSubClient { public: PubSubClient(std::shared_ptr channel) - : stub_(PubSub::NewStub(channel)) {} + : stub_(SubManager::NewStub(channel)) {} - void Subscribe(const std::string& topic) { + std::unique_ptr> Subscribe(const int topic) { SubscribeRequest request; request.set_topic(topic); - ClientContext context; + //ClientContext context; - std::unique_ptr> reader( - stub_->Subscribe(&context, request)); + // fork / copy_thread to read date + //std::unique_ptr> reader( + // stub_->Subscribe(&context, request)); Message msg; - while (reader->Read(&msg)) { - std::cout << "Received: " << msg.text() << std::endl; + SubFlag = true; + + return stub_->Subscribe(&context, request); + } + + void Publish(const int topic, const std::string& content) { + PublishRequest request; + request.set_topic(topic); + request.set_data(content); + ClientContext pub_context; + Message msg; + grpc::Status status = stub_->Publish(&pub_context, request, &msg); + if (!status.ok()) { + std::cerr << "Error: " << status.error_code() << ": " << status.error_message() << std::endl; } + } + + void UnSubscribe(const int topic) { + UnSubscribeRequest request; + request.set_topic(topic); + + //ClientContext context; + Message msg; + grpc::Status status = stub_->UnSubscribe(&context, request, &msg); + SubFlag = false; - grpc::Status status = reader->Finish(); if (!status.ok()) { - std::cout << "Subscribe failed: " << status.error_message() << std::endl; + std::cerr << "Error: " << status.error_code() << ": " << status.error_message() << std::endl; } -} + + std::cout << "Received: " << msg.text() << std::endl; + + return; + } + + std::string ReadFrom(std::unique_ptr> &reader) { + Message msg; + reader->Read(&msg); + std::cout << "Received: " << msg.text() << std::endl; + return msg.text(); + } private: - std::unique_ptr stub_; + std::unique_ptr stub_; + bool SubFlag; + ClientContext context; }; int main(int argc, char** argv) { @@ -57,7 +96,17 @@ int main(int argc, char** argv) { PubSubClient client(grpc::CreateChannel( server_address, grpc::InsecureChannelCredentials())); - client.Subscribe("chkmal"); + std::unique_ptr> cli_reader = client.Subscribe(1); + std::string some_data = client.ReadFrom(cli_reader); + std::cout << "whz: " << some_data << std::endl; + while (some_data != "end") { + some_data = client.ReadFrom(cli_reader); + std::cout << "loop whz: " << some_data << std::endl; + } + // client.Publish(1, "ahahahah"); + // client.Publish(1, "end"); + //sleep(5); + //client.UnSubscribe(1); return 0; } diff --git a/grpc/comm_server.cc b/observer_agent/grpc_comm/comm_server.cpp similarity index 52% rename from grpc/comm_server.cc rename to observer_agent/grpc_comm/comm_server.cpp index 8414e4b..10aad3e 100644 --- a/grpc/comm_server.cc +++ b/observer_agent/grpc_comm/comm_server.cpp @@ -20,26 +20,58 @@ using grpc::Server; using grpc::ServerBuilder; using grpc::ServerContext; using grpc::ServerWriter; -using pubsub::Message; -using pubsub::PubSub; -using pubsub::SubscribeRequest; +using data_comm::Message; +using data_comm::SubManager; +using data_comm::SubscribeRequest; +using data_comm::UnSubscribeRequest; +using data_comm::PublishRequest; -class PubSubServiceImpl final : public PubSub::Service { +class PubSubServiceImpl final : public SubManager::Service { public: grpc::Status Subscribe(ServerContext* context, const SubscribeRequest* request, ServerWriter* writer) override { // Here you would normally check the topic and write the appropriate messages. // For simplicity, we're just writing a single message here. - std::string cli_topic = request->topic(); + int cli_topic = request->topic(); + // ToDo: somebody topic + subscribers_[cli_topic].push_back(writer); + + // ToDo: add extra check or feature code Message msg; - if (cli_topic == "chkmal") { - msg.set_text("our probe data"); - } else { - msg.set_text("Hello, world!"); - } + msg.set_text("topic: " + std::to_string(cli_topic) + " Subscribe success!"); writer->Write(msg); + + // ToDo: set some condition to break loop + while (1) {} + return grpc::Status::OK; + } + + grpc::Status Publish(ServerContext* context, const PublishRequest* request, + Message* response) override { + int cli_topic = request->topic(); + std::string cli_data = request->data(); + + if (subscribers_.find(cli_topic) != subscribers_.end()) { + for (auto& subscriber : subscribers_[cli_topic]) { + Message msg; + msg.set_text(cli_data); + subscriber->Write(msg); + } + } + return grpc::Status::OK; + } + + grpc::Status UnSubscribe(ServerContext* context, const UnSubscribeRequest* request, + Message* response) override { + int cli_topic = request->topic(); + //subscribers_[topic].pop_front(writer); + + // ToDo: add extra check or feature code + response->set_text("topic: " + std::to_string(cli_topic) + " UnSubscribe success!"); return grpc::Status::OK; } + private: + std::unordered_map*>> subscribers_; }; void RunServer() { diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h new file mode 100644 index 0000000..dfffebf --- /dev/null +++ b/observer_agent/grpc_comm/grpc_api.h @@ -0,0 +1,48 @@ +#ifndef SECDETECTOR_OBSERVER_AGENT_GRPC_API_H +#define SECDETECTOR_OBSERVER_AGENT_GRPC_API_H + +#include +#include "comm_api.grpc.pb.h" + +using grpc::Server; +using grpc::ServerBuilder; +using grpc::ServerContext; +using grpc::ServerWriter; +using grpc::Channel; +using grpc::ClientContext; +using grpc::ClientReader; +using data_comm::Message; +using data_comm::SubManager; +using data_comm::SubscribeRequest; +using data_comm::UnSubscribeRequest; +using data_comm::PublishRequest; + +class PubSubServiceImpl final : public SubManager::Service { + public: + grpc::Status Subscribe(ServerContext* context, const SubscribeRequest* request, + ServerWriter* writer); + grpc::Status Publish(ServerContext* context, const PublishRequest* request, + Message* response); + grpc::Status UnSubscribe(ServerContext* context, const UnSubscribeRequest* request, + Message* response); + private: + std::unordered_map*>> subscribers_; +}; + +void RunServer(); + +class PubSubClient { + public: + PubSubClient(std::shared_ptr channel); + std::unique_ptr> Subscribe(const int topic); + void Publish(const int topic, const std::string& content); + void UnSubscribe(const int topic); + std::string ReadFrom(std::unique_ptr> &reader); + + private: + std::unique_ptr stub_; + bool SubFlag; + ClientContext context; +}; + +#endif diff --git a/grpc/protos/comm_api.proto b/observer_agent/grpc_comm/protos/comm_api.proto similarity index 41% rename from grpc/protos/comm_api.proto rename to observer_agent/grpc_comm/protos/comm_api.proto index 119955c..37e870b 100644 --- a/grpc/protos/comm_api.proto +++ b/observer_agent/grpc_comm/protos/comm_api.proto @@ -1,17 +1,28 @@ syntax = "proto3"; -package pubsub; +package data_comm; // The message type message Message { string text = 1; } +message SubscribeRequest { + int32 topic = 1; +} + +message UnSubscribeRequest { + int32 topic = 1; +} + +message PublishRequest { + int32 topic = 1; + string data = 2; +} // The service definition -service PubSub { +service SubManager { rpc Subscribe (SubscribeRequest) returns (stream Message) {} + rpc UnSubscribe (UnSubscribeRequest) returns (Message) {} + rpc Publish (PublishRequest) returns (Message) {} } -message SubscribeRequest { - string topic = 1; -} -- Gitee