diff --git a/include/uv.h b/include/uv.h index 39d699bef4186a6163f681e7df48cf17caa122de..442c6d758fc79bc47ce2005931a015901453e610 100644 --- a/include/uv.h +++ b/include/uv.h @@ -442,6 +442,12 @@ UV_EXTERN const char* uv_err_name(int err); UV_EXTERN char* uv_err_name_r(int err, char* buf, size_t buflen); +/* + * reserved[0] is used for ffrt qos + * reserved[1] is used to record task name + * reserved[2] is used for ffrt task handle + * reserved[3] is used to collect asynchronous task stack + */ #define UV_REQ_FIELDS \ /* public */ \ void* data; \ @@ -1212,6 +1218,19 @@ UV_EXTERN int uv_queue_work_with_qos_internal(uv_loop_t* loop, uv_qos_t qos, const char* task_name); +/* + * Ensures order preservation for asynchronous tasks sharing the same task ID + * + * This function guarantees in-order execution of asynchronous tasks by enqueuing them in an event loop's work queue. + * When multiple tasks share an identical 'taskId', they will execute sequentially in submission order. + */ +UV_EXTERN int uv_queue_work_ordered(uv_loop_t* loop, + uv_work_t* req, + uv_work_cb work_cb, + uv_after_work_cb after_work_cb, + uv_qos_t qos, + uint64_t taskId); + struct uv_cpu_times_s { uint64_t user; /* milliseconds */ uint64_t nice; /* milliseconds */ diff --git a/src/threadpool.c b/src/threadpool.c index 7f3d72b8268dd34eed9d794266f54bf0e8254740..79714dc975300d2473125ff349a6fa9f3a48a2bb 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -48,6 +48,17 @@ #define CURSOR 5 #endif +typedef enum { + /* ffrt qos */ + FFRT_QOS = 0, + /* record task name */ + DFX_TASK_NAME, + /* ffrt task handle */ + FFRT_TASK_DEPENDENCE, + /* collect asynchronous task stack */ + DFX_ASYNC_STACK, +} req_reversed; + #ifdef USE_FFRT static uv_rwlock_t g_closed_uv_loop_rwlock; #endif @@ -363,6 +374,13 @@ static void uv__print_active_reqs(uv_loop_t* loop, const char* flag) { #ifdef USE_FFRT +static void uv__req_reserved_init(uv_work_t* req) { + for (int i = 0; i < sizeof(req->reserved) / sizeof(req->reserved[0]); i++) { + req->reserved[i] = NULL; + } +} + + static void uv__task_done_wrapper(void* work, int status) { struct uv__work* w = (struct uv__work*)work; uv__print_active_reqs(w->loop, "complete"); @@ -388,7 +406,7 @@ void uv__work_submit_to_eventloop(uv_req_t* req, struct uv__work* w, int qos) { struct uv_loop_data* data = (struct uv_loop_data*)loop->data; uv_mutex_unlock(&loop->wq_mutex); if (req->type == UV_WORK) { - data->post_task_func((char*)req->reserved[1], uv__task_done_wrapper, (void*)w, status, qos); + data->post_task_func((char*)req->reserved[DFX_TASK_NAME], uv__task_done_wrapper, (void*)w, status, qos); } else { data->post_task_func(NULL, uv__task_done_wrapper, (void*)w, status, qos); } @@ -429,8 +447,13 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { uv_mutex_unlock(&mutex); #else uv_mutex_lock(&w->loop->wq_mutex); - cancelled = !uv__queue_empty(&w->wq) && w->work != NULL - && ffrt_executor_task_cancel(w, (ffrt_qos_t)(intptr_t)req->reserved[0]); + if (req->reserved[FFRT_TASK_DEPENDENCE] == NULL) { + cancelled = !uv__queue_empty(&w->wq) && w->work != NULL + && ffrt_executor_task_cancel(w, (ffrt_qos_t)(intptr_t)req->reserved[FFRT_QOS]); + } else { + cancelled = w->work != NULL && (ffrt_skip((ffrt_task_handle_t)req->reserved[FFRT_TASK_DEPENDENCE]) == 0); + } + uv_mutex_unlock(&w->loop->wq_mutex); #endif @@ -448,13 +471,13 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { uv_async_send(&loop->wq_async); #else uv__loop_internal_fields_t* lfields = uv__get_internal_fields(w->loop); - int qos = (ffrt_qos_t)(intptr_t)req->reserved[0]; + int qos = (ffrt_qos_t)(intptr_t)req->reserved[FFRT_QOS]; if (uv_check_data_valid(w->loop) == 0) { int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0; struct uv_loop_data* data = (struct uv_loop_data*)w->loop->data; if (req->type == UV_WORK) { - data->post_task_func((char*)req->reserved[1], uv__task_done_wrapper, (void*)w, status, qos); + data->post_task_func((char*)req->reserved[DFX_TASK_NAME], uv__task_done_wrapper, (void*)w, status, qos); } else { data->post_task_func(NULL, uv__task_done_wrapper, (void*)w, status, qos); } @@ -551,7 +574,7 @@ static void uv__queue_work(struct uv__work* w) { #endif uv_work_t* req = container_of(w, uv_work_t, work_req); #ifdef ASYNC_STACKTRACE - LibuvSetStackId((uint64_t)req->reserved[3]); + LibuvSetStackId((uint64_t)req->reserved[DFX_ASYNC_STACK]); #endif req->work_cb(req); #ifdef USE_FFRT @@ -570,16 +593,20 @@ static void uv__queue_done(struct uv__work* w, int err) { req = container_of(w, uv_work_t, work_req); #ifdef ASYNC_STACKTRACE - LibuvSetStackId((uint64_t)req->reserved[3]); + LibuvSetStackId((uint64_t)req->reserved[DFX_ASYNC_STACK]); #endif uv__req_unregister(req->loop, req); if (req->after_work_cb == NULL) return; #ifdef USE_FFRT - if (req->reserved[1] != NULL) { - free((char*)req->reserved[1]); - req->reserved[1] = NULL; + if (req->reserved[DFX_TASK_NAME] != NULL) { + free((char*)req->reserved[DFX_TASK_NAME]); + req->reserved[DFX_TASK_NAME] = NULL; + } + if (req->reserved[FFRT_TASK_DEPENDENCE] != NULL) { + ffrt_task_handle_destroy((ffrt_task_handle_t)req->reserved[FFRT_TASK_DEPENDENCE]); + req->reserved[FFRT_TASK_DEPENDENCE] = NULL; } #endif req->after_work_cb(req, err); @@ -587,6 +614,25 @@ static void uv__queue_done(struct uv__work* w, int err) { #ifdef USE_FFRT +struct ffrt_function { + ffrt_function_header_t header; + struct uv__work* w; + int qos; +}; + +void uv__ffrt_work_ordered(void* t) { + ffrt_this_task_set_legacy_mode(true); + struct ffrt_function* f = (struct ffrt_function*)t; + if (f == NULL || f->w == NULL || f->w->work == NULL) { + UV_LOGE("uv work is invalid"); + ffrt_this_task_set_legacy_mode(false); + return; + } + f->w->work(f->w, f->qos); + ffrt_this_task_set_legacy_mode(false); +} + + void uv__ffrt_work(ffrt_executor_task_t* data, ffrt_qos_t qos) { struct uv__work* w = (struct uv__work *)data; @@ -637,7 +683,7 @@ void uv__work_submit(uv_loop_t* loop, w->work = work; w->done = done; - req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr); + req->reserved[FFRT_QOS] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr); ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr); ffrt_task_attr_destroy(&attr); } @@ -659,10 +705,49 @@ void uv__work_submit_with_qos(uv_loop_t* loop, w->work = work; w->done = done; - req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr); + req->reserved[FFRT_QOS] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr); ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr); ffrt_task_attr_destroy(&attr); } + + +/* ffrt uv__work_submit_ordered */ +void uv__work_submit_ordered(uv_loop_t* loop, + uv_req_t* req, + struct uv__work* w, + ffrt_qos_t qos, + void (*work)(struct uv__work *w, int qos), + void (*done)(struct uv__work *w, int status), + uint64_t taskId) { + uv_once(&once, init_once); + ffrt_task_attr_t attr; + ffrt_task_attr_init(&attr); + ffrt_task_attr_set_qos(&attr, qos); + + w->loop = loop; + w->work = work; + w->done = done; + + req->reserved[FFRT_QOS] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr); + struct ffrt_function* f = + (struct ffrt_function*)ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_general); + f->header.exec = uv__ffrt_work_ordered; + f->header.destroy = NULL; + f->w = w; + f->qos = qos; + ffrt_dependence_t dependence; + dependence.type = ffrt_dependence_data; + dependence.ptr = (void*)taskId; + ffrt_deps_t out_deps; + out_deps.len = 1; + out_deps.items = &dependence; + ffrt_task_handle_t handle = ffrt_submit_h_base((ffrt_function_header_t*)f, NULL, &out_deps, &attr); + if (handle == NULL) { + UV_LOGE("submit task failed"); + } + req->reserved[FFRT_TASK_DEPENDENCE] = (void*)handle; + ffrt_task_attr_destroy(&attr); +} #endif @@ -673,7 +758,7 @@ int uv_queue_work(uv_loop_t* loop, if (work_cb == NULL) return UV_EINVAL; #ifdef USE_FFRT - req->reserved[1] = NULL; + uv__req_reserved_init(req); #endif uv__print_active_reqs(loop, "execute"); uv__req_init(loop, req, UV_WORK); @@ -682,8 +767,8 @@ int uv_queue_work(uv_loop_t* loop, req->after_work_cb = after_work_cb; #ifdef ASYNC_STACKTRACE - /* The req->reserved[3] is used for DFX only. */ - req->reserved[3] = (void*)LibuvCollectAsyncStack(); + /* The req->reserved[DFX_ASYNC_STACK] is used for DFX only. */ + req->reserved[DFX_ASYNC_STACK] = (void*)LibuvCollectAsyncStack(); #endif uv__work_submit(loop, #ifdef USE_FFRT @@ -707,10 +792,8 @@ int uv_queue_work_internal(uv_loop_t* loop, if (work_cb == NULL) return UV_EINVAL; - int ret = uv__copy_taskname((uv_req_t*)req, task_name); - if (ret != 0) { - req->reserved[1] = NULL; - } + uv__req_reserved_init(req); + uv__copy_taskname((uv_req_t*)req, task_name); uv__print_active_reqs(loop, "execute"); uv__req_init(loop, req, UV_WORK); @@ -719,8 +802,8 @@ int uv_queue_work_internal(uv_loop_t* loop, req->after_work_cb = after_work_cb; #ifdef ASYNC_STACKTRACE - /* The req->reserved[3] is used for DFX only. */ - req->reserved[3] = (void*)LibuvCollectAsyncStack(); + /* The req->reserved[DFX_ASYNC_STACK] is used for DFX only. */ + req->reserved[DFX_ASYNC_STACK] = (void*)LibuvCollectAsyncStack(); #endif uv__work_submit(loop, (uv_req_t*)req, @@ -744,7 +827,7 @@ int uv_queue_work_with_qos(uv_loop_t* loop, if (work_cb == NULL) return UV_EINVAL; - req->reserved[1] = NULL; + uv__req_reserved_init(req); STATIC_ASSERT(uv_qos_background == ffrt_qos_background); STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility); STATIC_ASSERT(uv_qos_default == ffrt_qos_default); @@ -765,8 +848,8 @@ int uv_queue_work_with_qos(uv_loop_t* loop, req->after_work_cb = after_work_cb; #ifdef ASYNC_STACKTRACE - /* The req->reserved[3] is used for DFX only. */ - req->reserved[3] = (void*)LibuvCollectAsyncStack(); + /* The req->reserved[DFX_ASYNC_STACK] is used for DFX only. */ + req->reserved[DFX_ASYNC_STACK] = (void*)LibuvCollectAsyncStack(); #endif uv__work_submit_with_qos(loop, (uv_req_t*)req, @@ -791,10 +874,8 @@ int uv_queue_work_with_qos_internal(uv_loop_t* loop, if (work_cb == NULL) return UV_EINVAL; - int ret = uv__copy_taskname((uv_req_t*)req, task_name); - if (ret != 0) { - req->reserved[1] = NULL; - } + uv__req_reserved_init(req); + uv__copy_taskname((uv_req_t*)req, task_name); STATIC_ASSERT(uv_qos_background == ffrt_qos_background); STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility); @@ -816,8 +897,8 @@ int uv_queue_work_with_qos_internal(uv_loop_t* loop, req->after_work_cb = after_work_cb; #ifdef ASYNC_STACKTRACE - /* The req->reserved[3] is used for DFX only. */ - req->reserved[3] = (void*)LibuvCollectAsyncStack(); + /* The req->reserved[DFX_ASYNC_STACK] is used for DFX only. */ + req->reserved[DFX_ASYNC_STACK] = (void*)LibuvCollectAsyncStack(); #endif uv__work_submit_with_qos(loop, (uv_req_t*)req, @@ -832,6 +913,55 @@ int uv_queue_work_with_qos_internal(uv_loop_t* loop, } +int uv_queue_work_ordered(uv_loop_t* loop, + uv_work_t* req, + uv_work_cb work_cb, + uv_after_work_cb after_work_cb, + uv_qos_t qos, + uint64_t taskId) { +#ifdef USE_FFRT + if (work_cb == NULL) + return UV_EINVAL; + + uv__req_reserved_init(req); + + STATIC_ASSERT(uv_qos_background == ffrt_qos_background); + STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility); + STATIC_ASSERT(uv_qos_default == ffrt_qos_default); + STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated); + STATIC_ASSERT(uv_qos_user_interactive == ffrt_qos_user_interactive); + if (qos == uv_qos_reserved) { + UV_LOGW("Invalid qos %{public}d", (int)qos); + return UV_EINVAL; + } + if (qos < ffrt_qos_background || qos > ffrt_qos_user_interactive) { + return UV_EINVAL; + } + + uv__print_active_reqs(loop, "execute"); + uv__req_init(loop, req, UV_WORK); + req->loop = loop; + req->work_cb = work_cb; + req->after_work_cb = after_work_cb; + +#ifdef ASYNC_STACKTRACE + /* The req->reserved[DFX_ASYNC_STACK] is used for DFX only. */ + req->reserved[DFX_ASYNC_STACK] = (void*)LibuvCollectAsyncStack(); +#endif + uv__work_submit_ordered(loop, + (uv_req_t*)req, + &req->work_req, + (ffrt_qos_t)qos, + uv__queue_work, + uv__queue_done, + taskId); + return 0; +#else + return uv_queue_work_with_qos(loop, req, work_cb, after_work_cb, qos); +#endif +} + + int uv_cancel(uv_req_t* req) { struct uv__work* wreq; uv_loop_t* loop;