diff --git a/frameworks/native/include/seq_packet_socket_server.h b/frameworks/native/include/seq_packet_socket_server.h index c214d6a1243bc2c9c9d7be3ee0c506874eab268a..57fb9a5e122d5199e07c5e8114db4baf76b6825e 100644 --- a/frameworks/native/include/seq_packet_socket_server.h +++ b/frameworks/native/include/seq_packet_socket_server.h @@ -18,18 +18,21 @@ #include "socket_server.h" +#include + namespace OHOS { namespace HiviewDFX { -typedef int (*AcceptingHandler)(std::unique_ptr); class SeqPacketSocketServer : public SocketServer { public: + using AcceptingHandler = std::function)>; + SeqPacketSocketServer(const std::string& serverPath, unsigned int maxListenNumber) : SocketServer(serverPath, SOCK_SEQPACKET), maxListenNumber(maxListenNumber) {} ~SeqPacketSocketServer() = default; - int AcceptConnection(AcceptingHandler func); + int StartAcceptingConnection(AcceptingHandler onAccepted); private: unsigned int maxListenNumber; - int AcceptingThread(AcceptingHandler func); + int AcceptingLoop(AcceptingHandler func); }; } // namespace HiviewDFX } // namespace OHOS diff --git a/frameworks/native/include/socket_server.h b/frameworks/native/include/socket_server.h index 2b98a34fbe64fa676d1c13dad59c2dfa3d65968c..f668198c8637949150a8af5e3173385429d07f4f 100644 --- a/frameworks/native/include/socket_server.h +++ b/frameworks/native/include/socket_server.h @@ -21,6 +21,7 @@ #include #include #include +#include #include "hilog_common.h" #include "socket_client.h" @@ -35,6 +36,7 @@ public: int Recv(void *buffer, unsigned int bufferLen, int flags = MSG_PEEK); int RecvMsg(struct msghdr *hdr, int flags = 0); int Listen(unsigned int backlog); + int Poll(short inEvent, short& outEvent, const std::chrono::milliseconds& timeout); int Accept(); private: int socketHandler; diff --git a/frameworks/native/seq_packet_socket_server.cpp b/frameworks/native/seq_packet_socket_server.cpp index c8665b22dbde125a79be1c7beeb0bc68a748523e..8b2ae34bf87a6600314c2fe561e0f337c8fc7bad 100644 --- a/frameworks/native/seq_packet_socket_server.cpp +++ b/frameworks/native/seq_packet_socket_server.cpp @@ -15,36 +15,41 @@ #include "seq_packet_socket_server.h" -#include +#include #include +#include namespace OHOS { namespace HiviewDFX { -int SeqPacketSocketServer::AcceptConnection(AcceptingHandler func) +int SeqPacketSocketServer::StartAcceptingConnection(AcceptingHandler onAccepted) { - int ret = Listen(maxListenNumber); - if (ret < 0) { + int listeningStatus = Listen(maxListenNumber); + if (listeningStatus < 0) { #ifdef DEBUG - std::cout << "Socket listen failed: " << ret << std::endl; + std::cerr << "Socket listen failed: " << listeningStatus << "\n"; + std::cerr << strerror(listeningStatus) << "\n"; #endif - return ret; + return listeningStatus; } - AcceptingThread(func); - - return ret; + return AcceptingLoop(onAccepted); } -int SeqPacketSocketServer::AcceptingThread(AcceptingHandler func) +int SeqPacketSocketServer::AcceptingLoop(AcceptingHandler func) { - int ret = 0; - while ((ret = Accept()) > 0) { + int acceptedSockedFd = 0; + while ((acceptedSockedFd = Accept()) > 0) { std::unique_ptr handler = std::make_unique(SOCK_SEQPACKET); - handler->setHandler(ret); + handler->setHandler(acceptedSockedFd); func(std::move(handler)); } + int acceptError = errno; +#ifdef DEBUG + std::cerr << "Socket accept failed: " << acceptError << "\n"; + std::cerr < #include #include +#include #include #include "socket_server_adapter.h" @@ -89,6 +90,14 @@ int SocketServer::Listen(unsigned int backlog) return listen(socketHandler, backlog); } +int SocketServer::Poll(short inEvent, short& outEvent, const std::chrono::milliseconds& timeout) +{ + pollfd info {socketHandler, inEvent, outEvent}; + int result = poll(&info, 1, timeout.count()); + outEvent = info.revents; + return result; +} + int SocketServer::Accept() { socklen_t addressSize = sizeof(serverAddr); diff --git a/services/hilogd/cmd_executor.cpp b/services/hilogd/cmd_executor.cpp index 86946fa825b9f5604506e370fa4c6548d90e6e61..946cec3d637eb281562122a785a13cd40e464a71 100644 --- a/services/hilogd/cmd_executor.cpp +++ b/services/hilogd/cmd_executor.cpp @@ -14,59 +14,148 @@ */ #include "cmd_executor.h" #include "log_querier.h" -#include "seq_packet_socket_server.h" +#include + +#include +#include +#include #include #include -#include +#include #include -#include + +#include #include +#include +#include namespace OHOS { namespace HiviewDFX { -const int MAX_WRITE_LOG_TASK = 100; +static const int MAX_CLIENT_CONNECTIONS = 100; -using namespace std; -HilogBuffer* CmdExecutor::hilogBuffer = nullptr; +CmdExecutor::CmdExecutor(HilogBuffer* buffer) +{ + m_hilogBuffer = buffer; +} -void LogQuerierMonitor(std::unique_ptr handler) +CmdExecutor::~CmdExecutor() { - prctl(PR_SET_NAME, "hilogd.query"); - std::shared_ptr logQuerier = std::make_shared(std::move(handler), - CmdExecutor::getHilogBuffer()); - logQuerier->LogQuerierThreadFunc(logQuerier); + std::lock_guard lg(m_clientAccess); + for (auto& client : m_clients) { + client->m_stopThread.store(true); + } + for (auto& client : m_clients) { + if (client->m_clientThread.joinable()) { + client->m_clientThread.join(); + } + } } -int CmdExecutorThreadFunc(std::unique_ptr handler) +void CmdExecutor::MainLoop() { - std::thread logQuerierMonitorThread(LogQuerierMonitor, std::move(handler)); - logQuerierMonitorThread.detach(); - return 0; + SeqPacketSocketServer cmdServer(CONTROL_SOCKET_NAME, MAX_CLIENT_CONNECTIONS); + if (cmdServer.Init() < 0) { + std::cerr << "Failed to init control socket ! \n"; + return; + } + std::cout << "Begin to cmd accept !\n"; + int listeningStatus = cmdServer.Listen(MAX_CLIENT_CONNECTIONS); + if (listeningStatus < 0) { + std::cerr << "Socket listen failed: " << listeningStatus << "\n"; + std::cerr << strerror(listeningStatus) << "\n"; + return; + } + std::cout << "Server started to listen !\n"; + + using namespace std::chrono_literals; + for (;;) { + const auto maxtime = 3000ms; + short outEvent = 0; + auto pollResult = cmdServer.Poll(POLLIN, outEvent, maxtime); + if (pollResult == 0) { // poll == 0 means timeout + CleanFinishedClients(); + continue; + } else if (pollResult < 0) { + int pollError = errno; + std::cerr << "Socket polling error: " << pollError << "\n"; + std::cerr << strerror(pollError) << "\n"; + break; + } else if (pollResult != 1 || outEvent != POLLIN) { + std::cerr << "Wrong poll result data." + " Result: " << pollResult << + " OutEvent: " << outEvent << "\n"; + break; + } + + int acceptResult = cmdServer.Accept(); + if (acceptResult > 0) { + int acceptedSockedFd = acceptResult; + std::unique_ptr handler = std::make_unique(SOCK_SEQPACKET); + handler->setHandler(acceptedSockedFd); + OnAcceptedConnection(std::move(handler)); + } else { + int acceptError = errno; + std::cerr << "Socket accept failed: " << acceptError << "\n"; + std::cerr << strerror(acceptError) << "\n"; + break; + } + } } -CmdExecutor::CmdExecutor(HilogBuffer* buffer) +void CmdExecutor::OnAcceptedConnection(std::unique_ptr handler) { - hilogBuffer = buffer; + std::lock_guard lg(m_clientAccess); + auto newVal = std::make_unique(); + newVal->m_stopThread.store(false); + newVal->m_clientThread = std::thread(&CmdExecutor::ClientEventLoop, this, std::move(handler)); + m_clients.push_back(std::move(newVal)); } -void CmdExecutor::StartCmdExecutorThread() +void CmdExecutor::ClientEventLoop(std::unique_ptr handler) { - SeqPacketSocketServer cmdExecutorMainSocket(CONTROL_SOCKET_NAME, MAX_WRITE_LOG_TASK); - if (cmdExecutorMainSocket.Init() < 0) { - cout << "Failed to init control socket ! \n"; - } else { - if (chmod(CONTROL_SOCKET, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH) < 0) { - cout << "chmod control socket failed !\n"; - } - cout << "Begin to cmd accept !\n"; - cmdExecutorMainSocket.AcceptConnection(CmdExecutorThreadFunc); + decltype(m_clients)::iterator clientInfoIt; + { + std::lock_guard lg(m_clientAccess); + clientInfoIt = std::find_if(m_clients.begin(), m_clients.end(), + [](const std::unique_ptr& ct) { + return ct->m_clientThread.get_id() == std::this_thread::get_id(); + }); } + assert(clientInfoIt != m_clients.end()); + + prctl(PR_SET_NAME, "hilogd.query"); + auto logQuerier = std::make_shared(std::move(handler), m_hilogBuffer); + logQuerier->LogQuerierThreadFunc(logQuerier); + + std::lock_guard ul(m_finishedClientAccess); + m_finishedClients.push_back(std::this_thread::get_id()); } -HilogBuffer* CmdExecutor::getHilogBuffer() +void CmdExecutor::CleanFinishedClients() { - return hilogBuffer; + std::list threadsToJoin; + { + // select clients to clean up - pick threads that we have to be sure are ended + std::scoped_lock sl(m_finishedClientAccess, m_clientAccess); + for (auto threadId : m_finishedClients) { + auto clientInfoIt = std::find_if(m_clients.begin(), m_clients.end(), + [&threadId](const std::unique_ptr& ct) { + return ct->m_clientThread.get_id() == threadId; + }); + if (clientInfoIt != m_clients.end()) { + threadsToJoin.push_back(std::move((*clientInfoIt)->m_clientThread)); + m_clients.erase(clientInfoIt); + } + } + m_finishedClients.clear(); + } + for (auto& thread : threadsToJoin) { + if (thread.joinable()) { + thread.join(); + } + } } + } // namespace HiviewDFX } // namespace OHOS diff --git a/services/hilogd/include/cmd_executor.h b/services/hilogd/include/cmd_executor.h index 8a3573aeacbc2849918166e4bfe60cde6d6d5678..fc67ee378c79f5efd9ca3f5a1e404a5d5ead8ab2 100644 --- a/services/hilogd/include/cmd_executor.h +++ b/services/hilogd/include/cmd_executor.h @@ -17,21 +17,34 @@ #include #include +#include +#include +#include #include "log_buffer.h" -#include "log_querier.h" -#include "log_persister.h" -#include "seq_packet_socket_server.h" + namespace OHOS { namespace HiviewDFX { +struct ClientThread { + std::thread m_clientThread; + std::atomic m_stopThread; +}; + class CmdExecutor { public: CmdExecutor(HilogBuffer* buffer); - void StartCmdExecutorThread(); - static HilogBuffer* getHilogBuffer(); - ~CmdExecutor() = default; + ~CmdExecutor(); + void MainLoop(); private: - static HilogBuffer* hilogBuffer; + void OnAcceptedConnection(std::unique_ptr handler); + void ClientEventLoop(std::unique_ptr handler); + void CleanFinishedClients(); + + HilogBuffer* m_hilogBuffer = nullptr; + std::list> m_clients; + std::mutex m_clientAccess; + std::vector m_finishedClients; + std::mutex m_finishedClientAccess; }; } // namespace HiviewDFX } // namespace OHOS diff --git a/services/hilogd/main.cpp b/services/hilogd/main.cpp index e95b4dda84a3620b25c06b2123e4245ba14900ef..4c1e7cfd2555b6357404848cf21f7e0242d33d5e 100644 --- a/services/hilogd/main.cpp +++ b/services/hilogd/main.cpp @@ -105,7 +105,7 @@ int HilogdEntry(int argc, char* argv[]) startupCheckThread.detach(); CmdExecutor cmdExecutor(&hilogBuffer); - cmdExecutor.StartCmdExecutorThread(); + cmdExecutor.MainLoop(); return 0; }