diff --git a/observer_agent/CMakeLists.txt b/observer_agent/CMakeLists.txt index 2510eb28358bd2138466c95541a8f84596966a7a..c80783c77c37a114306dae9e7044175d195d48b6 100644 --- a/observer_agent/CMakeLists.txt +++ b/observer_agent/CMakeLists.txt @@ -2,5 +2,8 @@ cmake_minimum_required(VERSION 3.1) set(CMAKE_CXX_STANDARD 11) project(observer_agent VERSION 1.0 LANGUAGES CXX) +set(GRPC_PATH ${CMAKE_CURRENT_SOURCE_DIR}/grpc_comm) +add_custom_target(grpc_demo ALL + COMMAND make -C ${GRPC_PATH}) add_executable(secDetectord service/main.cpp service/ringbuffer.cpp) -target_include_directories(secDetectord PUBLIC service) \ No newline at end of file +target_include_directories(secDetectord PUBLIC service) diff --git a/observer_agent/grpc_comm/Makefile b/observer_agent/grpc_comm/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..9c2824079646bb6348ab002c0bf54bd2f04faffb --- /dev/null +++ b/observer_agent/grpc_comm/Makefile @@ -0,0 +1,110 @@ +# +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +HOST_SYSTEM = $(shell uname | cut -f 1 -d_) +SYSTEM ?= $(HOST_SYSTEM) +CXX = g++ +CPPFLAGS += `pkg-config --cflags protobuf grpc` +CXXFLAGS += -std=c++11 +ifeq ($(SYSTEM),Darwin) +LDFLAGS += -L/usr/local/lib `pkg-config --libs protobuf grpc++`\ + -pthread\ + -lgrpc++_reflection\ + -ldl +else +LDFLAGS += -L/usr/local/lib `pkg-config --libs protobuf grpc++`\ + -pthread\ + -Wl,--no-as-needed -lgrpc++_reflection -Wl,--as-needed\ + -ldl +endif +PROTOC = protoc +GRPC_CPP_PLUGIN = grpc_cpp_plugin +GRPC_CPP_PLUGIN_PATH ?= `which $(GRPC_CPP_PLUGIN)` + +PROTOS_PATH = ./protos + +vpath %.proto $(PROTOS_PATH) + +all: system-check comm_client comm_server + +comm_client: comm_api.pb.o comm_api.grpc.pb.o comm_client.o + $(CXX) $^ $(LDFLAGS) -o $@ + +comm_server: comm_api.pb.o comm_api.grpc.pb.o comm_server.o + $(CXX) $^ $(LDFLAGS) -o $@ + +%.grpc.pb.cc: %.proto + $(PROTOC) -I $(PROTOS_PATH) --grpc_out=. --plugin=protoc-gen-grpc=$(GRPC_CPP_PLUGIN_PATH) $< + +%.pb.cc: %.proto + $(PROTOC) -I $(PROTOS_PATH) --cpp_out=. $< + +clean: + rm -f *.o *.pb.cc *.pb.h comm_client comm_server + + +# The following is to test your system and ensure a smoother experience. +# They are by no means necessary to actually compile a grpc-enabled software. + +PROTOC_CMD = which $(PROTOC) +PROTOC_CHECK_CMD = $(PROTOC) --version | grep -q libprotoc.3 +PLUGIN_CHECK_CMD = which $(GRPC_CPP_PLUGIN) +HAS_PROTOC = $(shell $(PROTOC_CMD) > /dev/null && echo true || echo false) +ifeq ($(HAS_PROTOC),true) +HAS_VALID_PROTOC = $(shell $(PROTOC_CHECK_CMD) 2> /dev/null && echo true || echo false) +endif +HAS_PLUGIN = $(shell $(PLUGIN_CHECK_CMD) > /dev/null && echo true || echo false) + +SYSTEM_OK = false +ifeq ($(HAS_VALID_PROTOC),true) +ifeq ($(HAS_PLUGIN),true) +SYSTEM_OK = true +endif +endif + +system-check: +ifneq ($(HAS_VALID_PROTOC),true) + @echo " DEPENDENCY ERROR" + @echo + @echo "You don't have protoc 3.0.0 installed in your path." + @echo "Please install Google protocol buffers 3.0.0 and its compiler." + @echo "You can find it here:" + @echo + @echo " https://github.com/protocolbuffers/protobuf/releases/tag/v3.0.0" + @echo + @echo "Here is what I get when trying to evaluate your version of protoc:" + @echo + -$(PROTOC) --version + @echo + @echo +endif +ifneq ($(HAS_PLUGIN),true) + @echo " DEPENDENCY ERROR" + @echo + @echo "You don't have the grpc c++ protobuf plugin installed in your path." + @echo "Please install grpc. You can find it here:" + @echo + @echo " https://github.com/grpc/grpc" + @echo + @echo "Here is what I get when trying to detect if you have the plugin:" + @echo + -which $(GRPC_CPP_PLUGIN) + @echo + @echo +endif +ifneq ($(SYSTEM_OK),true) + @false +endif diff --git a/observer_agent/grpc_comm/comm_client.cpp b/observer_agent/grpc_comm/comm_client.cpp new file mode 100644 index 0000000000000000000000000000000000000000..28d05899131128a2508b63244e0ffeaa9807a184 --- /dev/null +++ b/observer_agent/grpc_comm/comm_client.cpp @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2023 Huawei Technologies Co., Ltd. All rights reserved. + * secDetector is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * + * Author: hurricane618 + * Create: 2023-09-26 + * Description: secDetector grpc client + */ +#include +#include "comm_api.grpc.pb.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::ClientReader; +using data_comm::Message; +using data_comm::SubManager; +using data_comm::SubscribeRequest; +using data_comm::UnSubscribeRequest; +using data_comm::PublishRequest; + +#define BUF_NUM 1024 + +class PubSubClient { + public: + PubSubClient(std::shared_ptr channel) + : stub_(SubManager::NewStub(channel)) {} + + std::unique_ptr> Subscribe(const int topic) { + SubscribeRequest request; + request.set_topic(topic); + + //ClientContext context; + + // fork / copy_thread to read date + //std::unique_ptr> reader( + // stub_->Subscribe(&context, request)); + + Message msg; + SubFlag = true; + + return stub_->Subscribe(&context, request); + } + + void Publish(const int topic, const std::string& content) { + PublishRequest request; + request.set_topic(topic); + request.set_data(content); + ClientContext pub_context; + Message msg; + grpc::Status status = stub_->Publish(&pub_context, request, &msg); + if (!status.ok()) { + std::cerr << "Error: " << status.error_code() << ": " << status.error_message() << std::endl; + } + } + + void UnSubscribe(const int topic) { + UnSubscribeRequest request; + request.set_topic(topic); + + //ClientContext context; + Message msg; + grpc::Status status = stub_->UnSubscribe(&context, request, &msg); + SubFlag = false; + + if (!status.ok()) { + std::cerr << "Error: " << status.error_code() << ": " << status.error_message() << std::endl; + } + + std::cout << "Received: " << msg.text() << std::endl; + + return; + } + + std::string ReadFrom(std::unique_ptr> &reader) { + Message msg; + reader->Read(&msg); + std::cout << "Received: " << msg.text() << std::endl; + return msg.text(); + } + + private: + std::unique_ptr stub_; + bool SubFlag; + ClientContext context; +}; + +int main(int argc, char** argv) { + std::string server_address("unix:///var/run/secDetector.sock"); + PubSubClient client(grpc::CreateChannel( + server_address, grpc::InsecureChannelCredentials())); + + std::unique_ptr> cli_reader = client.Subscribe(1); + std::string some_data = client.ReadFrom(cli_reader); + std::cout << "whz: " << some_data << std::endl; + while (some_data != "end") { + some_data = client.ReadFrom(cli_reader); + std::cout << "loop whz: " << some_data << std::endl; + } + // client.Publish(1, "ahahahah"); + // client.Publish(1, "end"); + //sleep(5); + //client.UnSubscribe(1); + + return 0; +} + diff --git a/observer_agent/grpc_comm/comm_server.cpp b/observer_agent/grpc_comm/comm_server.cpp new file mode 100644 index 0000000000000000000000000000000000000000..10aad3ebc24f9415804e4beb27d5a48bf3af4fb1 --- /dev/null +++ b/observer_agent/grpc_comm/comm_server.cpp @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2023 Huawei Technologies Co., Ltd. All rights reserved. + * secDetector is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * + * Author: hurricane618 + * Create: 2023-09-26 + * Description: secDetector grpc server + */ +#include +#include "comm_api.grpc.pb.h" + +using grpc::Server; +using grpc::ServerBuilder; +using grpc::ServerContext; +using grpc::ServerWriter; +using data_comm::Message; +using data_comm::SubManager; +using data_comm::SubscribeRequest; +using data_comm::UnSubscribeRequest; +using data_comm::PublishRequest; + +class PubSubServiceImpl final : public SubManager::Service { + public: + grpc::Status Subscribe(ServerContext* context, const SubscribeRequest* request, + ServerWriter* writer) override { + // Here you would normally check the topic and write the appropriate messages. + // For simplicity, we're just writing a single message here. + int cli_topic = request->topic(); + // ToDo: somebody topic + subscribers_[cli_topic].push_back(writer); + + // ToDo: add extra check or feature code + Message msg; + msg.set_text("topic: " + std::to_string(cli_topic) + " Subscribe success!"); + writer->Write(msg); + + // ToDo: set some condition to break loop + while (1) {} + return grpc::Status::OK; + } + + grpc::Status Publish(ServerContext* context, const PublishRequest* request, + Message* response) override { + int cli_topic = request->topic(); + std::string cli_data = request->data(); + + if (subscribers_.find(cli_topic) != subscribers_.end()) { + for (auto& subscriber : subscribers_[cli_topic]) { + Message msg; + msg.set_text(cli_data); + subscriber->Write(msg); + } + } + return grpc::Status::OK; + } + + grpc::Status UnSubscribe(ServerContext* context, const UnSubscribeRequest* request, + Message* response) override { + int cli_topic = request->topic(); + //subscribers_[topic].pop_front(writer); + + // ToDo: add extra check or feature code + response->set_text("topic: " + std::to_string(cli_topic) + " UnSubscribe success!"); + return grpc::Status::OK; + } + private: + std::unordered_map*>> subscribers_; +}; + +void RunServer() { + std::string server_address("unix:///var/run/secDetector.sock"); + PubSubServiceImpl service; + + ServerBuilder builder; + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + builder.RegisterService(&service); + + std::unique_ptr server(builder.BuildAndStart()); + server->Wait(); +} + +int main(int argc, char** argv) { + RunServer(); + return 0; +} + diff --git a/observer_agent/grpc_comm/grpc_api.h b/observer_agent/grpc_comm/grpc_api.h new file mode 100644 index 0000000000000000000000000000000000000000..dfffebfc3e97b5d62cbd5da40724d5d89e4ba9c7 --- /dev/null +++ b/observer_agent/grpc_comm/grpc_api.h @@ -0,0 +1,48 @@ +#ifndef SECDETECTOR_OBSERVER_AGENT_GRPC_API_H +#define SECDETECTOR_OBSERVER_AGENT_GRPC_API_H + +#include +#include "comm_api.grpc.pb.h" + +using grpc::Server; +using grpc::ServerBuilder; +using grpc::ServerContext; +using grpc::ServerWriter; +using grpc::Channel; +using grpc::ClientContext; +using grpc::ClientReader; +using data_comm::Message; +using data_comm::SubManager; +using data_comm::SubscribeRequest; +using data_comm::UnSubscribeRequest; +using data_comm::PublishRequest; + +class PubSubServiceImpl final : public SubManager::Service { + public: + grpc::Status Subscribe(ServerContext* context, const SubscribeRequest* request, + ServerWriter* writer); + grpc::Status Publish(ServerContext* context, const PublishRequest* request, + Message* response); + grpc::Status UnSubscribe(ServerContext* context, const UnSubscribeRequest* request, + Message* response); + private: + std::unordered_map*>> subscribers_; +}; + +void RunServer(); + +class PubSubClient { + public: + PubSubClient(std::shared_ptr channel); + std::unique_ptr> Subscribe(const int topic); + void Publish(const int topic, const std::string& content); + void UnSubscribe(const int topic); + std::string ReadFrom(std::unique_ptr> &reader); + + private: + std::unique_ptr stub_; + bool SubFlag; + ClientContext context; +}; + +#endif diff --git a/observer_agent/grpc_comm/protos/comm_api.proto b/observer_agent/grpc_comm/protos/comm_api.proto new file mode 100644 index 0000000000000000000000000000000000000000..37e870ba85395d8003b925a58d5ab07d6330d17a --- /dev/null +++ b/observer_agent/grpc_comm/protos/comm_api.proto @@ -0,0 +1,28 @@ +syntax = "proto3"; + +package data_comm; + +// The message type +message Message { + string text = 1; +} + +message SubscribeRequest { + int32 topic = 1; +} + +message UnSubscribeRequest { + int32 topic = 1; +} + +message PublishRequest { + int32 topic = 1; + string data = 2; +} +// The service definition +service SubManager { + rpc Subscribe (SubscribeRequest) returns (stream Message) {} + rpc UnSubscribe (UnSubscribeRequest) returns (Message) {} + rpc Publish (PublishRequest) returns (Message) {} +} +