From 3457c869553810db56c1cf46658611e4fbe2dcb4 Mon Sep 17 00:00:00 2001 From: Cillinn <1364800211@qq.com> Date: Sat, 6 Sep 2025 09:30:04 +0800 Subject: [PATCH] fix EAWorker timeout and quitSafely bug Issue: https://gitee.com/openharmony/arkcompiler_runtime_core/issues/ICWO7W?from=project-issue Signed-off-by: Cillinn --- .../ets/stdlib/std/concurrency/Message.ets | 2 +- .../stdlib/std/concurrency/MessageHandler.ets | 15 +-- .../plugins/ets/stdlib/std/core/EAWorker.ets | 112 +++++++++++++----- .../std/concurrency/eaworker_message.ets | 40 ++++++- .../messageHandler_basic_interface.ets | 38 +++++- .../ets-func-tests/ets-func-tests-ignored.txt | 1 - 6 files changed, 163 insertions(+), 45 deletions(-) diff --git a/static_core/plugins/ets/stdlib/std/concurrency/Message.ets b/static_core/plugins/ets/stdlib/std/concurrency/Message.ets index 02c2a38d72..ca56c6bb6c 100644 --- a/static_core/plugins/ets/stdlib/std/concurrency/Message.ets +++ b/static_core/plugins/ets/stdlib/std/concurrency/Message.ets @@ -57,7 +57,7 @@ export namespace concurrency { * @param { MessageHandler } handler The message handler * @param { Object } obj Additional object data */ - constructor(what: int,obj: Object, handler: concurrency.MessageHandler) { + constructor(what: int, obj: Object, handler: concurrency.MessageHandler) { this.handler = handler; this.what = what; this.obj = obj; diff --git a/static_core/plugins/ets/stdlib/std/concurrency/MessageHandler.ets b/static_core/plugins/ets/stdlib/std/concurrency/MessageHandler.ets index 25e5c45788..067d775a09 100644 --- a/static_core/plugins/ets/stdlib/std/concurrency/MessageHandler.ets +++ b/static_core/plugins/ets/stdlib/std/concurrency/MessageHandler.ets @@ -52,7 +52,7 @@ export namespace concurrency { * @returns { boolean } True if the callback exists, false otherwise */ public hasCallbacks(callback: Task): boolean { - return this.worker.contains(new concurrency.Message(callback, this)); + return this.worker.hasCallbacks(this, callback); } /** @@ -61,7 +61,7 @@ export namespace concurrency { * @returns { boolean } True if messages with the identifier exist, false otherwise */ public hasMessages(what: int): boolean { - return this.worker.contains(new concurrency.Message(what, this)); + return this.worker.hasMessages(this, what, undefined); } /** @@ -71,7 +71,7 @@ export namespace concurrency { * @returns { boolean } True if messages with the identifier and object exist, false otherwise */ public hasMessages(what: int, obj: Object): boolean { - return this.worker.contains(new concurrency.Message(what, obj, this)); + return this.worker.hasMessages(this, what, obj); } /** @@ -95,8 +95,7 @@ export namespace concurrency { * @returns { boolean } True if the callback was successfully removed, false otherwise */ public removeCallbacks(callback: Task): boolean { - let msg = new concurrency.Message(callback, this); - return this.worker.cancelMessage(msg); + return this.worker.removeCallbacks(this, callback); } /** @@ -105,8 +104,7 @@ export namespace concurrency { * @returns { boolean } True if messages were successfully removed, false otherwise */ public removeMessages(what: int): boolean { - let msg = new concurrency.Message(what, this); - return this.worker.cancelMessage(msg); + return this.worker.removeMessages(this, what, undefined); } /** @@ -116,8 +114,7 @@ export namespace concurrency { * @returns { boolean } True if messages were successfully removed, false otherwise */ public removeMessages(what: int, obj: Object): boolean { - let msg = new concurrency.Message(what, obj, this); - return this.worker.cancelMessage(msg); + return this.worker.removeMessages(this, what, obj); } /** diff --git a/static_core/plugins/ets/stdlib/std/core/EAWorker.ets b/static_core/plugins/ets/stdlib/std/core/EAWorker.ets index 6a0a1fbda4..51f546c52f 100644 --- a/static_core/plugins/ets/stdlib/std/core/EAWorker.ets +++ b/static_core/plugins/ets/stdlib/std/core/EAWorker.ets @@ -83,6 +83,8 @@ export class EAWorker { let mainWorker = mainWorkerObj as EAWorker; mainWorker.isMain = true; EAWorker.mainWorker = mainWorker; + mainWorker.name = "MAIN"; + EAWorker.workers.set(MAIN_WORKER_ID, mainWorker); } private worker: InternalWorker; @@ -144,6 +146,27 @@ export class EAWorker { overload constructor { supportInterop, withName, withTask, withNameAndTask } + + /** + * Get all registered EAWorker instances. + * + * @returns { containers.ConcurrentHashMap } + * A concurrent hash map containing all EAWorker instances, + * keyed by their integer identifiers. + */ + internal static getWorkers(): containers.ConcurrentHashMap { + return EAWorker.workers; + } + + /** + * Set the state of the loopStarted flag. + * + * @param flag - A boolean value indicating whether the loop has started. + */ + internal setLoopStarted(flag: boolean) { + this.loopStarted.set(flag); + } + /** * Get the worker identifier * @returns { int } The unique identifier of the worker @@ -236,14 +259,7 @@ export class EAWorker { if (!this.isStarted.get()) { throw new Error("Cannot stop worker before start"); } - if (!this.loopStarted.get()) { - return; - } this.worker.quit(); - this.loopStarted.set(false); - if (EAWorker.workers.get(this.eaworkerNum) != undefined) { - EAWorker.workers.delete(this.eaworkerNum); - } } /** @@ -257,12 +273,7 @@ export class EAWorker { if (!this.isStarted.get()) { throw new Error("Cannot stop worker before start"); } - if (!this.loopStarted.get()) { - return; - } this.worker.join(); - EAWorker.workers.delete(this.eaworkerNum); - this.loopStarted.set(false); } /** @@ -333,34 +344,67 @@ export class EAWorker { return res; } - internal getMessage(message: concurrency.Message): concurrency.Message | undefined { + internal hasMessages(handler: concurrency.MessageHandler, what: int, object: Object | undefined): boolean { if (this.isMain) { - return undefined; + return false; } - let res: concurrency.Message | undefined = undefined; + let res = false; this.messages.forEach((value: MessageStatus, key: concurrency.Message) => { - if (key.equals(message) && value != MessageStatus.CANCELLED) { - res = key; + if (key.getTarget() == handler + && key.getWhat() == what + && (object == undefined || key.getObject() == object) + && value != MessageStatus.CANCELLED) { + res = true; } }); return res; } - /** - * Cancel a message by setting its status to CANCELLED - * @param { Message } message The message to cancel - * @returns { boolean } True if the message was cancelled, false otherwise - */ - internal cancelMessage(message: concurrency.Message): boolean { + internal hasCallbacks(handler: concurrency.MessageHandler, callback: Task): boolean { if (this.isMain) { return false; } - let msg = this.getMessage(message); - if (msg != undefined) { - this.messages.set(msg, MessageStatus.CANCELLED); - return true; + let res = false; + this.messages.forEach((value: MessageStatus, key: concurrency.Message) => { + if (key.getTarget() == handler && key.getCallback() == callback) { + res = true; + } + }); + return res; + } + + internal removeCallbacks(handler: concurrency.MessageHandler, callback: Task): boolean { + if (this.isMain) { + return false; + } + let res = false; + this.messages.forEach((value: MessageStatus, key: concurrency.Message) => { + if (key.getTarget() == handler + && key.getCallback() == callback + && value != MessageStatus.CANCELLED) { + this.messages.set(key, MessageStatus.CANCELLED); + res = true; + } + }); + return res; + } + + internal removeMessages(handler: concurrency.MessageHandler, what: int, object: Object | undefined): boolean { + if (this.isMain) { + return false; } - return false; + let res = false; + this.messages.forEach((value: MessageStatus, key: concurrency.Message) => { + if (key.getTarget() == handler + && key.getWhat() == what + && (object == undefined || key.getObject() == object) + && value != MessageStatus.CANCELLED + && key.getCallback() == undefined) { + this.messages.set(key, MessageStatus.CANCELLED); + res = true; + } + }); + return res; } /** @@ -439,8 +483,6 @@ export class EAWorker { } this.worker.join(); - EAWorker.workers.delete(this.eaworkerNum); - this.loopStarted.set(false); } /** @@ -629,6 +671,12 @@ class InteropWorker implements InternalWorker { public join() { this.initEvent.wait(); + let finalTask = () => { + let current = EAWorker.current() as EAWorker; + EAWorker.getWorkers().delete(this.eaworkerNum); + current.setLoopStarted(false); + } + this.poster?.post(finalTask); this.poster!.destroy(); } @@ -759,6 +807,7 @@ class StaticWorker implements InternalWorker { while (this.tasks.size > 0) { this.tasks.poll(); } + this.messages.clear(); this.join(); } @@ -780,6 +829,9 @@ class StaticWorker implements InternalWorker { while (true) { let task = this.tasks.pop(); if (task === StaticWorker.closingTask) { + let current = EAWorker.current() as EAWorker; + EAWorker.getWorkers().delete(this.eaworkerNum); + current.setLoopStarted(false); break; } let shouldExecute = true; diff --git a/static_core/plugins/ets/tests/ets_func_tests/std/concurrency/eaworker_message.ets b/static_core/plugins/ets/tests/ets_func_tests/std/concurrency/eaworker_message.ets index dfdd51fe8c..f01db3e8a8 100644 --- a/static_core/plugins/ets/tests/ets_func_tests/std/concurrency/eaworker_message.ets +++ b/static_core/plugins/ets/tests/ets_func_tests/std/concurrency/eaworker_message.ets @@ -59,6 +59,14 @@ function testTaskEAWorker() { eaw.join(); } +const sleep = (ms: int): Promise => { + return new Promise(resolve => { + setTimeout(() => { + resolve(1) + }, ms) + }); +} + function testIsAliveAndQuit() { let eaw1 = new EAWorker(); let eaw2 = new EAWorker('custom name'); @@ -75,6 +83,14 @@ function testIsAliveAndQuit() { eaw1.quit(); eaw2.quitSafely(); eaw3.join(); + let maxChecks = 10; + let sleepTime = 10; + for (let i = 0; i < maxChecks; i++) { + if (!eaw1.isAlive() && !eaw2.isAlive() && !eaw3.isAlive()) { + break; + } + waitForCompletion(() => sleep(sleepTime)); + } arktest.assertFalse(eaw1.isAlive()); arktest.assertFalse(eaw2.isAlive()); arktest.assertFalse(eaw3.isAlive()); @@ -91,7 +107,6 @@ function testMainQuit() { arktest.expectError(() => { eawMain.join(); }, new Error('Main worker cannot be stopped')); - } function testQuit() { @@ -137,6 +152,8 @@ function testQuitSafely() { } function testCurrent() { + let main = EAWorker.current(); + arktest.assertEQ(main, EAWorker.main()); let eaw = new EAWorker(); eaw.start(); let waiter = new Event(); @@ -278,13 +295,16 @@ function testMain() { arktest.assertEQ(mainWorker.getUncaughtExceptionHandler(), undefined); } + function testTaskPool() { + let event = new Event(); taskpool.execute(() => { arktest.expectError(() => { EAWorker.current(); }, new Error('Can not get current worker in taskpool')); + event.fire(); }); - + event.wait(); CoroutineExtras.stopTaskpool(); } @@ -323,6 +343,21 @@ function testStartafterJoin() { eaw.quit(); } +function testPostTaskWhenNotStarted() { + let eaw = new EAWorker(); + arktest.expectError(() => { + eaw.postTask(() => {}); + }, new Error('Can not post task when worker is not started')); +} + +function testStartAfterQuit() { + let eaw = new EAWorker(); + eaw.quit(); + arktest.expectError(() => { + eaw.start(); + }, new Error('Can not start worker after quit')); +} + function main() { const suite = new arktest.ArkTestsuite('EAWorker Message Test Suite'); suite.addTest('testNamedEAWorker', testNamedEAWorker); @@ -337,7 +372,6 @@ function main() { suite.addTest('testPriority', testPriority); suite.addTest('testCommunication', testCommunication); suite.addTest('testMainWorker', testMain); - suite.addTest('testTaskPool', testTaskPool); suite.addTest('testQuitBeforeStart', testQuitBeforeStart); suite.addTest('testQuitSafelyBeforeStart', testQuitSafelyBeforeStart); suite.addTest('testStartafterJoin', testStartafterJoin); diff --git a/static_core/plugins/ets/tests/ets_func_tests/std/concurrency/messageHandler_basic_interface.ets b/static_core/plugins/ets/tests/ets_func_tests/std/concurrency/messageHandler_basic_interface.ets index 4ec358b335..8428a616e7 100644 --- a/static_core/plugins/ets/tests/ets_func_tests/std/concurrency/messageHandler_basic_interface.ets +++ b/static_core/plugins/ets/tests/ets_func_tests/std/concurrency/messageHandler_basic_interface.ets @@ -300,6 +300,42 @@ function testRemoveMultipleMessages() { eaw.join(); } +function testHasMessage2() { + let eaw = new EAWorker(); + eaw.start(); + let handler: concurrency.MessageHandler; + let waiter = new Event(); + + handler = new concurrency.MessageHandler((msg: concurrency.Message) => { + if (msg.getCallback() != undefined) { + let cb = msg.getCallback() as Callback; + cb(); + } + }, eaw); + let message = new concurrency.Message(MessageType.STRING, "test", handler); + let message2 = new concurrency.Message(MessageType.STRING, "test2", handler); + + let callback = () => { + handler.sendMessage(message); + handler.sendMessage(message2); + arktest.assertEQ(handler.hasMessages(MessageType.STRING), true); + handler.removeMessages(MessageType.STRING, "test"); + arktest.assertEQ(handler.hasMessages(MessageType.STRING), true); + handler.sendMessage(new concurrency.Message(MessageType.STRING, "test3", handler)); + arktest.assertEQ(handler.hasMessages(MessageType.STRING, "test3"), true); + handler.removeMessages(MessageType.STRING); + arktest.assertEQ(handler.hasMessages(MessageType.STRING, "test3"), false); + arktest.assertEQ(handler.hasMessages(MessageType.STRING, "test2"), false); + arktest.assertEQ(handler.hasMessages(MessageType.STRING, "test"), false); + arktest.assertEQ(handler.hasMessages(MessageType.STRING), false); + waiter.fire(); + } + + handler.sendMessage(new concurrency.Message(callback, handler)); + waiter.wait(); + eaw.join(); +} + function testMessageHandlerMismatch() { let eaw = new EAWorker(); eaw.start(); @@ -335,7 +371,6 @@ function main() { let testSuite = new arktest.ArkTestsuite('messageHandlerBasicInterface'); testSuite.addTest("testMessageHandlerCreation", testMessageHandlerCreation); - testSuite.addTest("testMessageHandlerWithoutWorker", testMessageHandlerWithoutWorker); testSuite.addTest("testSendEmptyMessage", testSendEmptyMessage); testSuite.addTest("testSendStringMessage", testSendStringMessage); testSuite.addTest("testSendNumberMessage", testSendNumberMessage); @@ -346,6 +381,7 @@ function main() { testSuite.addTest("testRemoveMultipleMessage", testRemoveMultipleMessages); testSuite.addTest("testMessageHandlerMismatch", testMessageHandlerMismatch); testSuite.addTest("testDoubleRemoveMessage", testDoubleRemoveMessage); + testSuite.addTest("testHasMessage2", testHasMessage2); testSuite.run(); } diff --git a/static_core/plugins/ets/tests/test-lists/ets-func-tests/ets-func-tests-ignored.txt b/static_core/plugins/ets/tests/test-lists/ets-func-tests/ets-func-tests-ignored.txt index e167a18a18..74b9d58f15 100644 --- a/static_core/plugins/ets/tests/test-lists/ets-func-tests/ets-func-tests-ignored.txt +++ b/static_core/plugins/ets/tests/test-lists/ets-func-tests/ets-func-tests-ignored.txt @@ -506,4 +506,3 @@ escompat/JsonStringifyRegExpExecArrayTest.ets std/core/std_core_typeduarrays_methods__BigUint64Array.ets regression/15278_0.ets spec/07.expressions/7.32.Lambda_Expressions/callMethodFromAsyncLambda2.ets -std/concurrency/eaworker_message.ets \ No newline at end of file -- Gitee