diff --git a/native_engine/native_safe_async_work.cpp b/native_engine/native_safe_async_work.cpp index dec9ac812f4e66ba900c6dfd042592feb89102a0..b88cb9fb7305952ae7ff346b3435525aca4c1be2 100644 --- a/native_engine/native_safe_async_work.cpp +++ b/native_engine/native_safe_async_work.cpp @@ -189,9 +189,11 @@ SafeAsyncCode NativeSafeAsyncWork::Send(void* data, NativeThreadSafeFunctionCall if (IsMaxQueueSize()) { HILOG_INFO("queue size bigger than max queue size"); if (mode == NATIVE_TSFUNC_BLOCKING) { + waitingCount_++; while (IsMaxQueueSize()) { condition_.wait(lock); } + waitingCount_--; } else { return SafeAsyncCode::SAFE_ASYNC_QUEUE_FULL; } @@ -341,7 +343,7 @@ void NativeSafeAsyncWork::ProcessAsyncHandle() data = queue_.front(); // when queue is full, notify send. - if (size == maxQueueSize_ && maxQueueSize_ > 0) { + if (maxQueueSize_ > 0 && waitingCount_ > 0) { condition_.notify_one(); } diff --git a/native_engine/native_safe_async_work.h b/native_engine/native_safe_async_work.h index 0879981b17c961533c88d87b688077a08e2f2dd9..fc9ddbee03f9f4f21db9cef7e30ae213b1ca5e2c 100644 --- a/native_engine/native_safe_async_work.h +++ b/native_engine/native_safe_async_work.h @@ -100,6 +100,7 @@ protected: NativeReference* ref_ = nullptr; size_t maxQueueSize_ = 0; size_t threadCount_ = 0; + size_t waitingCount_ { 0 }; void* finalizeData_ = nullptr; NativeFinalize finalizeCallback_ = nullptr; void* context_ = nullptr; diff --git a/test/unittest/common/test_common.h b/test/unittest/common/test_common.h index a6ee8d486d7022117233cf05d34bb5e15e2f57f4..bf5d03138e3cd6b7ae587c7190cfb8572234bbd6 100644 --- a/test/unittest/common/test_common.h +++ b/test/unittest/common/test_common.h @@ -15,8 +15,11 @@ #ifndef FOUNDATION_ACE_NAPI_TEST_UNITTEST_TEST_COMMON_H #define FOUNDATION_ACE_NAPI_TEST_UNITTEST_TEST_COMMON_H + #include "gtest/gtest.h" #include "utils/log.h" +#include "napi/native_api.h" + #define ASSERT_CHECK_CALL(call) \ { \ ASSERT_EQ(call, napi_ok); \ diff --git a/test/unittest/test_napi_threadsafe.cpp b/test/unittest/test_napi_threadsafe.cpp index 882b9fbfc4322c93191e78fdebde6456838ac0f5..5e287acbf94f4fa619269e11b3cd2c2d2f86a6fa 100644 --- a/test/unittest/test_napi_threadsafe.cpp +++ b/test/unittest/test_napi_threadsafe.cpp @@ -13,9 +13,12 @@ * limitations under the License. */ +#include "engine/test.h" #include "test.h" #include +#include +#include #include #include @@ -76,10 +79,10 @@ static uv_thread_t g_uvThreadSecondary; static uv_thread_t g_uvTheads2; static uv_thread_t g_uvTheads3; static int32_t g_sendDatas[SEND_DATAS_LENGTH]; -static int32_t g_callSuccessCount = 0; -static int32_t g_callSuccessCountJS = 0; -static int32_t g_callSuccessCountJSFour = 0; -static int32_t g_callDepth = 4; +static int32_t g_callSuccessCount = 0; +static int32_t g_callSuccessCountJS = 0; +static int32_t g_callSuccessCountJSFour = 0; +static int32_t g_callDepth = 4; bool acquireFlag = false; static int32_t g_receiveCnt = 0; static bool g_isTailA = false; @@ -306,7 +309,6 @@ static void TsFuncDataSourceThreadCountTotal(void* data) // set send data g_sendData = SEND_DATA_TEST; if (acquireFlag) { - std::cout<<"acquireFlag is true"<(data); - napi_release_threadsafe_function(callbackData->tsfn, napi_tsfn_release); - return; -} - -static void finalizeCallBackThreadsafe014(napi_env env, void* finalizeData, void* hint) -{ - CallbackCountDataThreadsafe014 *callbackData = reinterpret_cast(finalizeData); - callbackData->tsfn = nullptr; -} - -static void executeWorkThreadsafe014(napi_env env, void *data) -{ - CallbackCountDataThreadsafe014 *callbackData = reinterpret_cast(data); - napi_call_threadsafe_function(callbackData->tsfn, (void *)callbackData, napi_tsfn_nonblocking); - std::this_thread::sleep_for(std::chrono::seconds(INT_TWO)); -} - HWTEST_F(NapiThreadsafeTest, ThreadsafeTest014, testing::ext::TestSize.Level1) { - HILOG_INFO("ThreadsafeTest014 start"); + struct CallbackCountData { + napi_threadsafe_function tsfn; + napi_async_work work; + std::mutex mutex; + std::condition_variable cond; + }; + + UVLoopRunner runner(engine_); napi_env env = (napi_env)engine_; - CallbackCountDataThreadsafe014 *callbackData = new CallbackCountDataThreadsafe014(); + CallbackCountData* callbackData = new CallbackCountData(); napi_value resourceName = 0; napi_create_string_latin1(env, __func__, NAPI_AUTO_LENGTH, &resourceName); - auto status = napi_create_threadsafe_function(env, nullptr, nullptr, resourceName, 0, 1, - callbackData, finalizeCallBackThreadsafe014, callbackData, callJSCallBackThreadsafe014, &callbackData->tsfn); + auto status = napi_create_threadsafe_function( + env, nullptr, nullptr, resourceName, 0, 1, callbackData, + [](napi_env env, void* finalizeData, void* hint) { + CallbackCountData* callbackData = + reinterpret_cast(finalizeData); + callbackData->tsfn = nullptr; + callbackData->cond.notify_one(); + }, + callbackData, + [](napi_env env, napi_value tsfn_cb, void* context, void* data) { + CallbackCountData* callbackData = reinterpret_cast(data); + napi_release_threadsafe_function(callbackData->tsfn, napi_tsfn_release); + }, + &callbackData->tsfn); EXPECT_EQ(status, napi_ok); - napi_create_async_work(env, nullptr, resourceName, executeWorkThreadsafe014, + napi_create_async_work( + env, nullptr, resourceName, + [](napi_env env, void* data) { + CallbackCountData* callbackData = reinterpret_cast(data); + std::unique_lock lock(callbackData->mutex); + napi_call_threadsafe_function(callbackData->tsfn, (void*)callbackData, napi_tsfn_nonblocking); + callbackData->cond.wait(lock); + }, [](napi_env env, napi_status status, void* data) { - CallbackCountDataThreadsafe014 *callbackData = reinterpret_cast(data); + CallbackCountData* callbackData = reinterpret_cast(data); EXPECT_EQ(callbackData->tsfn, nullptr); napi_delete_async_work(env, callbackData->work); callbackData->work = nullptr; @@ -1552,5 +1553,45 @@ HWTEST_F(NapiThreadsafeTest, ThreadsafeTest014, testing::ext::TestSize.Level1) }, callbackData, &callbackData->work); napi_queue_async_work(env, callbackData->work); - HILOG_INFO("ThreadsafeTest014 end"); -} \ No newline at end of file + runner.Run(); +} + +/** + * @tc.name: ThreadsafeTest015 + * @tc.desc: Test cal tsfn more than max queue size + * @tc.type: FUNC + */ +HWTEST_F(NapiThreadsafeTest, ThreadsafeTest015, testing::ext::TestSize.Level1) +{ + UVLoopRunner runner(engine_); + napi_env env = reinterpret_cast(engine_); + std::atomic* extraQueuedCounts = new std::atomic { 0 }; + napi_threadsafe_function tsfn = nullptr; + ASSERT_CHECK_CALL(napi_create_threadsafe_function( + env, nullptr, nullptr, GetNapiTCName(env), MAX_QUEUE_SIZE, 1, nullptr, nullptr, nullptr, + [](napi_env, napi_value, void*, void*) {}, &tsfn)); + // It fulled queue of tsfn + for (uint32_t i = 0; i < MAX_QUEUE_SIZE; i++) { + ASSERT_CHECK_CALL(napi_call_threadsafe_function(tsfn, nullptr, napi_tsfn_blocking)); + } + + constexpr uint32_t EXTRA_TASK_SIZE = MAX_QUEUE_SIZE * 2; + for (uint32_t i = 0; i < EXTRA_TASK_SIZE; i++) { + std::thread( + [](napi_threadsafe_function tsfn, std::atomic* counter) { + ASSERT_CHECK_CALL(napi_call_threadsafe_function(tsfn, nullptr, napi_tsfn_blocking)); + if (++(*counter) == EXTRA_TASK_SIZE) { + // release tsfn after all task queued + ASSERT_CHECK_CALL(napi_release_threadsafe_function(tsfn, napi_tsfn_release)); + } + }, + tsfn, extraQueuedCounts) + .detach(); + } + + // Run loop for execution and notify. + runner.Run(); + // It tests whether call threads can call finish after queue is fulled + ASSERT_EQ(extraQueuedCounts->load(), EXTRA_TASK_SIZE); + delete extraQueuedCounts; +}