diff --git a/README.md b/README.md index b94da2a58700b753c7b2264ba6c8ec4a86e4d928..1e457ee9d4ddd50cfc535369e52d6bf2f8bf444c 100644 --- a/README.md +++ b/README.md @@ -15,8 +15,8 @@ apt -y install g++ make libgtest libevent-dev libev-dev libgtest-dev - base,基础库,含日志打印、常用工具等; - util,工具库,在业务代码中可能会用到的库; -- event,事件库,对 libevent2, libev 库进行了统一的封装,实现Fd,Timer,Signal三种事件驱动; -- eventx,事件扩展库,含 ThreadPool 线程池模块,专用于处理阻塞性事务; +- event,事件库,实现Fd,Timer,Signal三种事件驱动; +- eventx,事件扩展库,含 ThreadPool 线程池模块,专用于处理阻塞性事务;TimerPool 定时器池模块; - network,网络库,实现了串口、终端、UDP、TCP 通信模块; - coroutine,协程库,众所周知,异步框架不方便处理顺序性业务,协程弥补之; - mqtt,MQTT客户端库; @@ -29,10 +29,10 @@ apt -y install g++ make libgtest libevent-dev libev-dev libgtest-dev - base --> None - util --> base - event --> base, [libevent\_core, libev] -- eventx --> base, event, [pthread] +- eventx --> base, event, - network --> base, event - coroutine --> base, event -- mqtt --> base, event, [mosquitto] +- mqtt --> base, event, - terminal --> base, util, event, network - main --> base, util, event, eventx - sample --> main @@ -40,7 +40,6 @@ apt -y install g++ make libgtest libevent-dev libev-dev libgtest-dev #### 未来规化 - 创建 http 模块,实现 Http 相关的 Server 与 Client 端; -- 在 event 中支持 buildin 事件驱动(进行中 feature-epoll); - 在 network 中支持 TLS; - 实现异步日志输出模块; - 实现使用 CMake 进行工程管理; diff --git a/event/Makefile b/event/Makefile index 0eced0e928f39ce50ea357037af86ec2d007cce7..9bc22659cc626cb28d4a3dfffa66f7936653ddd1 100644 --- a/event/Makefile +++ b/event/Makefile @@ -1,7 +1,6 @@ include config.mk LIB_NAME = event -LIB_NAME_EXT = - LIB_VERSION_X = 1 LIB_VERSION_Y = 1 LIB_VERSION_Z = 0 @@ -12,7 +11,12 @@ HEAD_FILES = \ stat.h version.h CPP_SRC_FILES = \ - loop.cpp common_loop.cpp \ + loop.cpp \ + common_loop.cpp \ + common_loop_signal.cpp \ + common_loop_run.cpp \ + signal_event_impl.cpp \ + misc.cpp \ version.cpp ifeq ($(ENABLE_LIBEVENT), yes) @@ -20,30 +24,24 @@ CPP_SRC_FILES += \ engins/libevent/loop.cpp \ engins/libevent/fd_event.cpp \ engins/libevent/timer_event.cpp \ - engins/libevent/signal_event.cpp \ engins/libevent/common.cpp CXXFLAGS += -DENABLE_LIBEVENT=1 -LIB_NAME_EXT := $(LIB_NAME_EXT)n endif ifeq ($(ENABLE_LIBEV), yes) CPP_SRC_FILES += \ engins/libev/loop.cpp \ engins/libev/fd_event.cpp \ - engins/libev/signal_event.cpp \ engins/libev/timer_event.cpp CXXFLAGS += -DENABLE_LIBEV=1 -LIB_NAME_EXT := $(LIB_NAME_EXT)e endif ifeq ($(ENABLE_EPOLL), yes) CPP_SRC_FILES += \ engins/epoll/loop.cpp \ engins/epoll/fd_event.cpp \ - engins/epoll/signal_event.cpp \ engins/epoll/timer_event.cpp CXXFLAGS += -DENABLE_EPOLL=1 -LIB_NAME_EXT := endif TEST_CPP_SRC_FILES = \ diff --git a/event/common_loop.cpp b/event/common_loop.cpp index 99324b372e5397b85a8010fc4c860334132102d0..6645025705570d4b324b2e7c52c724e6095445d9 100644 --- a/event/common_loop.cpp +++ b/event/common_loop.cpp @@ -2,33 +2,24 @@ #include #include -#include -#include #include +#include #include +#include #include "fd_event.h" #include "stat.h" +#include "misc.h" namespace tbox { namespace event { using namespace std::chrono; -CommonLoop::CommonLoop() : - has_unhandle_req_(false), - read_fd_(-1), write_fd_(-1), - sp_read_event_(nullptr), - cb_level_(0) -{ } - CommonLoop::~CommonLoop() { assert(cb_level_ == 0); - - std::lock_guard g(lock_); - cleanupDeferredTasks(); } bool CommonLoop::isInLoopThread() @@ -40,19 +31,14 @@ bool CommonLoop::isInLoopThread() bool CommonLoop::isRunning() const { std::lock_guard g(lock_); - return sp_read_event_ != nullptr; + return sp_run_read_event_ != nullptr; } void CommonLoop::runThisBeforeLoop() { - int fds[2] = { 0 }; - if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) != 0) { //!FIXME - LogErr("pip2() fail, ret:%d", errno); + int read_fd = -1, write_fd = -1; + if (!CreateFdPair(read_fd, write_fd)) return; - } - - int read_fd(fds[0]); - int write_fd(fds[1]); FdEvent *sp_read_event = newFdEvent(); if (!sp_read_event->initialize(read_fd, FdEvent::kReadEvent, Event::Mode::kPersist)) { @@ -63,17 +49,17 @@ void CommonLoop::runThisBeforeLoop() } using std::placeholders::_1; - sp_read_event->setCallback(std::bind(&CommonLoop::onGotRunInLoopFunc, this, _1)); + sp_read_event->setCallback(std::bind(&CommonLoop::handleRunInLoopRequest, this, _1)); sp_read_event->enable(); std::lock_guard g(lock_); loop_thread_id_ = std::this_thread::get_id(); - read_fd_ = read_fd; - write_fd_ = write_fd; - sp_read_event_ = sp_read_event; + run_read_fd_ = read_fd; + run_write_fd_ = write_fd; + sp_run_read_event_ = sp_read_event; if (!run_in_loop_func_queue_.empty()) - commitRequest(); + commitRunRequest(); #ifdef ENABLE_STAT resetStat(); @@ -86,46 +72,13 @@ void CommonLoop::runThisAfterLoop() cleanupDeferredTasks(); loop_thread_id_ = std::thread::id(); //! 清空 loop_thread_id_ - if (sp_read_event_ != nullptr) { - delete sp_read_event_; - close(write_fd_); - close(read_fd_); - - sp_read_event_ = nullptr; - write_fd_ = -1; - read_fd_ = -1; + if (sp_run_read_event_ != nullptr) { + CHECK_DELETE_RESET_OBJ(sp_run_read_event_); + CHECK_CLOSE_RESET_FD(run_write_fd_); + CHECK_CLOSE_RESET_FD(run_read_fd_); } } -void CommonLoop::runInLoop(const Func &func) -{ - std::lock_guard g(lock_); - run_in_loop_func_queue_.push_back(func); - - if (sp_read_event_ == nullptr) - return; - - commitRequest(); -} - -void CommonLoop::runNext(const Func &func) -{ - if (!isInLoopThread()) { - LogWarn("Fail, use runInLoop() instead."); - return; - } - - run_next_func_queue_.push_back(func); -} - -void CommonLoop::run(const Func &func) -{ - if (isInLoopThread()) - run_next_func_queue_.push_back(func); - else - runInLoop(func); -} - void CommonLoop::beginEventProcess() { #ifdef ENABLE_STAT @@ -149,94 +102,6 @@ void CommonLoop::endEventProcess() #endif } -void CommonLoop::handleNextFunc() -{ - while (!run_next_func_queue_.empty()) { - Func &func = run_next_func_queue_.front(); - if (func) { - ++cb_level_; - func(); - --cb_level_; - } - run_next_func_queue_.pop_front(); - } -} - -void CommonLoop::onGotRunInLoopFunc(short) -{ - /** - * NOTICE: - * 这里使用 tmp 将 run_in_loop_func_queue_ 中的内容交换出去。然后再从 tmp 逐一取任务出来执行。 - * 其目的在于腾空 run_in_loop_func_queue_,让新 runInLoop() 的任务则会在下一轮循环中执行。 - * 从而防止无限 runInLoop() 引起的死循环,导致其它事件得不到处理。 - * - * 这点与 runNext() 不同 - */ - std::deque tmp; - { - std::lock_guard g(lock_); - run_in_loop_func_queue_.swap(tmp); - finishRequest(); - } - - while (!tmp.empty()) { - Func &func = tmp.front(); - if (func) { - ++cb_level_; - func(); - --cb_level_; - - handleNextFunc(); - } - tmp.pop_front(); - } -} - -//! 清理 run_in_loop_func_queue_ 与 run_next_func_queue_ 中的任务 -void CommonLoop::cleanupDeferredTasks() -{ - int remain_loop_count = 10; //! 限定次数,防止出现 runNext() 递归导致无法退出循环的问题 - while ((!run_in_loop_func_queue_.empty() || !run_next_func_queue_.empty()) - && remain_loop_count-- > 0) { - std::deque tasks = std::move(run_next_func_queue_); - tasks.insert(tasks.end(), run_in_loop_func_queue_.begin(), run_in_loop_func_queue_.end()); - run_in_loop_func_queue_.clear(); - - while (!tasks.empty()) { - Func &func = tasks.front(); - if (func) { - ++cb_level_; - func(); - --cb_level_; - } - tasks.pop_front(); - } - } - - if (remain_loop_count == 0) - LogWarn("found recursive actions, force quit"); -} - -void CommonLoop::commitRequest() -{ - if (!has_unhandle_req_) { - char ch = 0; - ssize_t wsize = write(write_fd_, &ch, 1); - (void)wsize; - - has_unhandle_req_ = true; - } -} - -void CommonLoop::finishRequest() -{ - char ch = 0; - ssize_t rsize = read(read_fd_, &ch, 1); - (void)rsize; - - has_unhandle_req_ = false; -} - void CommonLoop::setStatEnable(bool enable) { #ifdef ENABLE_STAT diff --git a/event/common_loop.h b/event/common_loop.h index b80064fb39933326b99a1ec5035241263e04d633..c29982acbac608b9754a4a83c3b58607bdc6e94f 100644 --- a/event/common_loop.h +++ b/event/common_loop.h @@ -4,8 +4,11 @@ #include #include #include +#include +#include #include "loop.h" +#include "signal_event_impl.h" #ifdef ENABLE_STAT #include @@ -16,7 +19,6 @@ namespace event { class CommonLoop : public Loop { public: - CommonLoop(); ~CommonLoop() override; public: @@ -36,28 +38,30 @@ class CommonLoop : public Loop { void beginEventProcess(); void endEventProcess(); + //! 信号处理相关 + SignalEvent* newSignalEvent() override; + bool subscribeSignal(int signal_num, SignalSubscribuer *who); + bool unsubscribeSignal(int signal_num, SignalSubscribuer *who); + static void HandleSignal(int signo); + void onSignal(); + protected: void runThisBeforeLoop(); void runThisAfterLoop(); - void onGotRunInLoopFunc(short); - + void handleRunInLoopRequest(short); void cleanupDeferredTasks(); - - void commitRequest(); - void finishRequest(); - + void commitRunRequest(); + void finishRunRequest(); void handleNextFunc(); private: mutable std::recursive_mutex lock_; - std::thread::id loop_thread_id_; - - bool has_unhandle_req_ = false; - int read_fd_ = -1, write_fd_ = -1; - FdEvent *sp_read_event_ = nullptr; - + bool has_commit_run_req_ = false; + int run_read_fd_ = -1; + int run_write_fd_ = -1; + FdEvent *sp_run_read_event_ = nullptr; std::deque run_in_loop_func_queue_; std::deque run_next_func_queue_; @@ -70,6 +74,14 @@ class CommonLoop : public Loop { uint32_t max_cost_us_ = 0; #endif //ENABLE_STAT + static std::map> _signal_write_fds_; //! 通知 Loop 的 fd,每个 Loop 注册一个 + static std::mutex _signal_lock_; //! 保护 _signal_write_fds_ 用 + + int signal_read_fd_ = -1; + int signal_write_fd_ = -1; + FdEvent *sp_signal_read_event_ = nullptr; + std::map> all_signals_subscribers_; //! signo -> SignalSubscribuer*,信号的订阅者 + int cb_level_ = 0; }; diff --git a/event/common_loop_run.cpp b/event/common_loop_run.cpp new file mode 100644 index 0000000000000000000000000000000000000000..92d72f21813261b5da20f5a180ec7901ad8b6a55 --- /dev/null +++ b/event/common_loop_run.cpp @@ -0,0 +1,127 @@ +#include "common_loop.h" + +#include +#include + +namespace tbox { +namespace event { + +void CommonLoop::runInLoop(const Func &func) +{ + std::lock_guard g(lock_); + run_in_loop_func_queue_.push_back(func); + + if (sp_run_read_event_ == nullptr) + return; + + commitRunRequest(); +} + +void CommonLoop::runNext(const Func &func) +{ + if (!isInLoopThread()) { + LogWarn("Fail, use runInLoop() instead."); + return; + } + + run_next_func_queue_.push_back(func); +} + +void CommonLoop::run(const Func &func) +{ + if (isInLoopThread()) + run_next_func_queue_.push_back(func); + else + runInLoop(func); +} + +void CommonLoop::handleNextFunc() +{ + while (!run_next_func_queue_.empty()) { + Func &func = run_next_func_queue_.front(); + if (func) { + ++cb_level_; + func(); + --cb_level_; + } + run_next_func_queue_.pop_front(); + } +} + +void CommonLoop::handleRunInLoopRequest(short) +{ + /** + * NOTICE: + * 这里使用 tmp 将 run_in_loop_func_queue_ 中的内容交换出去。然后再从 tmp 逐一取任务出来执行。 + * 其目的在于腾空 run_in_loop_func_queue_,让新 runInLoop() 的任务则会在下一轮循环中执行。 + * 从而防止无限 runInLoop() 引起的死循环,导致其它事件得不到处理。 + * + * 这点与 runNext() 不同 + */ + std::deque tmp; + { + std::lock_guard g(lock_); + run_in_loop_func_queue_.swap(tmp); + finishRunRequest(); + } + + while (!tmp.empty()) { + Func &func = tmp.front(); + if (func) { + ++cb_level_; + func(); + --cb_level_; + + handleNextFunc(); + } + tmp.pop_front(); + } +} + +//! 清理 run_in_loop_func_queue_ 与 run_next_func_queue_ 中的任务 +void CommonLoop::cleanupDeferredTasks() +{ + int remain_loop_count = 10; //! 限定次数,防止出现 runNext() 递归导致无法退出循环的问题 + while ((!run_in_loop_func_queue_.empty() || !run_next_func_queue_.empty()) + && remain_loop_count-- > 0) { + std::deque tasks = std::move(run_next_func_queue_); + tasks.insert(tasks.end(), run_in_loop_func_queue_.begin(), run_in_loop_func_queue_.end()); + run_in_loop_func_queue_.clear(); + + while (!tasks.empty()) { + Func &func = tasks.front(); + if (func) { + ++cb_level_; + func(); + --cb_level_; + } + tasks.pop_front(); + } + } + + if (remain_loop_count == 0) + LogWarn("found recursive actions, force quit"); +} + +void CommonLoop::commitRunRequest() +{ + if (!has_commit_run_req_) { + char ch = 0; + ssize_t wsize = write(run_write_fd_, &ch, 1); + (void)wsize; + + has_commit_run_req_ = true; + } +} + +void CommonLoop::finishRunRequest() +{ + char ch = 0; + ssize_t rsize = read(run_read_fd_, &ch, 1); + (void)rsize; + + has_commit_run_req_ = false; +} + +} +} diff --git a/event/common_loop_signal.cpp b/event/common_loop_signal.cpp new file mode 100644 index 0000000000000000000000000000000000000000..64f01f3beac2d605ff3372c8df15214cd751969c --- /dev/null +++ b/event/common_loop_signal.cpp @@ -0,0 +1,141 @@ +#include "common_loop.h" + +#include +#include +#include +#include + +#include "misc.h" +#include "fd_event.h" + +namespace tbox { +namespace event { + +std::map> CommonLoop::_signal_write_fds_; +std::mutex CommonLoop::_signal_lock_; + +bool CommonLoop::subscribeSignal(int signo, SignalSubscribuer *who) +{ + if (signal_read_fd_ == -1) { //! 如果还没有创建对应的信号 + if (!CreateFdPair(signal_read_fd_, signal_write_fd_)) + return false; + + sp_signal_read_event_ = newFdEvent(); + sp_signal_read_event_->initialize(signal_read_fd_, FdEvent::kReadEvent, Event::Mode::kPersist); + sp_signal_read_event_->setCallback(std::bind(&CommonLoop::onSignal, this)); + sp_signal_read_event_->enable(); + } + + auto &this_signal_subscribers = all_signals_subscribers_[signo]; + if (this_signal_subscribers.empty()) { + //! 如果本Loop没有监听该信号,则要去 _signal_write_fds_ 中订阅 + std::unique_lock _g(_signal_lock_); + + //! 要禁止信号触发 + sigset_t new_sigmask, old_sigmask; + sigfillset(&new_sigmask); + sigprocmask(SIG_BLOCK, &new_sigmask, &old_sigmask); + + auto & signo_fds = _signal_write_fds_[signo]; + if (signo_fds.empty()) { + signal(signo, CommonLoop::HandleSignal); + //LogTrace("set signal:%d", signo); + } + signo_fds.insert(signal_write_fd_); + + //! 恢复信号 + sigprocmask(SIG_SETMASK, &old_sigmask, 0); + } + this_signal_subscribers.insert(who); + + return true; +} + +bool CommonLoop::unsubscribeSignal(int signo, SignalSubscribuer *who) +{ + auto &this_signal_subscribers = all_signals_subscribers_[signo]; + this_signal_subscribers.erase(who); //! 将订阅信息删除 + if (!this_signal_subscribers.empty()) //! 检查本Loop中是否已经没有SignalSubscribuer订阅该信号了 + return true; //! 如果还有,就到此为止 + + //! 如果本Loop已经没有SignalSubscribuer订阅该信号了 + all_signals_subscribers_.erase(signo); //! 则将该信号的订阅记录表删除 + { + std::unique_lock _g(_signal_lock_); + + //! 要禁止信号触发 + sigset_t new_sigmask, old_sigmask; + sigfillset(&new_sigmask); + sigprocmask(SIG_BLOCK, &new_sigmask, &old_sigmask); + + //! 并将 _signal_write_fds_ 中的记录删除 + auto &this_signal_fds = _signal_write_fds_[signo]; + this_signal_fds.erase(signal_write_fd_); + if (this_signal_fds.empty()) { + //! 并还原信号处理函数 + signal(signo, SIG_DFL); + //LogTrace("unset signal:%d", signo); + _signal_write_fds_.erase(signo); + } + + //! 恢复信号 + sigprocmask(SIG_SETMASK, &old_sigmask, 0); + } + + if (!all_signals_subscribers_.empty()) + return true; + + //! 已经没有任何SignalSubscribuer订阅任何信号了 + sp_signal_read_event_->disable(); + CHECK_CLOSE_RESET_FD(signal_write_fd_); + CHECK_CLOSE_RESET_FD(signal_read_fd_); + + FdEvent *tobe_delete = nullptr; + std::swap(tobe_delete, sp_signal_read_event_); + run([tobe_delete] { delete tobe_delete; }); + + return true; +} + +//! 信号处理函数 +void CommonLoop::HandleSignal(int signo) +{ + //LogTrace("got signal :%d", signo); + auto &this_signal_fds = _signal_write_fds_[signo]; + for (int fd : this_signal_fds) + write(fd, &signo, sizeof(signo)); +} + +void CommonLoop::onSignal() +{ + while (signal_read_fd_ != -1) { + int signo_array[10]; //! 一次性读10个 + auto rsize = read(signal_read_fd_, &signo_array, sizeof(signo_array)); + if (rsize > 0) { + const auto num = rsize / sizeof(int); + //LogTrace("rsize:%d, num:%u", rsize, num); + for (size_t i = 0; i < num; ++i) { + int signo = signo_array[i]; + //LogTrace("signo:%d", signo); + auto iter = all_signals_subscribers_.find(signo); + if (iter != all_signals_subscribers_.end()) { + for (auto s : iter->second) { + s->onSignal(signo); + } + } + } + } else { + if (errno != EAGAIN) + LogWarn("read error, rsize:%d, errno:%d, %s", rsize, errno, strerror(errno)); + break; + } + } +} + +SignalEvent* CommonLoop::newSignalEvent() +{ + return new SignalEventImpl(this); +} + +} +} diff --git a/event/engins/epoll/fd_event.cpp b/event/engins/epoll/fd_event.cpp index b38e58a32ac45a15acc509e8ac931c928866943f..f341b4074c09aaf1b4b7b602a06d48ec6dedbeca 100644 --- a/event/engins/epoll/fd_event.cpp +++ b/event/engins/epoll/fd_event.cpp @@ -5,6 +5,7 @@ #include "fd_event.h" #include "loop.h" +#include namespace tbox { namespace event { diff --git a/event/engins/epoll/fd_event.h b/event/engins/epoll/fd_event.h index e45fc516c1c36e641b2d77cce94b1c3c6f0a9948..ffada8015d91f515ae8725e185c820541c316424 100644 --- a/event/engins/epoll/fd_event.h +++ b/event/engins/epoll/fd_event.h @@ -8,8 +8,8 @@ namespace tbox { namespace event { -class EpollLoop; -class EpollFdSharedData; +class EpollLoop; +struct EpollFdSharedData; class EpollFdEvent : public FdEvent { public: diff --git a/event/engins/epoll/loop.cpp b/event/engins/epoll/loop.cpp index 3bfcefc8b6a1ad151732e0ca1a2d3a138c9da575..03fbf8dfae7433b977f92a630c719a00446242e8 100644 --- a/event/engins/epoll/loop.cpp +++ b/event/engins/epoll/loop.cpp @@ -34,6 +34,8 @@ EpollLoop::EpollLoop() : EpollLoop::~EpollLoop() { + cleanupDeferredTasks(); + CHECK_CLOSE_RESET_FD(epoll_fd_); CHECK_DELETE_RESET_OBJ(sp_exit_timer_); } @@ -218,11 +220,5 @@ TimerEvent* EpollLoop::newTimerEvent() return new EpollTimerEvent(this); } -SignalEvent* EpollLoop::newSignalEvent() -{ - LogWarn("EpollSignalEvent is not stable in multithread"); - return new EpollSignalEvent(this); -} - } } diff --git a/event/engins/epoll/loop.h b/event/engins/epoll/loop.h index 5b5d14693fb2720efa09ec3cad83362fa2d28f16..c62f1e6a26a7236cf85c83bd765cae55ce4e4a1d 100644 --- a/event/engins/epoll/loop.h +++ b/event/engins/epoll/loop.h @@ -28,7 +28,6 @@ class EpollLoop : public CommonLoop { virtual FdEvent* newFdEvent(); virtual TimerEvent* newTimerEvent(); - virtual SignalEvent* newSignalEvent(); public: inline int epollFd() const { return epoll_fd_; } diff --git a/event/engins/epoll/signal_event.cpp b/event/engins/epoll/signal_event.cpp deleted file mode 100644 index beb80fc5741dcf1ef3e5c976da47eb66381c856f..0000000000000000000000000000000000000000 --- a/event/engins/epoll/signal_event.cpp +++ /dev/null @@ -1,131 +0,0 @@ -#include -#include -#include -#include -#include - -#include "signal_event.h" - -#include -#include -#include "loop.h" - -namespace tbox { -namespace event { - -EpollSignalEvent::EpollSignalEvent(EpollLoop *wp_loop) : - wp_loop_(wp_loop), - signal_fd_event_(wp_loop) -{ - sigemptyset(&sig_mask_); - signal_fd_ = signalfd(-1, &sig_mask_, SFD_NONBLOCK | SFD_CLOEXEC); - assert(signal_fd_ >= 0); -} - -EpollSignalEvent::~EpollSignalEvent() -{ - assert(cb_level_ == 0); - disable(); - - CHECK_CLOSE_RESET_FD(signal_fd_); -} - -bool EpollSignalEvent::initialize(int signum, Mode mode) -{ - disable(); - - if (!signal_fd_event_.initialize(signal_fd_, FdEvent::kReadEvent, mode)) - return false; - - signal_fd_event_.setCallback(std::bind(&EpollSignalEvent::onEvent, this, std::placeholders::_1)); - - sigaddset(&sig_mask_, signum); - - if (mode == Mode::kOneshot) - is_stop_after_trigger_ = true; - - is_inited_ = true; - return true; -} - -void EpollSignalEvent::setCallback(const CallbackFunc &cb) -{ - cb_ = cb; -} - -bool EpollSignalEvent::isEnabled() const -{ - if (!is_inited_) - return false; - - return signal_fd_event_.isEnabled(); -} - -bool EpollSignalEvent::enable() -{ - if (!is_inited_) - return false; - - if (isEnabled()) - return true; - - if (!signal_fd_event_.enable()) - return false; - - signalfd(signal_fd_, &sig_mask_, 0); - sigprocmask(SIG_BLOCK, &sig_mask_, 0); - - return true; -} - -bool EpollSignalEvent::disable() -{ - if (!is_inited_) - return false; - - if (!isEnabled()) - return true; - - sigprocmask(SIG_UNBLOCK, &sig_mask_, 0); - signalfd(signal_fd_, &sig_mask_, 0); - - if (!signal_fd_event_.disable()) - return false; - - return true; -} - -Loop* EpollSignalEvent::getLoop() const -{ - return wp_loop_; -} - -void EpollSignalEvent::onEvent(short events) -{ - wp_loop_->beginEventProcess(); - - if (!(events & FdEvent::kReadEvent)) - return; - - /// We need read the signal_fd_ if got some signals - struct signalfd_siginfo info; - if (read(signal_fd_, &info, sizeof(info)) != sizeof(info)) - return; - - /// signal number = info.ssi_signo - /// pid = info.ssi_pid - /// uid = info.ssi_uid - - if (cb_) { - ++cb_level_; - cb_(); - --cb_level_; - - if (is_stop_after_trigger_) - disable(); - } - - wp_loop_->endEventProcess(); -} -} -} diff --git a/event/engins/epoll/signal_event.h b/event/engins/epoll/signal_event.h deleted file mode 100644 index 61d8a1fa6adc850d289bb56d167a608d9a169fbc..0000000000000000000000000000000000000000 --- a/event/engins/epoll/signal_event.h +++ /dev/null @@ -1,52 +0,0 @@ -#ifndef TBOX_EVENT_EPOLL_SINGAL_EVENT_H_20220110 -#define TBOX_EVENT_EPOLL_SINGAL_EVENT_H_20220110 - -#include -#include "../../signal_event.h" -#include "fd_event.h" - -struct epoll_event; - -namespace tbox { -namespace event { - -class EpollLoop; -class EpollFdEvent; - -class EpollSignalEvent : public SignalEvent { - public: - explicit EpollSignalEvent(EpollLoop *wp_loop); - virtual ~EpollSignalEvent(); - - public: - virtual bool initialize(int signum, Mode mode); - virtual void setCallback(const CallbackFunc &cb); - - virtual bool isEnabled() const; - virtual bool enable(); - virtual bool disable(); - - virtual Loop* getLoop() const; - - protected: - void onEvent(short events); - - private: - EpollLoop *wp_loop_{ nullptr }; - - bool is_inited_{ false }; - bool is_stop_after_trigger_{ false }; - CallbackFunc cb_{ nullptr }; - - int signal_fd_ = -1; - sigset_t sig_mask_; - EpollFdEvent signal_fd_event_; - - int cb_level_ = 0; -}; - -} -} - -#endif //TBOX_EVENT_EPOLL_SINGAL_EVENT_H_20220110 - diff --git a/event/engins/libev/loop.cpp b/event/engins/libev/loop.cpp index 37718c2d5d6672c39d601702bddd192fa8df8e59..f7a6e63f6e26e39a466504dcf309a15f4143b753 100644 --- a/event/engins/libev/loop.cpp +++ b/event/engins/libev/loop.cpp @@ -4,7 +4,6 @@ #include "fd_event.h" #include "timer_event.h" -#include "signal_event.h" namespace tbox { namespace event { @@ -16,6 +15,8 @@ LibevLoop::LibevLoop() : LibevLoop::~LibevLoop() { + cleanupDeferredTasks(); + delete sp_exit_timer_; ev_loop_destroy(sp_ev_loop_); } @@ -59,10 +60,5 @@ TimerEvent* LibevLoop::newTimerEvent() return new LibevTimerEvent(this); } -SignalEvent* LibevLoop::newSignalEvent() -{ - return new LibevSignalEvent(this); -} - } } diff --git a/event/engins/libev/loop.h b/event/engins/libev/loop.h index 09c891925cbc9585684ab7fd9bada25a77448a5e..9087bd2a10d368d7f8042d8ad5eb64640dbfaa77 100644 --- a/event/engins/libev/loop.h +++ b/event/engins/libev/loop.h @@ -19,7 +19,6 @@ class LibevLoop : public CommonLoop { virtual FdEvent* newFdEvent(); virtual TimerEvent* newTimerEvent(); - virtual SignalEvent* newSignalEvent(); public: struct ev_loop* getEvLoopPtr() const { return sp_ev_loop_; } diff --git a/event/engins/libev/signal_event.cpp b/event/engins/libev/signal_event.cpp deleted file mode 100644 index 690e70b00b5073758b848456d1f4b81eb59dcfae..0000000000000000000000000000000000000000 --- a/event/engins/libev/signal_event.cpp +++ /dev/null @@ -1,118 +0,0 @@ -#include "signal_event.h" - -#include - -#include "loop.h" -#include - -namespace tbox { -namespace event { - -LibevSignalEvent::LibevSignalEvent(LibevLoop *wp_loop) : - wp_loop_(wp_loop), - is_inited_(false), - is_stop_after_trigger_(false), - cb_level_(0) -{ - memset(&signal_ev_, 0, sizeof(signal_ev_)); -} - -LibevSignalEvent::~LibevSignalEvent() -{ - assert(cb_level_ == 0); - disable(); -} - -bool LibevSignalEvent::initialize(int signum, Mode mode) -{ - disable(); - - signal_ev_.active = signal_ev_.pending = 0; - signal_ev_.priority = 0; - signal_ev_.cb = LibevSignalEvent::OnEventCallback; - signal_ev_.data = this; - - signal_ev_.signum = signum; - - if (mode == Mode::kOneshot) //! 如果是单次有效的,需要设置标记,使之在触发后停止事件 - is_stop_after_trigger_ = true; - - is_inited_ = true; - return true; -} - -void LibevSignalEvent::setCallback(const CallbackFunc &cb) -{ - cb_ = cb; -} - -bool LibevSignalEvent::isEnabled() const -{ - if (!is_inited_) - return false; - - return signal_ev_.active; -} - -bool LibevSignalEvent::enable() -{ - if (!is_inited_) { - LogErr("can't enable() before initialize()"); - return false; - } - - if (isEnabled()) - return true; - - ev_signal_start(wp_loop_->getEvLoopPtr(), &signal_ev_); - - return true; -} - -bool LibevSignalEvent::disable() -{ - if (!is_inited_) - return false; - - if (!isEnabled()) - return true; - - ev_signal_stop(wp_loop_->getEvLoopPtr(), &signal_ev_); - - return true; -} - -Loop* LibevSignalEvent::getLoop() const -{ - return wp_loop_; -} - -void LibevSignalEvent::OnEventCallback(struct ev_loop*, ev_signal *p_w, int events) -{ - assert(p_w != NULL); - - LibevSignalEvent *pthis = static_cast(p_w->data); - pthis->onEvent(); -} - -void LibevSignalEvent::onEvent() -{ - wp_loop_->beginEventProcess(); - - if (cb_) { - ++cb_level_; - cb_(); - --cb_level_; - - if (is_stop_after_trigger_) - disable(); - - } else { - LogErr("you should specify event callback by setCallback()"); - } - - wp_loop_->endEventProcess(); -} - -} -} diff --git a/event/engins/libev/signal_event.h b/event/engins/libev/signal_event.h deleted file mode 100644 index 4c19f06631bdacf3b41b2a52787db2decaf50d9a..0000000000000000000000000000000000000000 --- a/event/engins/libev/signal_event.h +++ /dev/null @@ -1,44 +0,0 @@ -#ifndef TBOX_EVENT_LIBEV_SINGAL_EVENT_H_20170827 -#define TBOX_EVENT_LIBEV_SINGAL_EVENT_H_20170827 - -#include "../../signal_event.h" - -#include - -namespace tbox { -namespace event { - -class LibevLoop; - -class LibevSignalEvent : public SignalEvent { - public: - explicit LibevSignalEvent(LibevLoop *wp_loop); - virtual ~LibevSignalEvent(); - - public: - virtual bool initialize(int signum, Mode mode); - virtual void setCallback(const CallbackFunc &cb); - - virtual bool isEnabled() const; - virtual bool enable(); - virtual bool disable(); - - virtual Loop* getLoop() const; - - protected: - static void OnEventCallback(struct ev_loop *p_loop, ev_signal *p_w, int events); - void onEvent(); - - private: - LibevLoop *wp_loop_; - ev_signal signal_ev_; - bool is_inited_; - bool is_stop_after_trigger_; - CallbackFunc cb_; - int cb_level_; -}; - -} -} - -#endif //TBOX_EVENT_LIBEV_SINGAL_EVENT_H_20170827 diff --git a/event/engins/libevent/loop.cpp b/event/engins/libevent/loop.cpp index 645225a06147e1a3c28499129dac1b21b8639c17..d48c754dcaa0e60e96d3410199fa3dfb97d2eef1 100644 --- a/event/engins/libevent/loop.cpp +++ b/event/engins/libevent/loop.cpp @@ -5,7 +5,6 @@ #include "fd_event.h" #include "timer_event.h" -#include "signal_event.h" #include "common.h" namespace tbox { @@ -17,6 +16,8 @@ LibeventLoop::LibeventLoop() : LibeventLoop::~LibeventLoop() { + cleanupDeferredTasks(); + event_base_free(sp_event_base_); sp_event_base_ = NULL; } @@ -48,10 +49,5 @@ TimerEvent* LibeventLoop::newTimerEvent() return new LibeventTimerEvent(this); } -SignalEvent* LibeventLoop::newSignalEvent() -{ - return new LibeventSignalEvent(this); -} - } } diff --git a/event/engins/libevent/loop.h b/event/engins/libevent/loop.h index 9280bebe1d8a9e5362c48416d09da11d78d575c4..58a9fe97ee818776700251ca6e0e5d6deba6816f 100644 --- a/event/engins/libevent/loop.h +++ b/event/engins/libevent/loop.h @@ -19,7 +19,6 @@ class LibeventLoop : public CommonLoop { virtual FdEvent* newFdEvent(); virtual TimerEvent* newTimerEvent(); - virtual SignalEvent* newSignalEvent(); public: struct event_base* getEventBasePtr() const { return sp_event_base_; } diff --git a/event/engins/libevent/signal_event.cpp b/event/engins/libevent/signal_event.cpp deleted file mode 100644 index 2649590d81a8a9f430e02d58e77d76eaace5874c..0000000000000000000000000000000000000000 --- a/event/engins/libevent/signal_event.cpp +++ /dev/null @@ -1,124 +0,0 @@ -#include "signal_event.h" - -#include -#include - -#include "loop.h" - -#include - -namespace tbox { -namespace event { - -LibeventSignalEvent::LibeventSignalEvent(LibeventLoop *wp_loop) : - wp_loop_(wp_loop), - is_inited_(false), - cb_level_(0) -{ - event_assign(&event_, NULL, -1, 0, NULL, NULL); -} - -LibeventSignalEvent::~LibeventSignalEvent() -{ - assert(cb_level_ == 0); - disable(); -} - -bool LibeventSignalEvent::initialize(int signum, Mode mode) -{ - disable(); - - short libevent_events = 0; - if (mode == Mode::kPersist) - libevent_events |= EV_PERSIST; - - int ret = event_assign(&event_, wp_loop_->getEventBasePtr(), signum, - libevent_events | EV_SIGNAL, - LibeventSignalEvent::OnEventCallback, - this); - if (ret == 0) { - is_inited_ = true; - return true; - } - - LogErr("event_assign() fail"); - return false; -} - -void LibeventSignalEvent::setCallback(const CallbackFunc &cb) -{ - cb_ = cb; -} - -bool LibeventSignalEvent::isEnabled() const -{ - if (!is_inited_) - return false; - - return event_pending(&event_, EV_SIGNAL, NULL) != 0; -} - -bool LibeventSignalEvent::enable() -{ - if (!is_inited_) { - LogErr("can't enable() before initialize()"); - return false; - } - - if (isEnabled()) - return true; - - int ret = event_add(&event_, NULL); - if (ret != 0) { - LogErr("event_add() fail"); - return false; - } - - return true; -} - -bool LibeventSignalEvent::disable() -{ - if (!is_inited_) - return false; - - if (!isEnabled()) - return true; - - int ret = event_del(&event_); - if (ret != 0) { - LogErr("event_del() fail"); - return false; - } - - return true; -} - -Loop* LibeventSignalEvent::getLoop() const -{ - return wp_loop_; -} - -void LibeventSignalEvent::OnEventCallback(int, short, void *args) -{ - LibeventSignalEvent *pthis = static_cast(args); - pthis->onEvent(); -} - -void LibeventSignalEvent::onEvent() -{ - wp_loop_->beginEventProcess(); - - if (cb_) { - ++cb_level_; - cb_(); - --cb_level_; - } else { - LogWarn("you should specify event callback by setCallback()"); - } - - wp_loop_->endEventProcess(); -} - -} -} diff --git a/event/engins/libevent/signal_event.h b/event/engins/libevent/signal_event.h deleted file mode 100644 index e9c0116d07a862612251934ce6e6e159a429021f..0000000000000000000000000000000000000000 --- a/event/engins/libevent/signal_event.h +++ /dev/null @@ -1,43 +0,0 @@ -#ifndef TBOX_EVENT_LIBEVENT_SINGAL_EVENT_H_20170715 -#define TBOX_EVENT_LIBEVENT_SINGAL_EVENT_H_20170715 - -#include "../../signal_event.h" -#include - -namespace tbox { -namespace event { - -class LibeventLoop; - -class LibeventSignalEvent : public SignalEvent { - public: - explicit LibeventSignalEvent(LibeventLoop *wp_loop); - virtual ~LibeventSignalEvent(); - - public: - virtual bool initialize(int signum, Mode mode); - virtual void setCallback(const CallbackFunc &cb); - - virtual bool isEnabled() const; - virtual bool enable(); - virtual bool disable(); - - virtual Loop* getLoop() const; - - protected: - static void OnEventCallback(int, short, void *args); - void onEvent(); - - private: - LibeventLoop *wp_loop_; - - struct event event_; - bool is_inited_; - CallbackFunc cb_; - int cb_level_; -}; - -} -} - -#endif //TBOX_EVENT_LIBEVENT_SINGAL_EVENT_H_20170715 diff --git a/event/fd_event_test.cpp b/event/fd_event_test.cpp index 27f0692f3e61ccb439c9fc177820dedaf3d50be8..7438093733cd54ac5058ffc5bbcfeef79dbcf37d 100644 --- a/event/fd_event_test.cpp +++ b/event/fd_event_test.cpp @@ -236,3 +236,17 @@ TEST(FdEvent, MultiWriteOneRead) close(read_fd); } } + + +TEST(FdEvent, DeleteLater) +{ + auto engins = Loop::Engines(); + for (auto e : engins) { + cout << "engin: " << e << endl; + auto sp_loop = Loop::New(e); + auto sp_fd_event = sp_loop->newFdEvent(); + sp_fd_event->initialize(1, FdEvent::kReadEvent, Event::Mode::kPersist); + sp_loop->run([=] { delete sp_fd_event; }); + delete sp_loop; + } +} diff --git a/event/loop.cpp b/event/loop.cpp index a65ab640242637f597235b5bcaa7c842c0be5b9b..47ec08b255ad7869c9dcb79901c75e3ff2d0a2a2 100644 --- a/event/loop.cpp +++ b/event/loop.cpp @@ -3,12 +3,14 @@ #if defined(ENABLE_LIBEVENT) #include "engins/libevent/loop.h" -#elif defined(ENABLE_LIBEV) +#endif + +#if defined(ENABLE_LIBEV) #include "engins/libev/loop.h" -#elif defined(ENABLE_EPOLL) +#endif + +#if defined(ENABLE_EPOLL) #include "engins/epoll/loop.h" -#else -#error("no engin specified!!!") #endif namespace tbox { @@ -48,14 +50,14 @@ Loop* Loop::New(const std::string &engine_type) std::vector Loop::Engines() { std::vector types; -#ifdef ENABLE_EPOLL - types.push_back("epoll"); -#endif #ifdef ENABLE_LIBEVENT types.push_back("libevent"); #endif #ifdef ENABLE_LIBEV types.push_back("libev"); +#endif +#ifdef ENABLE_EPOLL + types.push_back("epoll"); #endif return types; } diff --git a/event/misc.cpp b/event/misc.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6ef301e21796edeab94ce28b76217723e9b846ec --- /dev/null +++ b/event/misc.cpp @@ -0,0 +1,26 @@ +#include "misc.h" + +#include +#include +#include + +#include + +namespace tbox { +namespace event { + +bool CreateFdPair(int &read_fd, int &write_fd) +{ + int fds[2] = { 0 }; + if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) != 0) { //!FIXME + LogErr("pip2() fail, ret:%d", errno); + return false; + } + + read_fd = fds[0]; + write_fd = fds[1]; + return true; +} + +} +} diff --git a/event/misc.h b/event/misc.h new file mode 100644 index 0000000000000000000000000000000000000000..8875ce7e87b429c1c7575275f8aa462b3d3dec51 --- /dev/null +++ b/event/misc.h @@ -0,0 +1,12 @@ +#ifndef TBOX_EVENT_MISC_H_20220303 +#define TBOX_EVENT_MISC_H_20220303 + +namespace tbox { +namespace event { + +bool CreateFdPair(int &read_fd, int &write_fd); + +} +} + +#endif //TBOX_EVENT_MISC_H_20220303 diff --git a/event/signal_event.h b/event/signal_event.h index 374866f38da7b80c1008e9a1881bb8e3c6087480..e8235cbe2811bdfbf2cde80b623a8c619cd08e6a 100644 --- a/event/signal_event.h +++ b/event/signal_event.h @@ -3,6 +3,7 @@ #include #include +#include #include "event.h" @@ -12,8 +13,9 @@ namespace event { class SignalEvent : public Event { public: virtual bool initialize(int signum, Mode mode) = 0; + virtual bool initialize(const std::set &sigset, Mode mode) = 0; - using CallbackFunc = std::function; + using CallbackFunc = std::function; virtual void setCallback(const CallbackFunc &cb) = 0; public: diff --git a/event/signal_event_impl.cpp b/event/signal_event_impl.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a5da9dade834727cbb0224cb7cbc01746fd3bda5 --- /dev/null +++ b/event/signal_event_impl.cpp @@ -0,0 +1,76 @@ +#include "signal_event_impl.h" +#include "common_loop.h" +#include + +namespace tbox { +namespace event { + +SignalEventImpl::SignalEventImpl(CommonLoop *wp_loop) : + wp_loop_(wp_loop) +{ } + +SignalEventImpl::~SignalEventImpl() +{ + disable(); +} + +bool SignalEventImpl::initialize(int signo, Mode mode) +{ + sigset_.insert(signo); + mode_ = mode; + + is_inited_ = true; + return true; +} + +bool SignalEventImpl::initialize(const std::set &sigset, Mode mode) +{ + sigset_ = sigset; + mode_ = mode; + + is_inited_ = true; + return true; +} + +bool SignalEventImpl::enable() +{ + if (is_inited_) { + for (int signo : sigset_) { + if (!wp_loop_->subscribeSignal(signo, this)) { + return false; + } + } + } + is_enabled_ = true; + return true; +} + +bool SignalEventImpl::disable() +{ + if (is_enabled_) { + for (int signo : sigset_) { + if (!wp_loop_->unsubscribeSignal(signo, this)) { + return false; + } + } + } + is_enabled_ = false; + return true; +} + +Loop* SignalEventImpl::getLoop() const +{ + return wp_loop_; +} + +void SignalEventImpl::onSignal(int signo) +{ + if (mode_ == Mode::kOneshot) + disable(); + + if (cb_) + cb_(signo); +} + +} +} diff --git a/event/signal_event_impl.h b/event/signal_event_impl.h new file mode 100644 index 0000000000000000000000000000000000000000..fe6125af1e8a84789c2afb6825fd22dbe7321eff --- /dev/null +++ b/event/signal_event_impl.h @@ -0,0 +1,54 @@ +#ifndef TBOX_EVENT_SIGNAL_EVENT_IMPL_H_20220301 +#define TBOX_EVENT_SIGNAL_EVENT_IMPL_H_20220301 + +#include "signal_event.h" + +namespace tbox { +namespace event { + +class CommonLoop; + +class SignalSubscribuer { + public: + virtual void onSignal(int signo) = 0; + + protected: + virtual ~SignalSubscribuer() { } +}; + +class SignalEventImpl : public SignalEvent, + public SignalSubscribuer { + public: + explicit SignalEventImpl(CommonLoop *wp_loop); + virtual ~SignalEventImpl(); + + public: + bool initialize(int signum, Mode mode) override; + bool initialize(const std::set &sigset, Mode mode) override; + + void setCallback(const CallbackFunc &cb) override { cb_ = cb; } + + bool isEnabled() const override { return is_enabled_; } + bool enable() override; + bool disable() override; + + Loop* getLoop() const override; + + public: + void onSignal(int signo) override; + + private: + CommonLoop *wp_loop_; + CallbackFunc cb_; + + bool is_inited_ = false; + bool is_enabled_ = false; + + std::set sigset_; + Mode mode_ = Mode::kPersist; +}; + +} +} + +#endif //TBOX_EVENT_SIGNAL_EVENT_IMPL_H_20220301 diff --git a/event/signal_event_test.cpp b/event/signal_event_test.cpp index a38db6c8d5fb3be2d53c2a4aee10c917de66a752..e30587aacec56e532ff8a066e0fb2f7959c9537c 100644 --- a/event/signal_event_test.cpp +++ b/event/signal_event_test.cpp @@ -1,5 +1,10 @@ #include #include +#include +#include +#include +#include +#include #include "loop.h" #include "signal_event.h" @@ -17,17 +22,19 @@ TEST(SignalEvent, Oneshot) cout << "engin: " << e << endl; auto sp_loop = Loop::New(e); auto signal_event = sp_loop->newSignalEvent(); - EXPECT_TRUE(signal_event->initialize(SIGINT, Event::Mode::kOneshot)); + EXPECT_TRUE(signal_event->initialize(SIGUSR1, Event::Mode::kOneshot)); EXPECT_TRUE(signal_event->enable()); int run_time = 0; - signal_event->setCallback([&]() { ++run_time; }); - - sp_loop->run([] - { - raise(SIGINT); + signal_event->setCallback( + [&](int signo) { + EXPECT_EQ(signo, SIGUSR1); + ++run_time; } ); + + sp_loop->run([] { raise(SIGUSR1); }); + sp_loop->exitLoop(std::chrono::milliseconds(100)); sp_loop->runLoop(); @@ -44,8 +51,9 @@ TEST(SignalEvent, PersistWithTimerEvent) for (auto e : engins) { cout << "engin: " << e << endl; auto sp_loop = Loop::New(e); + auto signal_event = sp_loop->newSignalEvent(); - EXPECT_TRUE(signal_event->initialize(SIGINT, Event::Mode::kPersist)); + EXPECT_TRUE(signal_event->initialize(SIGUSR1, Event::Mode::kPersist)); EXPECT_TRUE(signal_event->enable()); auto timer_event = sp_loop->newTimerEvent(); @@ -56,13 +64,18 @@ TEST(SignalEvent, PersistWithTimerEvent) { ++count; if (count <= 5) { - raise(SIGINT); + raise(SIGUSR1); } } ); int run_time = 0; - signal_event->setCallback([&]() { ++run_time; }); + signal_event->setCallback( + [&](int signo) { + EXPECT_EQ(signo, SIGUSR1); + ++run_time; + } + ); sp_loop->exitLoop(std::chrono::milliseconds(100)); sp_loop->runLoop(); @@ -75,40 +88,358 @@ TEST(SignalEvent, PersistWithTimerEvent) } } -TEST(SignalEvent, IntAndTermSignal) +TEST(SignalEvent, MultiSignalMultiEvents) +{ + auto engins = Loop::Engines(); + for (auto e : engins) { + cout << "engin: " << e << endl; + auto sp_loop = Loop::New(e); + auto user1_signal_event = sp_loop->newSignalEvent(); + EXPECT_TRUE(user1_signal_event->initialize(SIGUSR1, Event::Mode::kOneshot)); + EXPECT_TRUE(user1_signal_event->enable()); + + auto user2_signal_event = sp_loop->newSignalEvent(); + EXPECT_TRUE(user2_signal_event->initialize(SIGUSR2, Event::Mode::kOneshot)); + EXPECT_TRUE(user2_signal_event->enable()); + + int user1_run_time = 0; + user1_signal_event->setCallback( + [&](int signo) { + EXPECT_EQ(signo, SIGUSR1); + ++user1_run_time; + } + ); + int user2_run_time = 0; + user2_signal_event->setCallback( + [&](int signo) { + EXPECT_EQ(signo, SIGUSR2); + ++user2_run_time; + } + ); + + sp_loop->run([] + { + raise(SIGUSR1); + raise(SIGUSR2); + } + ); + sp_loop->exitLoop(std::chrono::milliseconds(100)); + sp_loop->runLoop(); + + EXPECT_EQ(user1_run_time, 1); + EXPECT_EQ(user2_run_time, 1); + + + delete user1_signal_event; + delete user2_signal_event; + delete sp_loop; + } +} + +TEST(SignalEvent, MultiThread) { auto engins = Loop::Engines(); for (auto e : engins) { cout << "engin: " << e << endl; auto sp_loop = Loop::New(e); - auto int_signal_event = sp_loop->newSignalEvent(); - EXPECT_TRUE(int_signal_event->initialize(SIGINT, Event::Mode::kOneshot)); - EXPECT_TRUE(int_signal_event->enable()); + auto user1_signal_event = sp_loop->newSignalEvent(); + EXPECT_TRUE(user1_signal_event->initialize(SIGUSR1, Event::Mode::kOneshot)); + EXPECT_TRUE(user1_signal_event->enable()); - auto term_signal_event = sp_loop->newSignalEvent(); - EXPECT_TRUE(term_signal_event->initialize(SIGTERM, Event::Mode::kOneshot)); - EXPECT_TRUE(term_signal_event->enable()); + auto user2_signal_event = sp_loop->newSignalEvent(); + EXPECT_TRUE(user2_signal_event->initialize(SIGUSR2, Event::Mode::kOneshot)); + EXPECT_TRUE(user2_signal_event->enable()); - int int_run_time = 0; - int_signal_event->setCallback([&]() { ++int_run_time; }); - int term_run_time = 0; - term_signal_event->setCallback([&]() { ++term_run_time; }); + int user1_run_time = 0; + user1_signal_event->setCallback( + [&](int signo) { + EXPECT_EQ(signo, SIGUSR1); + ++user1_run_time; + } + ); + int user2_run_time = 0; + user2_signal_event->setCallback( + [&](int signo) { + EXPECT_EQ(signo, SIGUSR2); + ++user2_run_time; + } + ); sp_loop->run([] { - raise(SIGINT); - raise(SIGTERM); + raise(SIGUSR1); + raise(SIGUSR2); + } + ); + + bool t1_run = false; + //! t1 线程sleep 200ms + auto t1 = std::thread( + [&] { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + t1_run = true; + } + ); + + //! t2 线程等待一个信号 + bool exit_thread = false; + std::mutex lock; + std::condition_variable cond_var; + bool t2_run = false; + auto t2 = std::thread( + [&] { + std::unique_lock lk(lock); + cond_var.wait(lk, [&]{ return exit_thread; }); + t2_run = true; + } + ); + + + sp_loop->exitLoop(std::chrono::milliseconds(100)); + sp_loop->runLoop(); + + t1.join(); + { + std::unique_lock lk(lock); + exit_thread = true; + cond_var.notify_one(); + } + t2.join(); + + EXPECT_TRUE(t1_run); + EXPECT_TRUE(t2_run); + EXPECT_EQ(user1_run_time, 1); + EXPECT_EQ(user2_run_time, 1); + + delete user1_signal_event; + delete user2_signal_event; + delete sp_loop; + } +} + +//! 同一种信号被多个事件监听 +TEST(SignalEvent, OneSignalMultiEvents) +{ + auto engins = Loop::Engines(); + for (auto e : engins) { + cout << "engin: " << e << endl; + auto sp_loop = Loop::New(e); + + auto signal_event_1 = sp_loop->newSignalEvent(); + auto signal_event_2 = sp_loop->newSignalEvent(); + + EXPECT_TRUE(signal_event_1->initialize(SIGUSR1, Event::Mode::kOneshot)); + EXPECT_TRUE(signal_event_2->initialize(SIGUSR1, Event::Mode::kOneshot)); + + EXPECT_TRUE(signal_event_1->enable()); + EXPECT_TRUE(signal_event_2->enable()); + + int run_time_1 = 0; + int run_time_2 = 0; + + signal_event_1->setCallback( + [&](int signo) { + EXPECT_EQ(signo, SIGUSR1); + ++run_time_1; + } + ); + signal_event_2->setCallback( + [&](int signo) { + EXPECT_EQ(signo, SIGUSR1); + ++run_time_2; + } + ); + + sp_loop->run([] { raise(SIGUSR1); }); + sp_loop->exitLoop(std::chrono::milliseconds(100)); + sp_loop->runLoop(); + + EXPECT_EQ(run_time_1, 1); + EXPECT_EQ(run_time_2, 1); + + + delete signal_event_1; + delete signal_event_2; + delete sp_loop; + } +} + +//! 多线程下多个Loop的事件监听同一个信号 +TEST(SignalEvent, OneSignalMultiLoopInMultiThread) +{ + auto engins = Loop::Engines(); + for (auto e : engins) { + cout << "engin: " << e << endl; + + auto thread_func = [e] (int &run_time) { + auto sp_loop = Loop::New(e); + auto signal_event = sp_loop->newSignalEvent(); + signal_event->initialize(SIGUSR1, Event::Mode::kPersist); + signal_event->enable(); + signal_event->setCallback([&](int) { ++run_time; }); + + sp_loop->exitLoop(std::chrono::milliseconds(200)); + sp_loop->runLoop(); + + delete signal_event; + delete sp_loop; + }; + + int run_time_1 = 0; + int run_time_2 = 0; + + auto t1 = std::thread(std::bind(thread_func, std::ref(run_time_1))); + auto t2 = std::thread(std::bind(thread_func, std::ref(run_time_2))); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + raise(SIGUSR1); + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + raise(SIGUSR1); + + t1.join(); + t2.join(); + + EXPECT_EQ(run_time_1, 2); + EXPECT_EQ(run_time_2, 2); + } +} + +//! 多线程下多个Loop的事件监听同多个信号 +TEST(SignalEvent, MultiSignalMultiLoopInMultiThread) +{ + LogOutput_Initialize("test"); + + auto engins = Loop::Engines(); + for (auto e : engins) { + cout << "engin: " << e << endl; + + auto thread_func = [e] (int &user1_run_time, int &user2_run_time) { + auto sp_loop = Loop::New(e); + + auto user1_signal_event = sp_loop->newSignalEvent(); + user1_signal_event->initialize(SIGUSR1, Event::Mode::kPersist); + user1_signal_event->enable(); + user1_signal_event->setCallback([&](int) { ++user1_run_time; }); + + auto user2_signal_event = sp_loop->newSignalEvent(); + user2_signal_event->initialize(SIGUSR2, Event::Mode::kPersist); + user2_signal_event->enable(); + user2_signal_event->setCallback([&](int) { ++user2_run_time; }); + + sp_loop->exitLoop(std::chrono::milliseconds(200)); + sp_loop->runLoop(); + + delete user2_signal_event; + delete user1_signal_event; + delete sp_loop; + }; + + int user1_run_time_1 = 0, user2_run_time_1 = 0; + int user1_run_time_2 = 0, user2_run_time_2 = 0; + + auto t1 = std::thread(std::bind(thread_func, std::ref(user1_run_time_1), std::ref(user2_run_time_1))); + auto t2 = std::thread(std::bind(thread_func, std::ref(user1_run_time_2), std::ref(user2_run_time_2))); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + raise(SIGUSR1); + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + raise(SIGUSR2); + + t1.join(); + t2.join(); + + EXPECT_EQ(user1_run_time_1, 1); + EXPECT_EQ(user2_run_time_1, 1); + EXPECT_EQ(user1_run_time_2, 1); + EXPECT_EQ(user2_run_time_2, 1); + } +} + +//! 同一个事件,监听多个事件 +TEST(SignalEvent, OneEventMultiSignal) +{ + auto engins = Loop::Engines(); + for (auto e : engins) { + cout << "engin: " << e << endl; + auto sp_loop = Loop::New(e); + auto signal_event = sp_loop->newSignalEvent(); + std::set sigset = { SIGUSR1, SIGUSR2 }; + EXPECT_TRUE(signal_event->initialize(sigset, Event::Mode::kPersist)); + EXPECT_TRUE(signal_event->enable()); + + int user1_run_time = 0; + int user2_run_time = 0; + signal_event->setCallback( + [&](int signo) { + //LogTrace("signo:%d", signo); + if (signo == SIGUSR1) + ++user1_run_time; + else if (signo == SIGUSR2) + ++user2_run_time; + else {} } ); + + sp_loop->run( + [] { + raise(SIGUSR1); + raise(SIGUSR2); + } + ); + sp_loop->exitLoop(std::chrono::milliseconds(100)); sp_loop->runLoop(); - EXPECT_EQ(int_run_time, 1); - EXPECT_EQ(term_run_time, 1); + EXPECT_EQ(user1_run_time, 1); + EXPECT_EQ(user2_run_time, 1); - delete int_signal_event; - delete term_signal_event; + delete signal_event; delete sp_loop; } } +//! 短时间内触发非常多的信号 +TEST(SignalEvent, LargeNumberOfSignals) +{ + auto engins = Loop::Engines(); + for (auto e : engins) { + cout << "engin: " << e << endl; + auto sp_loop = Loop::New(e); + auto signal_event = sp_loop->newSignalEvent(); + EXPECT_TRUE(signal_event->initialize(SIGUSR1, Event::Mode::kPersist)); + EXPECT_TRUE(signal_event->enable()); + + int user1_run_time = 0; + signal_event->setCallback( + [&](int signo) { + if (signo == SIGUSR1) + ++user1_run_time; + } + ); + + int remain = 1000; + std::function func = \ + [&] { + --remain; + if (remain >= 0) { + raise(SIGUSR1); + sp_loop->runInLoop(func); + } else { + sp_loop->exitLoop(std::chrono::milliseconds(10)); + } + }; + sp_loop->runInLoop(func); + + sp_loop->runLoop(); + + EXPECT_EQ(remain, -1); + EXPECT_EQ(user1_run_time, 1000); + + delete signal_event; + delete sp_loop; + } +} + + diff --git a/main/main.cpp b/main/main.cpp index f1bbde9943f8ba006993fe4c86951542130be40d..8700da1a4878dfc3fec5b3957ef7d092bbe8a8e1 100644 --- a/main/main.cpp +++ b/main/main.cpp @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -25,7 +26,6 @@ extern void RegisterApps(Apps &apps); //! 由用户去实现 extern void Run(ContextImp &ctx, AppsImp &apps, int loop_exit_wait); std::function error_exit_func; //!< 出错异常退出前要做的事件 -std::function normal_stop_func; //!< 正常退出前要做的事件 int Main(int argc, char **argv) { @@ -109,20 +109,27 @@ int Main(int argc, char **argv) void Run(ContextImp &ctx, AppsImp &apps, int loop_exit_wait) { - auto feeddog_timer = ctx.loop()->newTimerEvent(); + auto feeddog_timer = ctx.loop()->newTimerEvent(); + auto stop_signal = ctx.loop()->newSignalEvent(); + //! 预定在离开时自动释放对象,确保无内存泄漏 - SetScopeExitAction([feeddog_timer] { delete feeddog_timer; }); - - normal_stop_func = [&] { - ctx.loop()->runInLoop([&] { - LogInfo("Got stop signal"); - apps.stop(); - ctx.stop(); - ctx.loop()->exitLoop(std::chrono::seconds(loop_exit_wait)); - LogInfo("Loop will exit after %d sec", loop_exit_wait); - } - ); - }; + SetScopeExitAction( + [=] { + delete stop_signal; + delete feeddog_timer; + } + ); + + stop_signal->initialize(std::set{SIGINT, SIGTERM}, event::Event::Mode::kOneshot); + stop_signal->setCallback( + [&] (int signo) { + LogInfo("Got signal %d", signo); + apps.stop(); + ctx.stop(); + ctx.loop()->exitLoop(std::chrono::seconds(loop_exit_wait)); + LogInfo("Loop will exit after %d sec", loop_exit_wait); + } + ); //! 创建喂狗定时器 feeddog_timer->initialize(std::chrono::seconds(2), event::Event::Mode::kPersist); @@ -133,6 +140,7 @@ void Run(ContextImp &ctx, AppsImp &apps, int loop_exit_wait) util::ThreadWDog::Register("main", 3); feeddog_timer->enable(); + stop_signal->enable(); LogInfo("Start!"); diff --git a/main/signal.cpp b/main/signal.cpp index 5de2369310a6934c0c874e6f4a555f06f0b715b3..ba83d31bd98f5b6788d1d01d4708a1b617d2948d 100644 --- a/main/signal.cpp +++ b/main/signal.cpp @@ -10,18 +10,9 @@ namespace tbox::main { extern std::function error_exit_func; //!< 出错异常退出前要做的事件 -extern std::function normal_stop_func; //!< 正常退出前要做的事件 namespace { -void OnStopSignal(int) -{ - if (normal_stop_func) - normal_stop_func(); - else - exit(0); -} - //! 打印调用栈 void PrintCallStack() { @@ -68,8 +59,6 @@ void RegisterSignals() signal(SIGABRT, OnErrorSignal); signal(SIGBUS, OnErrorSignal); signal(SIGPIPE, OnWarnSignal); - signal(SIGINT, OnStopSignal); - signal(SIGTERM, OnStopSignal); } }