diff --git a/BUILD.gn b/BUILD.gn
index 97453d088fbb1ea56b19247a7ea362ce9e75dbdd..14cd9a5dd2c580f49a457429001758ef75513b35 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -186,7 +186,6 @@ if (defined(ohos_lite)) {
     }
 
     if (enable_uv_statisic && is_ohos) {
-      defines += [ "UV_STATISTIC" ]
       cflags += [ "-Wno-frame-address" ]  # for use of __builtin_return_address
     }
 
diff --git a/include/uv.h b/include/uv.h
index 0d39684b8e71dca3ea92a16321fa7e480a295c0a..8c6631dad66a84beeac41200f771bde8dab71f82 100644
--- a/include/uv.h
+++ b/include/uv.h
@@ -1180,6 +1180,13 @@ UV_EXTERN int uv_queue_work(uv_loop_t* loop,
                             uv_work_cb work_cb,
                             uv_after_work_cb after_work_cb);
 
+/* This function is used for OpenHarmony only. */
+UV_EXTERN int uv_queue_work_internal(uv_loop_t* loop,
+                                     uv_work_t* req,
+                                     uv_work_cb work_cb,
+                                     uv_after_work_cb after_work_cb,
+                                     const char* task_name);
+
 UV_EXTERN int uv_cancel(uv_req_t* req);
 
 typedef enum {
@@ -1196,6 +1203,14 @@ UV_EXTERN int uv_queue_work_with_qos(uv_loop_t* loop,
                                      uv_after_work_cb after_work_cb,
                                      uv_qos_t qos);
 
+/* This function is used for OpenHarmony only. */
+UV_EXTERN int uv_queue_work_with_qos_internal(uv_loop_t* loop,
+                                              uv_work_t* req,
+                                              uv_work_cb work_cb,
+                                              uv_after_work_cb after_work_cb,
+                                              uv_qos_t qos,
+                                              const char* task_name);
+
 struct uv_cpu_times_s {
   uint64_t user; /* milliseconds */
   uint64_t nice; /* milliseconds */
@@ -1926,7 +1941,7 @@ union uv_any_req {
 #undef XX
 
 typedef void (*uv_io_cb)(void* work, int status);
-typedef void (*uv_post_task)(void* handler, uv_io_cb func, void* work, int status, int prio);
+typedef void (*uv_post_task)(const char* task_name, uv_io_cb func, void* work, int status, int prio);
 
 struct uv_loop_data {
   void* event_handler;
diff --git a/include/uv/threadpool.h b/include/uv/threadpool.h
index 190d20570daaa07b453de1006b2c9dc5bb3f9bb3..777ab18f2d921671d9c63a075a433b1f7295e0b9 100644
--- a/include/uv/threadpool.h
+++ b/include/uv/threadpool.h
@@ -27,49 +27,16 @@
 #ifndef UV_THREADPOOL_H_
 #define UV_THREADPOOL_H_
 
-#ifdef UV_STATISTIC
-enum uv_work_state {
-  WAITING = 0,
-  WORK_EXECUTING,
-  WORK_END,
-  DONE_EXECUTING,
-  DONE_END,
-};
-
-/* used for dump uv work infomation */
-struct uv_work_dump_info {
-  uint64_t queue_time;
-  void* builtin_return_address[3]; // backtrace the caller.
-
-  enum uv_work_state state;
-
-  uint64_t execute_start_time;
-  uint64_t execute_end_time;
-  uint64_t done_start_time;
-  uint64_t done_end_time;
-
-  struct uv__work* work;
-
-  struct uv__queue wq;
-};
-
-struct uv__statistic_work {
-  void (*work)(struct uv__statistic_work *w);
-  struct uv_work_dump_info* info;
-  enum uv_work_state state;
-  uint64_t time;
-  struct uv__queue wq;
-};
-#endif
 
 struct uv__work {
+#ifdef USE_FFRT
+  void (*work)(struct uv__work *w, int qos);
+#else
   void (*work)(struct uv__work *w);
+#endif
   void (*done)(struct uv__work *w, int status);
   struct uv_loop_s* loop;
   struct uv__queue wq;
-#ifdef UV_STATISTIC
-  struct uv_work_dump_info* info;
-#endif
 };
 
 #endif /* UV_THREADPOOL_H_ */
diff --git a/src/random.c b/src/random.c
index a6917ca39b31e402390f45527923b15907f9f328..a5bd1f50b41ea6c60571caab2e33b410464156ae 100644
--- a/src/random.c
+++ b/src/random.c
@@ -70,11 +70,18 @@ static int uv__random(void* buf, size_t buflen) {
 }
 
 
+#ifdef USE_FFRT
+static void uv__random_work(struct uv__work* w, int qos) {
+#else
 static void uv__random_work(struct uv__work* w) {
+#endif
   uv_random_t* req;
 
   req = container_of(w, uv_random_t, work_req);
   req->status = uv__random(req->buf, req->buflen);
+#ifdef USE_FFRT
+  uv__work_submit_to_eventloop((uv_req_t*)req, w, qos);
+#endif
 }
 
 
diff --git a/src/threadpool.c b/src/threadpool.c
index ee86f7b1340f8bf93ff472f74b92dc445bab9f82..78dce85e0895d60dcc7611cbe92459760bcc3472 100644
--- a/src/threadpool.c
+++ b/src/threadpool.c
@@ -31,6 +31,7 @@
 #include <stdlib.h>
 
 #ifdef USE_FFRT
+#define REQ_MASK 0x1111
 #include
 #include "ffrt_inner.h"
 #endif
@@ -62,234 +63,6 @@ static struct uv__queue run_slow_work_message;
 static struct uv__queue slow_io_pending_wq;
 
 
-#ifdef UV_STATISTIC
-#define MAX_DUMP_QUEUE_SIZE 200
-static uv_mutex_t dump_queue_mutex;
-static QUEUE dump_queue;
-static unsigned int dump_queue_size;
-
-static int statistic_idle;
-static uv_mutex_t statistic_mutex;
-static QUEUE statistic_works;
-static uv_cond_t dump_cond;
-static uv_thread_t dump_thread;
-
-static void uv_dump_worker(void* arg) {
-  struct uv__statistic_work* w;
-  struct uv__queue* q;
-  uv_sem_post((uv_sem_t*) arg);
-  arg = NULL;
-  uv_mutex_lock(&statistic_mutex);
-  for (;;) {
-    while (uv__queue_empty(&statistic_works)) {
-      statistic_idle = 1;
-      uv_cond_wait(&dump_cond, &statistic_mutex);
-      statistic_idle = 0;
-    }
-    q = uv__queue_head(&statistic_works);
-    if (q == &exit_message) {
-      uv_cond_signal(&dump_cond);
-      uv_mutex_unlock(&statistic_mutex);
-      break;
-    }
-    uv__queue_remove(q);
-    uv__queue_init(q);
-    uv_mutex_unlock(&statistic_mutex);
-    w = uv__queue_data(q, struct uv__statistic_work, wq);
-    w->work(w);
-    free(w);
-    uv_mutex_lock(&statistic_mutex);
-  }
-}
-
-static void post_statistic_work(struct uv__queue* q) {
-  uv_mutex_lock(&statistic_mutex);
-  uv__queue_insert_tail(&statistic_works, q);
-  if (statistic_idle)
-    uv_cond_signal(&dump_cond);
-  uv_mutex_unlock(&statistic_mutex);
-}
-
-static void uv__queue_work_info(struct uv__statistic_work *work) {
-  uv_mutex_lock(&dump_queue_mutex);
-  if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) { /* release works already done */
-    struct uv__queue* q;
-    uv__queue_foreach(q, &dump_queue) {
-      struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
-      if (info->state == DONE_END) {
-        uv__queue_remove(q);
-        free(info);
-        dump_queue_size--;
-      }
-    }
-    if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) {
-      abort(); /* too many works not done. */
-    }
-  }
-
-  uv__queue_insert_head(&dump_queue, &work->info->wq);
-  dump_queue_size++;
-  uv_mutex_unlock(&dump_queue_mutex);
-}
-
-static void uv__update_work_info(struct uv__statistic_work *work) {
-  uv_mutex_lock(&dump_queue_mutex);
-  if (work != NULL && work->info != NULL) {
-    work->info->state = work->state;
-    switch (work->state) {
-      case WAITING:
-        work->info->queue_time = work->time;
-        break;
-      case WORK_EXECUTING:
-        work->info->execute_start_time = work->time;
-        break;
-      case WORK_END:
-        work->info->execute_end_time = work->time;
-        break;
-      case DONE_EXECUTING:
-        work->info->done_start_time = work->time;
-        break;
-      case DONE_END:
-        work->info->done_end_time = work->time;
-        break;
-      default:
-        break;
-    }
-  }
-  uv_mutex_unlock(&dump_queue_mutex);
-}
-
-
-// return the timestamp in millisecond
-static uint64_t uv__now_timestamp() {
-  uv_timeval64_t tv;
-  int r = uv_gettimeofday(&tv);
-  if (r != 0) {
-    return 0;
-  }
-  return (uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
-}
-
-
-static void uv__post_statistic_work(struct uv__work *w, enum uv_work_state state) {
-  struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
-  if (dump_work == NULL) {
-    return;
-  }
-  dump_work->info = w->info;
-  dump_work->work = uv__update_work_info;
-  dump_work->time = uv__now_timestamp();
-  dump_work->state = state;
-  uv__queue_init(&dump_work->wq);
-  post_statistic_work(&dump_work->wq);
-}
-
-
-static void init_work_dump_queue()
-{
-  if (uv_mutex_init(&dump_queue_mutex))
-    abort();
-  uv_mutex_lock(&dump_queue_mutex);
-  uv__queue_init(&dump_queue);
-  dump_queue_size = 0;
-  uv_mutex_unlock(&dump_queue_mutex);
-
-  /* init dump thread */
-  statistic_idle = 1;
-  if (uv_mutex_init(&statistic_mutex))
-    abort();
-  uv__queue_init(&statistic_works);
-  uv_sem_t sem;
-  if (uv_cond_init(&dump_cond))
-    abort();
-  if (uv_sem_init(&sem, 0))
-    abort();
-  if (uv_thread_create(&dump_thread, uv_dump_worker, &sem))
-    abort();
-  uv_sem_wait(&sem);
-  uv_sem_destroy(&sem);
-}
-
-
-void uv_init_dump_info(struct uv_work_dump_info* info, struct uv__work* w) {
-  if (info == NULL)
-    return;
-  info->queue_time = 0;
-  info->state = WAITING;
-  info->execute_start_time = 0;
-  info->execute_end_time = 0;
-  info->done_start_time = 0;
-  info->done_end_time = 0;
-  info->work = w;
-  uv__queue_init(&info->wq);
-}
-
-
-void uv_queue_statics(struct uv_work_dump_info* info) {
-  struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
-  if (dump_work == NULL) {
-    abort();
-  }
-  dump_work->info = info;
-  dump_work->work = uv__queue_work_info;
-  info->queue_time = uv__now_timestamp();
-  dump_work->state = WAITING;
-  uv__queue_init(&dump_work->wq);
-  post_statistic_work(&dump_work->wq);
-}
-
-
-uv_worker_info_t* uv_dump_work_queue(int* size) {
-#ifdef UV_STATISTIC
-  uv_mutex_lock(&dump_queue_mutex);
-  if (uv__queue_empty(&dump_queue)) {
-    return NULL;
-  }
-  *size = dump_queue_size;
-  uv_worker_info_t* dump_info = (uv_worker_info_t*) malloc(sizeof(uv_worker_info_t) * dump_queue_size);
-  struct uv__queue* q;
-  int i = 0;
-  uv__queue_foreach(q, &dump_queue) {
-    struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
-    dump_info[i].queue_time = info->queue_time;
-    dump_info[i].builtin_return_address[0] = info->builtin_return_address[0];
-    dump_info[i].builtin_return_address[1] = info->builtin_return_address[1];
-    dump_info[i].builtin_return_address[2] = info->builtin_return_address[2];
-    switch (info->state) {
-      case WAITING:
-        strcpy(dump_info[i].state, "waiting");
-        break;
-      case WORK_EXECUTING:
-        strcpy(dump_info[i].state, "work_executing");
-        break;
-      case WORK_END:
-        strcpy(dump_info[i].state, "work_end");
-        break;
-      case DONE_EXECUTING:
-        strcpy(dump_info[i].state, "done_executing");
-        break;
-      case DONE_END:
-        strcpy(dump_info[i].state, "done_end");
-        break;
-      default:
-        break;
-    }
-    dump_info[i].execute_start_time = info->execute_start_time;
-    dump_info[i].execute_end_time = info->execute_end_time;
-    dump_info[i].done_start_time = info->done_start_time;
-    dump_info[i].done_end_time = info->done_end_time;
-    ++i;
-  }
-  uv_mutex_unlock(&dump_queue_mutex);
-  return dump_info;
-#else
-  size = 0;
-  return NULL;
-#endif
-}
-#endif
-
-
 static void init_closed_uv_loop_rwlock_once(void) {
   uv_rwlock_init(&g_closed_uv_loop_rwlock);
 }
@@ -329,7 +102,11 @@ void on_uv_loop_close(uv_loop_t* loop) {
 }
 
 
+#ifdef USE_FFRT
+static void uv__cancelled(struct uv__work* w, int qos) {
+#else
 static void uv__cancelled(struct uv__work* w) {
+#endif
   abort();
 }
 
@@ -410,13 +187,7 @@ static void worker(void* arg) {
     uv_mutex_unlock(&mutex);
 
     w = uv__queue_data(q, struct uv__work, wq);
-#ifdef UV_STATISTIC
-    uv__post_statistic_work(w, WORK_EXECUTING);
-#endif
     w->work(w);
-#ifdef UV_STATISTIC
-    uv__post_statistic_work(w, WORK_END);
-#endif
 
     uv_mutex_lock(&w->loop->wq_mutex);
     w->work = NULL;  /* Signal uv_cancel() that the work req is done executing. */
@@ -484,12 +255,6 @@ void uv__threadpool_cleanup(void) {
 
   threads = NULL;
   nthreads = 0;
-#ifdef UV_STATISTIC
-  post_statistic_work(&exit_message);
-  uv_thread_join(dump_thread);
-  uv_mutex_destroy(&statistic_mutex);
-  uv_cond_destroy(&dump_cond);
-#endif
 }
 
 
@@ -563,9 +328,6 @@ static void init_once(void) {
   abort();
 #endif
   init_closed_uv_loop_rwlock_once();
-#ifdef UV_STATISTIC
-  init_work_dump_queue();
-#endif
   init_threads();
 }
 
@@ -603,6 +365,38 @@ static void uv__task_done_wrapper(void* work, int status) {
   uv__print_active_reqs(w->loop, "complete");
   w->done(w, status);
 }
+
+
+void uv__work_submit_to_eventloop(uv_req_t* req, struct uv__work* w, int qos) {
+  uv_loop_t* loop = w->loop;
+  rdlock_closed_uv_loop_rwlock();
+  if (loop->magic != UV_LOOP_MAGIC) {
+    rdunlock_closed_uv_loop_rwlock();
+    UV_LOGE("uv_loop(%{public}zu:%{public}#x), task is invalid",
+            (size_t)loop, loop->magic);
+    return;
+  }
+
+  uv_mutex_lock(&loop->wq_mutex);
+  w->work = NULL;  /* Signal uv_cancel() that the work req is done executing. */
+
+  if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) == 0) {
+    int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
+    struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)loop->data -
+        (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
+    if (req->type == UV_WORK) {
+      addr->post_task_func((char*)req->reserved[1], uv__task_done_wrapper, (void*)w, status, qos);
+    } else {
+      addr->post_task_func(NULL, uv__task_done_wrapper, (void*)w, status, qos);
+    }
+  } else {
+    uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
+    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
+    uv_async_send(&loop->wq_async);
+  }
+  uv_mutex_unlock(&loop->wq_mutex);
+  rdunlock_closed_uv_loop_rwlock();
+}
 #endif
 
 
@@ -653,7 +447,11 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
     int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
     struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)w->loop->data -
         (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
-    addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
+    if (req->type == UV_WORK) {
+      addr->post_task_func((char*)req->reserved[1], uv__task_done_wrapper, (void*)w, status, qos);
+    } else {
+      addr->post_task_func(NULL, uv__task_done_wrapper, (void*)w, status, qos);
+    }
   } else {
     uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
     uv_async_send(&loop->wq_async);
@@ -709,24 +507,8 @@ void uv__work_done(uv_async_t* handle) {
     w = container_of(q, struct uv__work, wq);
     err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
-#ifdef UV_STATISTIC
-    uv__post_statistic_work(w, DONE_EXECUTING);
-    struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
-    if (dump_work == NULL) {
-      UV_LOGE("malloc(%{public}zu) failed: %{public}s", sizeof(struct uv__statistic_work), strerror(errno));
-      break;
-    }
-    dump_work->info = w->info;
-    dump_work->work = uv__update_work_info;
-#endif
     w->done(w, err);
     nevents++;
-#ifdef UV_STATISTIC
-    dump_work->time = uv__now_timestamp();
-    dump_work->state = DONE_END;
-    QUEUE_INIT(&dump_work->wq);
-    post_statistic_work(&dump_work->wq);
-#endif
   }
 
   uv_end_trace(UV_TRACE_TAG);
 
@@ -745,12 +527,19 @@ void uv__work_done(uv_async_t* handle) {
 }
 
 
+#ifdef USE_FFRT
+static void uv__queue_work(struct uv__work* w, int qos) {
+#else
 static void uv__queue_work(struct uv__work* w) {
+#endif
   uv_work_t* req = container_of(w, uv_work_t, work_req);
 
 #ifdef ASYNC_STACKTRACE
   LibuvSetStackId((uint64_t)req->reserved[3]);
 #endif
   req->work_cb(req);
+#ifdef USE_FFRT
+  uv__work_submit_to_eventloop((uv_req_t*)req, w, qos);
+#endif
 }
 
@@ -770,7 +559,12 @@ static void uv__queue_done(struct uv__work* w, int err) {
 
+#ifdef USE_FFRT
+  if (req->reserved[1] != NULL) {
+    free((char*)req->reserved[1]);
+    req->reserved[1] = NULL;
+  }
+#endif
   if (req->after_work_cb == NULL)
     return;
-
   req->after_work_cb(req, err);
 }
 
@@ -779,46 +573,13 @@ static void uv__queue_done(struct uv__work* w, int err) {
 
 
 void uv__ffrt_work(ffrt_executor_task_t* data, ffrt_qos_t qos) {
   struct uv__work* w = (struct uv__work *)data;
-  uv_loop_t* loop = w->loop;
-#ifdef UV_STATISTIC
-  uv__post_statistic_work(w, WORK_EXECUTING);
-#endif
-  w->work(w);
-#ifdef UV_STATISTIC
-  uv__post_statistic_work(w, WORK_END);
-#endif
-  rdlock_closed_uv_loop_rwlock();
-  if (loop->magic != UV_LOOP_MAGIC) {
-    rdunlock_closed_uv_loop_rwlock();
-    UV_LOGE("uv_loop(%{public}zu:%{public}#x), task is invalid",
-            (size_t)loop, loop->magic);
-    return;
-  }
-
-  uv_mutex_lock(&loop->wq_mutex);
-  w->work = NULL;  /* Signal uv_cancel() that the work req is done executing. */
-
-  if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) == 0) {
-    int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
-    struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)loop->data -
-        (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
-    addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
-  } else {
-    uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
-    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
-    uv_async_send(&loop->wq_async);
-  }
-  uv_mutex_unlock(&loop->wq_mutex);
-  rdunlock_closed_uv_loop_rwlock();
+  w->work(w, (int)qos);
 }
 
 
 static void init_once(void) {
   init_closed_uv_loop_rwlock_once();
   /* init uv work statics queue */
-#ifdef UV_STATISTIC
-  init_work_dump_queue();
-#endif
   ffrt_executor_task_register_func(uv__ffrt_work, ffrt_uv_task);
 }
 
@@ -828,7 +589,7 @@ void uv__work_submit(uv_loop_t* loop,
                      uv_req_t* req,
                      struct uv__work* w,
                      enum uv__work_kind kind,
-                     void (*work)(struct uv__work *w),
+                     void (*work)(struct uv__work *w, int qos),
                      void (*done)(struct uv__work *w, int status)) {
   uv_once(&once, init_once);
   ffrt_task_attr_t attr;
@@ -863,11 +624,11 @@ void uv__work_submit(uv_loop_t* loop,
 
 /* ffrt uv__work_submit */
 void uv__work_submit_with_qos(uv_loop_t* loop,
-    uv_req_t* req,
-    struct uv__work* w,
-    ffrt_qos_t qos,
-    void (*work)(struct uv__work *w),
-    void (*done)(struct uv__work *w, int status)) {
+                              uv_req_t* req,
+                              struct uv__work* w,
+                              ffrt_qos_t qos,
+                              void (*work)(struct uv__work *w, int qos),
+                              void (*done)(struct uv__work *w, int status)) {
   uv_once(&once, init_once);
   ffrt_task_attr_t attr;
   ffrt_task_attr_init(&attr);
@@ -890,24 +651,17 @@ int uv_queue_work(uv_loop_t* loop,
                   uv_after_work_cb after_work_cb) {
   if (work_cb == NULL)
     return UV_EINVAL;
-
+#ifdef USE_FFRT
+  if (req->type != REQ_MASK) {
+    req->reserved[1] = NULL;
+  }
+#endif
   uv__print_active_reqs(loop, "execute");
 
   uv__req_init(loop, req, UV_WORK);
   req->loop = loop;
   req->work_cb = work_cb;
   req->after_work_cb = after_work_cb;
-#ifdef UV_STATISTIC
-  struct uv_work_dump_info* info = (struct uv_work_dump_info*) malloc(sizeof(struct uv_work_dump_info));
-  if (info == NULL) {
-    abort();
-  }
-  uv_init_dump_info(info, &req->work_req);
-  info->builtin_return_address[0] = __builtin_return_address(0);
-  info->builtin_return_address[1] = __builtin_return_address(1);
-  info->builtin_return_address[2] = __builtin_return_address(2);
-  (req->work_req).info = info;
-#endif
 #ifdef ASYNC_STACKTRACE
   req->reserved[3] = (void*)LibuvCollectAsyncStack();
 #endif
@@ -920,13 +674,23 @@ int uv_queue_work(uv_loop_t* loop,
                   uv__queue_work,
                   uv__queue_done
                   );
-#ifdef UV_STATISTIC
-  uv_queue_statics(info);
-#endif
   return 0;
 }
 
 
+int uv_queue_work_internal(uv_loop_t* loop,
+                           uv_work_t* req,
+                           uv_work_cb work_cb,
+                           uv_after_work_cb after_work_cb,
+                           const char* task_name) {
+#ifdef USE_FFRT
+  uv__copy_taskname((uv_req_t*)req, task_name);
+  req->type = (uv_req_type)REQ_MASK;
+#endif
+  return uv_queue_work(loop, req, work_cb, after_work_cb);
+}
+
+
 int uv_queue_work_with_qos(uv_loop_t* loop,
                            uv_work_t* req,
                            uv_work_cb work_cb,
@@ -935,6 +699,9 @@ int uv_queue_work_with_qos(uv_loop_t* loop,
 #ifdef USE_FFRT
   if (work_cb == NULL)
     return UV_EINVAL;
+  if (req->type != REQ_MASK) {
+    req->reserved[1] = NULL;
+  }
 
   STATIC_ASSERT(uv_qos_background == ffrt_qos_background);
   STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility);
@@ -950,26 +717,12 @@ int uv_queue_work_with_qos(uv_loop_t* loop,
   req->loop = loop;
   req->work_cb = work_cb;
   req->after_work_cb = after_work_cb;
-#ifdef UV_STATISTIC
-  struct uv_work_dump_info* info = (struct uv_work_dump_info*)malloc(sizeof(struct uv_work_dump_info));
-  if (info == NULL) {
-    abort();
-  }
-  uv_init_dump_info(info, &req->work_req);
-  info->builtin_return_address[0] = __builtin_return_address(0);
-  info->builtin_return_address[1] = __builtin_return_address(1);
-  info->builtin_return_address[2] = __builtin_return_address(2);
-  (req->work_req).info = info;
-#endif
   uv__work_submit_with_qos(loop,
                            (uv_req_t*)req,
                            &req->work_req,
                            (ffrt_qos_t)qos,
                            uv__queue_work,
                            uv__queue_done);
-#ifdef UV_STATISTIC
-  uv_queue_statics(info);
-#endif
   return 0;
 #else
   return uv_queue_work(loop, req, work_cb, after_work_cb);
@@ -977,6 +730,20 @@ int uv_queue_work_with_qos(uv_loop_t* loop,
 }
 
 
+int uv_queue_work_with_qos_internal(uv_loop_t* loop,
+                                    uv_work_t* req,
+                                    uv_work_cb work_cb,
+                                    uv_after_work_cb after_work_cb,
+                                    uv_qos_t qos,
+                                    const char* task_name) {
+#ifdef USE_FFRT
+  uv__copy_taskname((uv_req_t*)req, task_name);
+  req->type = (uv_req_type)REQ_MASK;
+#endif
+  return uv_queue_work_with_qos(loop, req, work_cb, after_work_cb, qos);
+}
+
+
 int uv_cancel(uv_req_t* req) {
   struct uv__work* wreq;
   uv_loop_t* loop;
diff --git a/src/unix/fs.c b/src/unix/fs.c
index 484e40b459022f1645091ff2549efe1d72db91e9..95f194212c5a78c4b197f2959a775d3e5c82b1ce 100644
--- a/src/unix/fs.c
+++ b/src/unix/fs.c
@@ -160,7 +160,7 @@ extern char *mkdtemp(char *template); /* See issue #740 on AIX < 7 */
       return 0; \
     } \
     else { \
-      uv__fs_work(&req->work_req); \
+      uv__fs_work(&req->work_req, -1); \
       return req->result; \
     } \
   } \
@@ -1564,7 +1564,11 @@ static ssize_t uv__fs_write_all(uv_fs_t* req) {
 }
 
 
+#ifdef USE_FFRT
+static void uv__fs_work(struct uv__work* w, int qos) {
+#else
 static void uv__fs_work(struct uv__work* w) {
+#endif
   int retry_on_eintr;
   uv_fs_t* req;
   ssize_t r;
@@ -1633,6 +1637,11 @@ static void uv__fs_work(struct uv__work* w) {
         req->fs_type == UV_FS_LSTAT)) {
     req->ptr = &req->statbuf;
   }
+#ifdef USE_FFRT
+  if (qos != -1) {
+    uv__work_submit_to_eventloop((uv_req_t*)req, w, qos);
+  }
+#endif
 }
diff --git a/src/unix/getaddrinfo.c b/src/unix/getaddrinfo.c
index 43984b17f2049478101766764453c3e694960ceb..a22079bb8210ac583c8d2661f75274a496edf7f6 100644
--- a/src/unix/getaddrinfo.c
+++ b/src/unix/getaddrinfo.c
@@ -95,13 +95,22 @@ int uv__getaddrinfo_translate_error(int sys_err) {
 }
 
 
+#ifdef USE_FFRT
+static void uv__getaddrinfo_work(struct uv__work* w, int qos) {
+#else
 static void uv__getaddrinfo_work(struct uv__work* w) {
+#endif
   uv_getaddrinfo_t* req;
   int err;
 
   req = container_of(w, uv_getaddrinfo_t, work_req);
   err = getaddrinfo(req->hostname, req->service, req->hints, &req->addrinfo);
   req->retcode = uv__getaddrinfo_translate_error(err);
+#ifdef USE_FFRT
+  if (qos != -1) {
+    uv__work_submit_to_eventloop((uv_req_t*)req, w, qos);
+  }
+#endif
 }
 
 
@@ -213,7 +222,11 @@ int uv_getaddrinfo(uv_loop_t* loop,
                    uv__getaddrinfo_done);
     return 0;
   } else {
+#ifdef USE_FFRT
+    uv__getaddrinfo_work(&req->work_req, -1);
+#else
     uv__getaddrinfo_work(&req->work_req);
+#endif
     uv__getaddrinfo_done(&req->work_req, 0);
     return req->retcode;
   }
diff --git a/src/unix/getnameinfo.c b/src/unix/getnameinfo.c
index 43e5127d81799bb4cec221ad8e43c223577522da..36c837ebb1bce560ae84a9568682f8f80855f3aa 100644
--- a/src/unix/getnameinfo.c
+++ b/src/unix/getnameinfo.c
@@ -28,7 +28,11 @@
 #include "internal.h"
 
 
+#ifdef USE_FFRT
+static void uv__getnameinfo_work(struct uv__work* w, int qos) {
+#else
 static void uv__getnameinfo_work(struct uv__work* w) {
+#endif
   uv_getnameinfo_t* req;
   int err;
   socklen_t salen;
@@ -50,6 +54,11 @@ static void uv__getnameinfo_work(struct uv__work* w) {
                     sizeof(req->service),
                     req->flags);
   req->retcode = uv__getaddrinfo_translate_error(err);
+#ifdef USE_FFRT
+  if (qos != -1) {
+    uv__work_submit_to_eventloop((uv_req_t*)req, w, qos);
+  }
+#endif
 }
 
 static void uv__getnameinfo_done(struct uv__work* w, int status) {
@@ -117,7 +126,11 @@ int uv_getnameinfo(uv_loop_t* loop,
                    uv__getnameinfo_done);
     return 0;
   } else {
+#ifdef USE_FFRT
+    uv__getnameinfo_work(&req->work_req, -1);
+#else
     uv__getnameinfo_work(&req->work_req);
+#endif
     uv__getnameinfo_done(&req->work_req, 0);
     return req->retcode;
   }
diff --git a/src/uv-common.c b/src/uv-common.c
index 1119350f639dd597307cd9725dab48f4fe75d5c4..4ea45dbd7cdba146f21d6868cfae99e7be7e99aa 100644
--- a/src/uv-common.c
+++ b/src/uv-common.c
@@ -1132,4 +1132,26 @@ void uv__multi_thread_check_unify(const uv_loop_t* loop, const char* funcName) {
     abort();
   }
 }
+#endif
+
+#ifdef USE_FFRT
+int uv__copy_taskname(uv_req_t* req, const char* task_name) {
+  char* str = (char*)malloc(TASK_NAME_LENGTH);
+  if (str == NULL) {
+    UV_LOGE("malloc task name failed, task name:%{public}s", task_name);
+    return UV_EINVAL;
+  }
+
+  char* pos_first = strchr(task_name, SPLIT_CHAR_FIRST);
+  char* pos_end = strchr(task_name, SPLIT_CHAR_SECOND);
+  pos_first = (pos_first == NULL ? (char*)task_name : pos_first + 1);
+  snprintf(str, TASK_NAME_LENGTH, "%s", pos_first);
+  int end = TASK_NAME_LENGTH - 1;
+  if (pos_end > pos_first && (pos_end - pos_first) < TASK_NAME_LENGTH - 1) {
+    end = pos_end - pos_first;
+  }
+  str[end] = '\0';
+  req->reserved[1] = (void*)str;
+  return 0;
+}
 #endif
\ No newline at end of file
diff --git a/src/uv-common.h b/src/uv-common.h
index ac0bbb9ad1729493589479cf0cb0a1645193808c..0bc5238ee1ec5987cee7a7c2092907c903bac65c 100644
--- a/src/uv-common.h
+++ b/src/uv-common.h
@@ -215,7 +215,11 @@ void uv__work_submit(uv_loop_t* loop,
 #endif
                      struct uv__work *w,
                      enum uv__work_kind kind,
+#ifdef USE_FFRT
+                     void (*work)(struct uv__work *w, int qos),
+#else
                      void (*work)(struct uv__work *w),
+#endif
                      void (*done)(struct uv__work *w, int status));
 void uv__work_done(uv_async_t* handle);
 
@@ -466,4 +470,12 @@ void uv__init_thread_id(uv_loop_t* loop);
 void uv__set_thread_id(uv_loop_t* loop);
 void uv__multi_thread_check_unify(const uv_loop_t* loop, const char* funcName);
 #endif
+
+#ifdef USE_FFRT
+#define SPLIT_CHAR_FIRST ':'
+#define SPLIT_CHAR_SECOND '#'
+#define TASK_NAME_LENGTH 64
+void uv__work_submit_to_eventloop(uv_req_t* req, struct uv__work* w, int qos);
+int uv__copy_taskname(uv_req_t* req, const char* task_name);
+#endif
 #endif /* UV_COMMON_H_ */
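
Reviewer note: below is a minimal usage sketch of the new entry points. It is illustrative only; queue_named_work(), both callbacks, and the "module:doTask#42" string are hypothetical, while uv_queue_work_internal() and the ':' / '#' splitting rules implemented by uv__copy_taskname() come from this patch.

    #include "uv.h"

    static void work_cb(uv_work_t* req) {
      /* Runs on a worker thread. Under USE_FFRT the qos-aware uv__queue_work()
       * wrapper posts completion back to the loop through
       * uv__work_submit_to_eventloop(). */
    }

    static void after_cb(uv_work_t* req, int status) {
      /* Runs on the loop thread. The copied task name (req->reserved[1]) has
       * already been freed by uv__queue_done() by the time this runs. */
    }

    int queue_named_work(uv_loop_t* loop, uv_work_t* req) {
      /* uv__copy_taskname() keeps the text between ':' and '#', so the stored
       * name for "module:doTask#42" is "doTask", capped at TASK_NAME_LENGTH. */
      return uv_queue_work_internal(loop, req, work_cb, after_cb,
                                    "module:doTask#42");
    }

The stored name is what uv__work_submit_to_eventloop() hands to post_task_func() as its first argument, which is why the uv_post_task typedef now takes a `const char* task_name` in place of the old `void* handler`.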