From 3ec9fd2dc6902c8cc372142dc002b9e4f0b19b79 Mon Sep 17 00:00:00 2001 From: mayuehit Date: Tue, 2 Dec 2025 11:49:02 +0800 Subject: [PATCH 1/2] fix --- api/python/yr/cluster_mode_runtime.py | 15 +++++++++++---- src/libruntime/libruntime.cpp | 10 ---------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/api/python/yr/cluster_mode_runtime.py b/api/python/yr/cluster_mode_runtime.py index f16d791..7e2fd9c 100644 --- a/api/python/yr/cluster_mode_runtime.py +++ b/api/python/yr/cluster_mode_runtime.py @@ -338,12 +338,14 @@ class ClusterModeRuntime(Runtime): :return: producer """ if config.max_stream_size < 0: - raise RuntimeError(f"Invalid parameter, max_stream_size: {config.max_stream_size}, expect >= 0") + raise RuntimeError( + f"Invalid parameter, max_stream_size: {config.max_stream_size}, expect >= 0") if config.retain_for_num_consumers < 0: raise RuntimeError( f"Invalid parameter, retain_for_num_consumers: {config.retain_for_num_consumers}, expect >= 0") if config.reserve_size < 0: - raise RuntimeError(f"Invalid parameter, reserve_size: {config.reserve_size}, expect >= 0") + raise RuntimeError( + f"Invalid parameter, reserve_size: {config.reserve_size}, expect >= 0") return self.libruntime.create_stream_producer(stream_name, config) def create_stream_consumer(self, stream_name: str, config: SubscriptionConfig) -> Consumer: @@ -572,7 +574,8 @@ class ClusterModeRuntime(Runtime): object_id """ self._check_init() - result = self.libruntime.peek_object_ref_stream(generator_id, blocking, timeout_ms) + result = self.libruntime.peek_object_ref_stream( + generator_id, blocking, timeout_ms) if not isinstance(result, str): objects = Serialization().deserialize(result) for obj in objects: @@ -688,7 +691,8 @@ class ClusterModeRuntime(Runtime): else: serialized_arg = Serialization().serialize(arg) invoke_arg = InvokeArg(buf=None, is_ref=False, obj_id="", - nested_objects=set([ref.id for ref in serialized_arg.nested_refs]), + nested_objects=set( + [ref.id for ref in serialized_arg.nested_refs]), serialized_obj=serialized_arg) args_list_new.append(invoke_arg) return args_list_new @@ -701,6 +705,9 @@ class ClusterModeRuntime(Runtime): raise RuntimeError("runtime not enable") def create_group(self, group_name: str, group_opts: GroupOptions): + if group_opts.timeout < 0 and group_opts.timeout != -1: + raise RuntimeError( + f"Invalid parameter, timeout: {group_opts.timeout}, expect -1 or > 0") self.libruntime.create_group(group_name, group_opts) def terminate_group(self, group_name: str): diff --git a/src/libruntime/libruntime.cpp b/src/libruntime/libruntime.cpp index 227bef0..fc4d6f7 100755 --- a/src/libruntime/libruntime.cpp +++ b/src/libruntime/libruntime.cpp @@ -361,10 +361,6 @@ std::pair Libruntime::CreateInstance(const YR::Libruntim } invokeOrderMgr->CreateInstance(spec); - auto insId = spec->GetNamedInstanceId(); - if (!insId.empty()) { - spec->returnIds[0].id = insId; - } memStore->AddReturnObject(spec->returnIds); dependencyResolver->ResolveDependencies(spec, [this, spec, returnObjs](const ErrorInfo &err) { if (err.OK()) { @@ -445,12 +441,6 @@ ErrorInfo Libruntime::InvokeByInstanceId(const YR::Libruntime::FunctionMeta &fun auto spec = std::make_shared(runtimeContext->GetJobId(), funcMeta, returnObjs, std::move(invokeArgs), libruntime::InvokeType::InvokeFunction, std::move(traceId), std::move(requestId), instanceId, opts); - if (spec->opts.isGetInstance) { - YRLOG_DEBUG("this is not normal member function invoke, redefine invoke type, name is {}, ns is {}", - funcMeta.name, funcMeta.ns); - spec->invokeType = libruntime::InvokeType::GetNamedInstanceMeta; - spec->invokeInstanceId = instanceId; - } err = PreProcessArgs(spec); if (err.Code() != ErrorCode::ERR_OK) { YRLOG_ERROR("pre process failed, req id: {}, code: {}, message: {}", spec->requestId, -- Gitee From 9afd49341b2ea674bbfb635cdef87bad3f5af906 Mon Sep 17 00:00:00 2001 From: mayuehit Date: Tue, 9 Dec 2025 21:48:29 +0800 Subject: [PATCH 2/2] fix user function exception order --- src/libruntime/invoke_order_manager.cpp | 3 +++ src/libruntime/invokeadaptor/invoke_adaptor.cpp | 1 + 2 files changed, 4 insertions(+) diff --git a/src/libruntime/invoke_order_manager.cpp b/src/libruntime/invoke_order_manager.cpp index 093776e..8efb525 100644 --- a/src/libruntime/invoke_order_manager.cpp +++ b/src/libruntime/invoke_order_manager.cpp @@ -223,6 +223,9 @@ void InvokeOrderManager::UpdateFinishReqSeqNo(const std::string &instanceId, int absl::MutexLock lock(&mu); if (instances.find(instanceId) != instances.end()) { auto instOrder = instances[instanceId]; + if (invokeSeqNo < instOrder->unfinishedSeqNo) { + return; + } instOrder->finishedOrderedReqSeqNo.emplace(invokeSeqNo); auto it = instOrder->finishedOrderedReqSeqNo.begin(); while (it != instOrder->finishedOrderedReqSeqNo.end()) { diff --git a/src/libruntime/invokeadaptor/invoke_adaptor.cpp b/src/libruntime/invokeadaptor/invoke_adaptor.cpp index b7baf4e..432cd4f 100644 --- a/src/libruntime/invokeadaptor/invoke_adaptor.cpp +++ b/src/libruntime/invokeadaptor/invoke_adaptor.cpp @@ -1314,6 +1314,7 @@ void InvokeAdaptor::InvokeNotifyHandler(const NotifyRequest &req, const ErrorInf // if timeout, then send cancel req to runtime for erase pending thread (void)this->KillAsync(spec->invokeInstanceId, req.requestid(), libruntime::Signal::ErasePendingThread); } + invokeOrderMgr->NotifyInvokeSuccess(spec); } else { YRLOG_ERROR( "instance invoke failed and retry, request id: {}, instance id: {}, return id: {}, seq: {}, complete " -- Gitee