diff --git a/include/uv.h b/include/uv.h
index f8c422dfe3b5052f1fcabfe2af220395e0bb464e..711e96fb15d8a39c9ad4079d99e0a67996f40732 100644
--- a/include/uv.h
+++ b/include/uv.h
@@ -1829,6 +1829,22 @@ union uv_any_req {
 };
 #undef XX
 
+enum TaskPriority {
+  TASK_HIGH,
+  TASK_NORMAL,
+  TASK_LOW,
+};
+
+typedef void (*uv_io_cb)(void* data, int status);
+typedef void (*uv_post_task)(void* handler, uv_io_cb func, void* data, int status, int prio);
+
+#define UV_EVENT_MAGIC 0x200A200A
+
+struct uv_loop_data {
+  unsigned int loop_data_magic;
+  void* event_handler;
+  uv_post_task post_task_func;
+};
 
 struct uv_loop_s {
   /* User data - use this for whatever. */
diff --git a/src/threadpool.c b/src/threadpool.c
index 96e08dc32574df6515ea68b1cdab8c1310866300..d7f159ddc5db270912d073474debb00aefac9033 100644
--- a/src/threadpool.c
+++ b/src/threadpool.c
@@ -51,6 +51,20 @@ static QUEUE wq;
 static QUEUE run_slow_work_message;
 static QUEUE slow_io_pending_wq;
 
+static int check_data_valid(struct uv_loop_data* data) {
+  if (data == NULL) {
+    return -1;
+  }
+  if (data->loop_data_magic != UV_EVENT_MAGIC) {
+    return -1;
+  }
+  if (data->post_task_func == NULL) {
+    UV_LOGE("post_task_func is NULL");
+    return -1;
+  }
+  return 0;
+}
+
 #ifdef UV_STATISTIC
 #define MAX_DUMP_QUEUE_SIZE 200
 static uv_mutex_t dump_queue_mutex;
@@ -405,8 +419,14 @@ static void worker(void* arg) {
     uv_mutex_lock(&w->loop->wq_mutex);
     w->work = NULL;  /* Signal uv_cancel() that the work req is done
                         executing. */
-    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
-    uv_async_send(&w->loop->wq_async);
+
+    struct uv_loop_data* data = (struct uv_loop_data*)(w->loop->data);
+    if (check_data_valid(data) == 0) {
+      data->post_task_func(data->event_handler, w->done, w, 0, -1);
+    } else {
+      QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
+      uv_async_send(&w->loop->wq_async);
+    }
     uv_mutex_unlock(&w->loop->wq_mutex);
 
     /* Lock `mutex` since that is expected at the start of the next
@@ -605,7 +625,14 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
   int qos = (ffrt_qos_t)(intptr_t)req->reserved[0];
   QUEUE_INSERT_TAIL(&(lfields->wq_sub[qos]), &w->wq);
 #endif
-  uv_async_send(&loop->wq_async);
+
+  struct uv_loop_data* data = (struct uv_loop_data*)(w->loop->data);
+  if (check_data_valid(data) == 0) {
+    data->post_task_func(data->event_handler, w->done, w, UV_ECANCELED, qos);
+  } else {
+    uv_async_send(&loop->wq_async);
+  }
+
   uv_mutex_unlock(&loop->wq_mutex);
 
   rdunlock_closed_uv_loop_rwlock();
@@ -728,8 +755,15 @@ void uv__ffrt_work(ffrt_executor_task_t* data, ffrt_qos_t qos)
     uv_mutex_lock(&w->loop->wq_mutex);
     w->work = NULL;  /* Signal uv_cancel() that the work req is done
                         executing. */
-    QUEUE_INSERT_TAIL(&(lfields->wq_sub[qos]), &w->wq);
-    uv_async_send(&w->loop->wq_async);
+
+    struct uv_loop_data* loop_data = (struct uv_loop_data*)(w->loop->data);
+    if (check_data_valid(loop_data) == 0) {
+      loop_data->post_task_func(loop_data->event_handler, w->done, w, 0, qos);
+    } else {
+      QUEUE_INSERT_TAIL(&(lfields->wq_sub[qos]), &w->wq);
+      uv_async_send(&w->loop->wq_async);
+    }
+
     uv_mutex_unlock(&w->loop->wq_mutex);
     rdunlock_closed_uv_loop_rwlock();
 }
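
For context, a loop opts in to this hook from the embedder's side roughly as follows. This is a minimal sketch against the uv.h additions above; event_handler_t and event_handler_post() are hypothetical placeholders for whatever main-thread dispatcher the embedder already owns, not part of this patch.

/*
 * Sketch of the consumer side of the post_task hook (assumptions noted
 * above: event_handler_t / event_handler_post() are hypothetical).  The
 * dispatcher must eventually call func(data, status) on the loop's thread,
 * because once check_data_valid() accepts loop->data the threadpool no
 * longer queues finished work on loop->wq or signals wq_async.
 */
#include <stdlib.h>
#include <uv.h>

typedef struct event_handler_s event_handler_t;            /* hypothetical */
void event_handler_post(event_handler_t* handler,          /* hypothetical */
                        uv_io_cb func, void* data, int status, int prio);

/* Matches the uv_post_task signature from the diff: forward the finished
 * uv__work's done callback to the dispatcher.  worker() passes prio == -1
 * and the ffrt path passes its qos value, so anything outside the
 * TaskPriority range is mapped to TASK_NORMAL here (an assumption). */
static void post_task(void* handler, uv_io_cb func, void* data,
                      int status, int prio) {
  if (prio < TASK_HIGH || prio > TASK_LOW)
    prio = TASK_NORMAL;
  event_handler_post((event_handler_t*) handler, func, data, status, prio);
}

/* Install the hook: the threadpool only trusts loop->data when
 * loop_data_magic equals UV_EVENT_MAGIC and post_task_func is non-NULL. */
static int attach_post_task(uv_loop_t* loop, event_handler_t* handler) {
  struct uv_loop_data* d = malloc(sizeof(*d));
  if (d == NULL)
    return UV_ENOMEM;
  d->loop_data_magic = UV_EVENT_MAGIC;
  d->event_handler = handler;
  d->post_task_func = post_task;
  loop->data = d;
  return 0;
}

Whatever the dispatcher looks like, it has to invoke func(data, status) on the loop's thread eventually; otherwise completed (or cancelled, status == UV_ECANCELED) work items are never finalized, since the fallback QUEUE_INSERT_TAIL / uv_async_send path is skipped whenever the hook validates.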