From 51336345999246189bf5ab822dd671f563f1cecb Mon Sep 17 00:00:00 2001 From: huangduirong Date: Tue, 31 Oct 2023 06:05:59 +0800 Subject: [PATCH] support event signal in libhv --- libhv.spec | 6 +- support-event-signal-in-libhv.patch | 437 ++++++++++++++++++++++++++++ 2 files changed, 442 insertions(+), 1 deletion(-) create mode 100755 support-event-signal-in-libhv.patch diff --git a/libhv.spec b/libhv.spec index 4d98ef3..ca99794 100644 --- a/libhv.spec +++ b/libhv.spec @@ -1,12 +1,13 @@ Name: libhv Version: 1.3.1 -Release: 1 +Release: 2 Summary: Like libevent, libev, and libuv, libhv provides event-loop with non-blocking IO and timer, but simpler api and richer protocols License: BSD 3-Clause License URL: https://gitee.com/libhv/libhv Source0: https://gitee.com/libhv/libhv/archive/refs/tags/v1.3.1.tar.gz#/%{name}-v%{version}.tar.gz Patch001: support-lib64-path.patch +Patch002: support-event-signal-in-libhv.patch BuildRequires: gcc gcc-c++ openssl-devel procps-ng net-tools cmake @@ -52,6 +53,9 @@ cd %{_vpath_builddir} %{_libdir}/*.a %changelog +* Tue Oct 31 2023 huangduirong - 1.3.1-2 +- support event signal + * Mon Jul 17 2023 zhangchenglin - 1.3.1-1 - Update to version 1.3.1 diff --git a/support-event-signal-in-libhv.patch b/support-event-signal-in-libhv.patch new file mode 100755 index 0000000..ad95fba --- /dev/null +++ b/support-event-signal-in-libhv.patch @@ -0,0 +1,437 @@ +From a3dd0be55b679c28185a464c537a2615fe723d99 Mon Sep 17 00:00:00 2001 +From: huangduirong +Date: Mon, 30 Oct 2023 19:40:31 +0800 +Subject: [PATCH] support event signal in libhv + +--- + event/hevent.h | 14 +++ + event/hloop.c | 19 +++++ + event/hloop.h | 7 ++ + event/hsignal.c | 161 +++++++++++++++++++++++++++++++++++ + unittest/CMakeLists.txt | 6 ++ + unittest/event_signal_test.c | 114 +++++++++++++++++++++++++ + 6 files changed, 321 insertions(+) + create mode 100755 event/hsignal.c + create mode 100755 unittest/event_signal_test.c + +diff --git a/event/hevent.h b/event/hevent.h +index 782b0ca..ebf58f3 100644 +--- a/event/hevent.h ++++ b/event/hevent.h +@@ -1,6 +1,7 @@ + #ifndef HV_EVENT_H_ + #define HV_EVENT_H_ + ++#include + #include "hloop.h" + #include "iowatcher.h" + #include "rudp.h" +@@ -48,6 +49,12 @@ struct hloop_s { + // idles + struct list_head idles; + uint32_t nidles; ++ // signals ++ uint32_t nsignals; ++ uint32_t enable_signal; ++ struct list_head signal_events_head[_NSIG]; ++ struct list_head awaken_signal_events_head; ++ hio_t* signal_io_watcher; + // timers + struct heap timers; // monotonic time + struct heap realtimers; // realtime +@@ -73,6 +80,13 @@ struct hidle_s { + struct list_node node; + }; + ++struct hsig_s { ++ HEVENT_FIELDS ++ uint32_t signal; ++ uint32_t num_calls; ++ struct list_node self_signal_node; ++}; ++ + #define HTIMER_FIELDS \ + HEVENT_FIELDS \ + uint32_t repeat; \ +diff --git a/event/hloop.c b/event/hloop.c +index 2309bcf..866a1c9 100644 +--- a/event/hloop.c ++++ b/event/hloop.c +@@ -319,6 +319,11 @@ static void hloop_init(hloop_t* loop) { + // idles + list_init(&loop->idles); + ++ // signals ++ for (int i = 0; i < _NSIG; i++) { ++ list_init(&(loop->signal_events_head[i])); ++ } ++ + // timers + heap_init(&loop->timers, timers_compare); + heap_init(&loop->realtimers, timers_compare); +@@ -371,6 +376,20 @@ static void hloop_cleanup(hloop_t* loop) { + HV_FREE(idle); + } + list_init(&loop->idles); ++ loop->signal_io_watcher = NULL; ++ ++ // signals ++ printd("cleanup signals...\n"); ++ for (int i = 0; i < _NSIG; i++) { ++ struct list_node* sig_node = loop->signal_events_head[i].next; ++ hsig_t* signal; ++ while (sig_node != &loop->signal_events_head[i]) { ++ signal = IDLE_ENTRY(sig_node); ++ sig_node = sig_node->next; ++ HV_FREE(signal); ++ } ++ list_init(&(loop->signal_events_head[i])); ++ } + + // timers + printd("cleanup timers...\n"); +diff --git a/event/hloop.h b/event/hloop.h +index be70cfd..22122f9 100644 +--- a/event/hloop.h ++++ b/event/hloop.h +@@ -16,11 +16,13 @@ typedef struct htimer_s htimer_t; + typedef struct htimeout_s htimeout_t; + typedef struct hperiod_s hperiod_t; + typedef struct hio_s hio_t; ++typedef struct hsig_s hsig_t; + + typedef void (*hevent_cb) (hevent_t* ev); + typedef void (*hidle_cb) (hidle_t* idle); + typedef void (*htimer_cb) (htimer_t* timer); + typedef void (*hio_cb) (hio_t* io); ++typedef void (*hsig_cb) (hsig_t* signal); + + typedef void (*haccept_cb) (hio_t* io); + typedef void (*hconnect_cb) (hio_t* io); +@@ -42,6 +44,7 @@ typedef enum { + HEVENT_TYPE_TIMER = HEVENT_TYPE_TIMEOUT|HEVENT_TYPE_PERIOD, + HEVENT_TYPE_IDLE = 0x00000100, + HEVENT_TYPE_CUSTOM = 0x00000400, // 1024 ++ HEVENT_TYPE_SIGNAL = 0x00001000, + } hevent_type_e; + + #define HEVENT_LOWEST_PRIORITY (-5) +@@ -186,6 +189,10 @@ HV_EXPORT void hloop_post_event(hloop_t* loop, hevent_t* ev); + HV_EXPORT hidle_t* hidle_add(hloop_t* loop, hidle_cb cb, uint32_t repeat DEFAULT(INFINITE)); + HV_EXPORT void hidle_del(hidle_t* idle); + ++// signal ++HV_EXPORT hsig_t* hsig_add(hloop_t* loop, hsig_cb cb, uint32_t signal DEFAULT(INFINITE)); ++HV_EXPORT void hsig_del(hsig_t* signal); ++ + // timer + HV_EXPORT htimer_t* htimer_add(hloop_t* loop, htimer_cb cb, uint32_t timeout_ms, uint32_t repeat DEFAULT(INFINITE)); + /* +diff --git a/event/hsignal.c b/event/hsignal.c +new file mode 100755 +index 0000000..930286d +--- /dev/null ++++ b/event/hsignal.c +@@ -0,0 +1,161 @@ ++#define _POSIX_SOURCE ++ ++#include "list.h" ++#include "hevent.h" ++ ++#define SIGNAL_ENTRY(p) list_entry(p, hsig_t, self_signal_node) ++ ++#ifndef SA_RESTART ++#define SA_RESTART 0x1000000 ++#endif ++ ++/* ++* 此处的fd不能使用全局的内容,必须一个loop一个,因为在执行sig_event_cb的是偶,每个loop会读取 ++* fd中写入的内容,如果内容全部被第一个loop读取之后,其余的loop就无法读取到信息。 ++* 因此当前的使用中有一个限制,一个进程中只能申请一次loop_new,如果后续有多次loop_new的场景 ++* 需要对此业务逻辑进行优化。 ++*/ ++static int sig_write_fd = -1; ++static int pair[2]; ++static bool sig_init[_NSIG]; ++ ++void sig_handler(int sig) ++{ ++ int save_errno = errno; ++ char signum = (char)sig; ++ int n = write(sig_write_fd, &signum, 1); ++ printf("write %d signals in sig_handler\n", n); ++ ++ errno = save_errno; ++} ++ ++void sig_event_cb(hio_t *io) ++{ ++ printf("in sig_event_cb\n"); ++ char signals[1024]; ++ int n = 0; ++ int ncaught[_NSIG]; ++ int fd = io->fd; ++ hloop_t *loop = hio_context(io); ++ struct list_head *events_at_sig = NULL; ++ struct list_head *ev_node = NULL; ++ memset(signals, 0, sizeof(signals)); ++ memset(ncaught, 0, sizeof(ncaught)); ++ ++ if (loop == NULL) { ++ return; ++ } ++ ++ while (true) { ++ n = read(fd, signals, sizeof(signals)); ++ if (n <= 0) { ++ break; ++ } ++ for (int i = 0; i < n; ++i) { ++ char sig = signals[i]; ++ if (sig < _NSIG) ++ ncaught[sig]++; ++ } ++ } ++ ++ for (int i = 0; i < _NSIG; i++) { ++ if (ncaught[i] > 0) { ++ events_at_sig = &(loop->signal_events_head[i]); ++ if (!list_empty(events_at_sig)) { ++ ev_node = events_at_sig->next; ++ while (ev_node != events_at_sig) { ++ hsig_t *signal = SIGNAL_ENTRY(ev_node); ++ signal->num_calls = ncaught[i]; ++ // In this part, there should be no need to add events to the awaken_signal_events_head list. ++ // Instead, they should be directly added to the existing loop's pending. ++ // According to the logic in hloop.c, various types of events should not invoke their callbacks when processed but should be placed in the pending queue. The callbacks for these events should be invoked collectively after all events are processed (except for timeouts). ++ // Therefore, the existing logic in hloop should handle the current awaken_signal_events_head. ++ // However, it's necessary to consider the case where num_calls is greater than one. Multiple pending queues may be needed. ++ for (int j = 0; j < signal->num_calls; j++) { ++ EVENT_PENDING(signal); ++ } ++ ev_node = ev_node->next; ++ } ++ } ++ } ++ } ++ ++ printf("exit sig_event_cb\n"); ++} ++ ++/*** ++ * 该函数主要用于新增一个信号量,loop中的signal_events_head为一个_NSIG的最大值, ++ * 当每新增一个信号量,就需要追加到signal_events_head[sig]链表中。 ++ * 信号量处理的时候,会从对应的链表中,遍历所有注册的cb,并调用回调。 ++ * 全局使用同一个pair,所有的loop都注册监听pair[0],当有事件写入的时候,就判断是否需要处理。 ++ * ++ * */ ++hsig_t* hsig_add(hloop_t* loop, hsig_cb cb, uint32_t sig) ++{ ++ printf("hsig_add sig=%d\n", sig); ++ ++ hsig_t* signal; ++ HV_ALLOC_SIZEOF(signal); ++ struct list_head *events_at_sig = NULL; ++ struct sigaction sa; ++ ++ if (-1 == sig_write_fd) { ++ socketpair(AF_UNIX, SOCK_STREAM, 0, pair); ++ fcntl(pair[0], F_SETFL, O_NONBLOCK); ++ fcntl(pair[1], F_SETFL, O_NONBLOCK); ++ fcntl(pair[0], F_SETFD, FD_CLOEXEC); ++ fcntl(pair[1], F_SETFD, FD_CLOEXEC); ++ sig_write_fd = pair[1]; ++ ++ memset(sig_init, 0, sizeof(sig_init)); ++ } ++ // 每个信号量只注册一次 ++ if (false == sig_init[sig]) { ++ memset(&sa, 0, sizeof(sa)); ++ sa.sa_handler = sig_handler; ++ sa.sa_flags |= SA_RESTART; ++ sigfillset(&sa.sa_mask); ++ sigaction(sig, &sa, NULL); ++ sig_init[sig] = true; ++ } ++ ++ if (loop->enable_signal == 0) { ++ printf("Enable the signal\n"); ++ loop->enable_signal = 1; ++ for (int i = 0; i < _NSIG; i++) { ++ list_init(&(loop->signal_events_head[i])); ++ } ++ list_init(&(loop->awaken_signal_events_head)); ++ loop->signal_io_watcher = hio_get(loop, pair[0]); ++ hio_set_context(loop->signal_io_watcher, loop); ++ hio_add(loop->signal_io_watcher, (hio_cb)sig_event_cb, HV_READ); ++ } ++ events_at_sig = &(loop->signal_events_head[sig]); ++ signal->loop = loop; ++ signal->event_type = HEVENT_TYPE_SIGNAL; ++ signal->priority = HEVENT_LOWEST_PRIORITY; ++ list_add(&signal->self_signal_node, events_at_sig); ++ EVENT_ADD(loop, signal, cb); ++ loop->nsignals++; ++ ++ return signal; ++} ++ ++static void __hsig_del(hsig_t* signal) ++{ ++ if (signal->destroy) { ++ return; ++ } ++ signal->destroy = 1; ++ list_del(&signal->self_signal_node); ++ signal->loop->nsignals--; ++} ++ ++void hsig_del(hsig_t* signal) ++{ ++ if (!signal->active) { ++ return; ++ } ++ __hsig_del(signal); ++ EVENT_DEL(signal); ++} +\ No newline at end of file +diff --git a/unittest/CMakeLists.txt b/unittest/CMakeLists.txt +index 4e469e6..269aa2e 100644 +--- a/unittest/CMakeLists.txt ++++ b/unittest/CMakeLists.txt +@@ -86,6 +86,12 @@ target_include_directories(ftp PRIVATE .. ../base ../protocol) + add_executable(sendmail sendmail_test.c ../protocol/smtp.c ../base/hsocket.c ../util/base64.c) + target_include_directories(sendmail PRIVATE .. ../base ../protocol ../util) + ++# ------event------ ++add_executable(event_signal event_signal_test.c ../event/hevent.c) ++target_link_libraries(event_signal ${HV_LIBRARIES}) ++target_compile_definitions(event_signal PRIVATE -DPRINT_DEBUG) ++target_include_directories(event_signal PRIVATE ../event ../base ../ssl) ++ + if(UNIX) + add_executable(webbench webbench.c) + endif() +diff --git a/unittest/event_signal_test.c b/unittest/event_signal_test.c +new file mode 100755 +index 0000000..e5ffaa2 +--- /dev/null ++++ b/unittest/event_signal_test.c +@@ -0,0 +1,114 @@ ++#include ++#include ++#include ++#include ++#include "hloop.h" ++#include "hevent.h" ++ ++static int callback_triggered = 0; ++static int exit_main = 0; ++ ++// 线程函数,用于修改全局变量 ++void *thread_function(void *arg) { ++ printf("loop run.\n"); ++ hloop_run((hloop_t*)arg); ++ ++ return NULL; ++} ++ ++void signal_handler(hsig_t* signal) { ++ printf("Signal %d received.\n", signal->signal); ++ // 在这里执行你的回调操作 ++ // 例如,设置标志以指示回调已触发 ++ callback_triggered += 1; ++} ++ ++void timeout_handler(htimer_t* timer) { ++ printf("timeout received.\n"); ++ exit_main = 1; ++} ++ ++void test1() ++{ ++ // 注册信号处理函数 ++ hloop_t* loop = hloop_new(HLOOP_FLAG_AUTO_FREE); ++ hsig_t* signal = hsig_add(loop, signal_handler, SIGUSR1); ++ hsig_add(loop, signal_handler, SIGUSR2); ++ pthread_t thread_id; ++ ++ // 创建线程 ++ if (pthread_create(&thread_id, NULL, thread_function, loop) != 0) { ++ perror("pthread_create"); ++ exit(1); ++ } ++ ++ // 发送信号(这里使用 SIGUSR1 作为示例) ++ pid_t pid = getpid(); // 获取当前进程的进程ID ++ printf("Sending signal SIGUSR1 to process %d...\n", pid); ++ ++ kill(pid, SIGUSR1); ++ // 等待一段时间以确保回调有足够的时间执行 ++ sleep(1); ++ ++ kill(pid, SIGUSR2); ++ // 等待一段时间以确保回调有足够的时间执行 ++ sleep(1); ++ // 检查回调是否已触发 ++ if (2 == callback_triggered) { ++ printf("Callback was triggered successfully.\n"); ++ } else { ++ printf("Callback did not trigger.\n"); ++ } ++} ++ ++void test2() ++{ ++ callback_triggered = 0; ++ // 注册信号处理函数 ++ hloop_t* loop = hloop_new(HLOOP_FLAG_AUTO_FREE); ++ hsig_t* signal = hsig_add(loop, signal_handler, SIGUSR1); ++ hsig_add(loop, signal_handler, SIGUSR2); ++ pthread_t thread_id; ++ int status; ++ ++ // 创建线程 ++ if (pthread_create(&thread_id, NULL, thread_function, loop) != 0) { ++ perror("pthread_create"); ++ exit(1); ++ } ++ ++ // 创建线程 ++ pid_t pid = 0; ++ pid = fork(); ++ if (pid < 0) ++ return 1; ++ else if (pid == 0) { ++ printf("sub process continue.\n"); ++ usleep(10000); /* 0.01 seconds */ ++ kill(getppid(), SIGUSR1); ++ usleep(10000); /* 0.01 seconds */ ++ kill(getppid(), SIGUSR1); ++ usleep(10000); /* 0.01 seconds */ ++ kill(getppid(), SIGUSR2); ++ printf("sub process exit.\n"); ++ } ++ else { ++ printf("Main process continue.\n"); ++ (void)htimer_add(loop, timeout_handler, 5000, INFINITE); ++ while (!exit_main) { ++ sleep(1); ++ } ++ // 检查回调是否已触发 ++ if (3 == callback_triggered) { ++ printf("Callback was triggered successfully.\n"); ++ } else { ++ printf("Callback did not trigger.\n"); ++ } ++ } ++} ++ ++int main() { ++ test1(); ++ test2(); ++ return 0; ++} +-- +2.33.0 + -- Gitee