From 1d8bfe5dcf40c44dd2b241d2c92cbeb0f4f343a0 Mon Sep 17 00:00:00 2001 From: milkpotatoes Date: Mon, 16 Jun 2025 14:19:52 +0800 Subject: [PATCH] Fix not notify threads if queue is full in some case Issue: https://gitee.com/openharmony/arkui_napi/issues/ICFFKO Signed-off-by: milkpotatoes Change-Id: Iace4627ebcc84f8cb00e5c3c4920189d40732e15 --- native_engine/native_safe_async_work.cpp | 4 +- native_engine/native_safe_async_work.h | 1 + test/unittest/common/test_common.h | 3 + test/unittest/test_napi_threadsafe.cpp | 143 +++++++++++++++-------- 4 files changed, 99 insertions(+), 52 deletions(-) diff --git a/native_engine/native_safe_async_work.cpp b/native_engine/native_safe_async_work.cpp index dec9ac812..b88cb9fb7 100644 --- a/native_engine/native_safe_async_work.cpp +++ b/native_engine/native_safe_async_work.cpp @@ -189,9 +189,11 @@ SafeAsyncCode NativeSafeAsyncWork::Send(void* data, NativeThreadSafeFunctionCall if (IsMaxQueueSize()) { HILOG_INFO("queue size bigger than max queue size"); if (mode == NATIVE_TSFUNC_BLOCKING) { + waitingCount_++; while (IsMaxQueueSize()) { condition_.wait(lock); } + waitingCount_--; } else { return SafeAsyncCode::SAFE_ASYNC_QUEUE_FULL; } @@ -341,7 +343,7 @@ void NativeSafeAsyncWork::ProcessAsyncHandle() data = queue_.front(); // when queue is full, notify send. - if (size == maxQueueSize_ && maxQueueSize_ > 0) { + if (maxQueueSize_ > 0 && waitingCount_ > 0) { condition_.notify_one(); } diff --git a/native_engine/native_safe_async_work.h b/native_engine/native_safe_async_work.h index 0879981b1..fc9ddbee0 100644 --- a/native_engine/native_safe_async_work.h +++ b/native_engine/native_safe_async_work.h @@ -100,6 +100,7 @@ protected: NativeReference* ref_ = nullptr; size_t maxQueueSize_ = 0; size_t threadCount_ = 0; + size_t waitingCount_ { 0 }; void* finalizeData_ = nullptr; NativeFinalize finalizeCallback_ = nullptr; void* context_ = nullptr; diff --git a/test/unittest/common/test_common.h b/test/unittest/common/test_common.h index a6ee8d486..bf5d03138 100644 --- a/test/unittest/common/test_common.h +++ b/test/unittest/common/test_common.h @@ -15,8 +15,11 @@ #ifndef FOUNDATION_ACE_NAPI_TEST_UNITTEST_TEST_COMMON_H #define FOUNDATION_ACE_NAPI_TEST_UNITTEST_TEST_COMMON_H + #include "gtest/gtest.h" #include "utils/log.h" +#include "napi/native_api.h" + #define ASSERT_CHECK_CALL(call) \ { \ ASSERT_EQ(call, napi_ok); \ diff --git a/test/unittest/test_napi_threadsafe.cpp b/test/unittest/test_napi_threadsafe.cpp index 882b9fbfc..5e287acbf 100644 --- a/test/unittest/test_napi_threadsafe.cpp +++ b/test/unittest/test_napi_threadsafe.cpp @@ -13,9 +13,12 @@ * limitations under the License. */ +#include "engine/test.h" #include "test.h" #include +#include +#include #include #include @@ -76,10 +79,10 @@ static uv_thread_t g_uvThreadSecondary; static uv_thread_t g_uvTheads2; static uv_thread_t g_uvTheads3; static int32_t g_sendDatas[SEND_DATAS_LENGTH]; -static int32_t g_callSuccessCount = 0; -static int32_t g_callSuccessCountJS = 0; -static int32_t g_callSuccessCountJSFour = 0; -static int32_t g_callDepth = 4; +static int32_t g_callSuccessCount = 0; +static int32_t g_callSuccessCountJS = 0; +static int32_t g_callSuccessCountJSFour = 0; +static int32_t g_callDepth = 4; bool acquireFlag = false; static int32_t g_receiveCnt = 0; static bool g_isTailA = false; @@ -306,7 +309,6 @@ static void TsFuncDataSourceThreadCountTotal(void* data) // set send data g_sendData = SEND_DATA_TEST; if (acquireFlag) { - std::cout<<"acquireFlag is true"<(data); - napi_release_threadsafe_function(callbackData->tsfn, napi_tsfn_release); - return; -} - -static void finalizeCallBackThreadsafe014(napi_env env, void* finalizeData, void* hint) -{ - CallbackCountDataThreadsafe014 *callbackData = reinterpret_cast(finalizeData); - callbackData->tsfn = nullptr; -} - -static void executeWorkThreadsafe014(napi_env env, void *data) -{ - CallbackCountDataThreadsafe014 *callbackData = reinterpret_cast(data); - napi_call_threadsafe_function(callbackData->tsfn, (void *)callbackData, napi_tsfn_nonblocking); - std::this_thread::sleep_for(std::chrono::seconds(INT_TWO)); -} - HWTEST_F(NapiThreadsafeTest, ThreadsafeTest014, testing::ext::TestSize.Level1) { - HILOG_INFO("ThreadsafeTest014 start"); + struct CallbackCountData { + napi_threadsafe_function tsfn; + napi_async_work work; + std::mutex mutex; + std::condition_variable cond; + }; + + UVLoopRunner runner(engine_); napi_env env = (napi_env)engine_; - CallbackCountDataThreadsafe014 *callbackData = new CallbackCountDataThreadsafe014(); + CallbackCountData* callbackData = new CallbackCountData(); napi_value resourceName = 0; napi_create_string_latin1(env, __func__, NAPI_AUTO_LENGTH, &resourceName); - auto status = napi_create_threadsafe_function(env, nullptr, nullptr, resourceName, 0, 1, - callbackData, finalizeCallBackThreadsafe014, callbackData, callJSCallBackThreadsafe014, &callbackData->tsfn); + auto status = napi_create_threadsafe_function( + env, nullptr, nullptr, resourceName, 0, 1, callbackData, + [](napi_env env, void* finalizeData, void* hint) { + CallbackCountData* callbackData = + reinterpret_cast(finalizeData); + callbackData->tsfn = nullptr; + callbackData->cond.notify_one(); + }, + callbackData, + [](napi_env env, napi_value tsfn_cb, void* context, void* data) { + CallbackCountData* callbackData = reinterpret_cast(data); + napi_release_threadsafe_function(callbackData->tsfn, napi_tsfn_release); + }, + &callbackData->tsfn); EXPECT_EQ(status, napi_ok); - napi_create_async_work(env, nullptr, resourceName, executeWorkThreadsafe014, + napi_create_async_work( + env, nullptr, resourceName, + [](napi_env env, void* data) { + CallbackCountData* callbackData = reinterpret_cast(data); + std::unique_lock lock(callbackData->mutex); + napi_call_threadsafe_function(callbackData->tsfn, (void*)callbackData, napi_tsfn_nonblocking); + callbackData->cond.wait(lock); + }, [](napi_env env, napi_status status, void* data) { - CallbackCountDataThreadsafe014 *callbackData = reinterpret_cast(data); + CallbackCountData* callbackData = reinterpret_cast(data); EXPECT_EQ(callbackData->tsfn, nullptr); napi_delete_async_work(env, callbackData->work); callbackData->work = nullptr; @@ -1552,5 +1553,45 @@ HWTEST_F(NapiThreadsafeTest, ThreadsafeTest014, testing::ext::TestSize.Level1) }, callbackData, &callbackData->work); napi_queue_async_work(env, callbackData->work); - HILOG_INFO("ThreadsafeTest014 end"); -} \ No newline at end of file + runner.Run(); +} + +/** + * @tc.name: ThreadsafeTest015 + * @tc.desc: Test cal tsfn more than max queue size + * @tc.type: FUNC + */ +HWTEST_F(NapiThreadsafeTest, ThreadsafeTest015, testing::ext::TestSize.Level1) +{ + UVLoopRunner runner(engine_); + napi_env env = reinterpret_cast(engine_); + std::atomic* extraQueuedCounts = new std::atomic { 0 }; + napi_threadsafe_function tsfn = nullptr; + ASSERT_CHECK_CALL(napi_create_threadsafe_function( + env, nullptr, nullptr, GetNapiTCName(env), MAX_QUEUE_SIZE, 1, nullptr, nullptr, nullptr, + [](napi_env, napi_value, void*, void*) {}, &tsfn)); + // It fulled queue of tsfn + for (uint32_t i = 0; i < MAX_QUEUE_SIZE; i++) { + ASSERT_CHECK_CALL(napi_call_threadsafe_function(tsfn, nullptr, napi_tsfn_blocking)); + } + + constexpr uint32_t EXTRA_TASK_SIZE = MAX_QUEUE_SIZE * 2; + for (uint32_t i = 0; i < EXTRA_TASK_SIZE; i++) { + std::thread( + [](napi_threadsafe_function tsfn, std::atomic* counter) { + ASSERT_CHECK_CALL(napi_call_threadsafe_function(tsfn, nullptr, napi_tsfn_blocking)); + if (++(*counter) == EXTRA_TASK_SIZE) { + // release tsfn after all task queued + ASSERT_CHECK_CALL(napi_release_threadsafe_function(tsfn, napi_tsfn_release)); + } + }, + tsfn, extraQueuedCounts) + .detach(); + } + + // Run loop for execution and notify. + runner.Run(); + // It tests whether call threads can call finish after queue is fulled + ASSERT_EQ(extraQueuedCounts->load(), EXTRA_TASK_SIZE); + delete extraQueuedCounts; +} -- Gitee