diff --git a/samples/contribute/ACT/SVP_NNN/README.md b/samples/contribute/ACT/SVP_NNN/README.md index b8692abf49804fc18986058bf22d325f002474a7..e4c93a8988a624a48ee84434508be1a75f66cfdc 100755 --- a/samples/contribute/ACT/SVP_NNN/README.md +++ b/samples/contribute/ACT/SVP_NNN/README.md @@ -141,7 +141,7 @@ ACT(Action Chunking with Transformers)是面向机器人学习场景的高 ``` 成功后生成`act_ros2_simplified.om`文件,通过以下命令,将模型进行重命名,供main文件加载。 ``` - mv ./model/act_ros2_simplified.onnx ./model/act_distill_fp32_for_mindcmd_simp_release.om + mv ./model/act_ros2_simplified.om ./model/act_distill_fp32_for_mindcmd_simp_release.om ``` **ATC命令核心参数说明**: | 参数 | 说明 | @@ -297,6 +297,8 @@ ACT(Action Chunking with Transformers)是面向机器人学习场景的高 - 输入0:形状`(1, 6)`,float32类型,元素数6,字节数24; - 输入1:形状`(1, 3, 240, 320)`(3通道240×320图像),float32类型,元素数230400,字节数921600; - 输入2:形状`(1, 3, 240, 320)`(3通道240×320图像),float32类型,元素数230400,字节数921600; - 1. **推理耗时指标**: - - 模型核心推理时间:单次推理约**37毫秒**(仅模型前向计算耗时,不含数据传输/解析); - - 端到端推理时间:单次推理约**1.2秒**(含数据打包、C++进程通信、输出解析、张量转换全流程耗时),可通过流水线设计增加推理吞吐; \ No newline at end of file + 2. **推理耗时指标**: + - 模型核心推理时间:单次推理约**39.5毫秒**; + - 端到端推理时间:单次推理约**63.0毫秒**; + - 非模型开销:约**23.5毫秒**(含数据准备、IPC通信、后处理); + - 首次请求响应时间:首次推理请求至返回结果的耗时,约**70秒**(含模型加载和端到端推理时间); diff --git a/samples/contribute/ACT/SVP_NNN/inc/model_process.h b/samples/contribute/ACT/SVP_NNN/inc/model_process.h index 8b06394a248cb977c2bc1d6baa58e10418fb4fd1..14e7865332102f8c061c4ba8fbbfa561aab0d989 100755 --- a/samples/contribute/ACT/SVP_NNN/inc/model_process.h +++ b/samples/contribute/ACT/SVP_NNN/inc/model_process.h @@ -17,6 +17,7 @@ #ifndef MODEL_PROCESS_H #define MODEL_PROCESS_H +#include #include #include #include "utils.h" @@ -165,6 +166,12 @@ public: */ size_t GetInputNum() const; + /** + * Function: Get the total number of model output nodes + * Return: Count of output nodes + */ + size_t GetOutputNum() const; + /** * Function: Get stride parameters of specified input index * Input: index - target index of the input buffer (starting from 0) @@ -191,6 +198,20 @@ public: */ Result CreateInputFromData(const std::vector& input_datas, const std::vector& input_sizes); + + /** + * Function: Extract a compact output tensor without stride padding + * Input: index - output index + * Output: packed - compact output bytes + * Output: dims - logical dimensions reported by the model + * Output: elem_type - worker element type used by the binary protocol + * Return: Result status code (success/failure of extraction) + */ + Result GetPackedOutputData( + size_t index, + std::vector& packed, + std::vector& dims, + uint32_t& elem_type) const; /** * Function: Release all model-related resources @@ -215,4 +236,4 @@ private: svp_acl_mdl_dataset *output_ { nullptr }; }; -#endif // MODEL_PROCESS_H \ No newline at end of file +#endif // MODEL_PROCESS_H diff --git a/samples/contribute/ACT/SVP_NNN/inc/protocol.h b/samples/contribute/ACT/SVP_NNN/inc/protocol.h new file mode 100644 index 0000000000000000000000000000000000000000..bbd23a20eabebc1c42d006ebaa374209fad44049 --- /dev/null +++ b/samples/contribute/ACT/SVP_NNN/inc/protocol.h @@ -0,0 +1,36 @@ +#ifndef PROTOCOL_H +#define PROTOCOL_H + +#include +#include +#include +#include + +#include "worker_types.h" + +struct RequestHeader { + uint32_t magic = 0; + uint16_t version = 0; + uint16_t input_count = 0; + uint32_t request_id = 0; + uint32_t flags = 0; +}; + +struct InputEntryHeader { + uint32_t input_index = 0; + uint32_t byte_size = 0; + uint32_t reserved = 0; +}; + +enum ReadHeaderStatus { + READ_HEADER_OK = 0, + READ_HEADER_EOF = 1, + READ_HEADER_ERROR = 2, +}; + +ReadHeaderStatus ReadRequestHeader(std::istream& in, RequestHeader& header, std::string& error_msg); +bool ReadInputEntryHeader(std::istream& in, InputEntryHeader& header, std::string& error_msg); +bool ReadExact(std::istream& in, char* data, size_t size, std::string& error_msg); +bool WriteResponseFrame(std::ostream& out, const InferenceResponse& response, std::string& error_msg); + +#endif diff --git a/samples/contribute/ACT/SVP_NNN/inc/sample_process.h b/samples/contribute/ACT/SVP_NNN/inc/sample_process.h index 1310a3f67408b97ff282c8e0894bffa1bedade01..cf4831ba9a648e060fb25a8da35a159a369255c5 100755 --- a/samples/contribute/ACT/SVP_NNN/inc/sample_process.h +++ b/samples/contribute/ACT/SVP_NNN/inc/sample_process.h @@ -18,10 +18,12 @@ #ifndef SAMPLE_PROCESS_H #define SAMPLE_PROCESS_H +#include #include #include "utils.h" #include "acl/svp_acl.h" #include "model_process.h" +#include "worker_types.h" /** * Class: SampleProcess @@ -54,6 +56,10 @@ public: * Return: Result status code (success/failure of the entire inference process) */ Result Process(); + InferenceResponse ProcessOnce( + const std::vector& input_datas, + const std::vector& input_sizes, + uint32_t request_id); // void DestroyResource(); /** @@ -109,4 +115,4 @@ inline void set_input_path(SampleProcess& sample, const std::string& path) { sample.input_path_ = path; // Assign input file path to the member variable of the SampleProcess instance } -#endif // SAMPLE_PROCESS_H \ No newline at end of file +#endif // SAMPLE_PROCESS_H diff --git a/samples/contribute/ACT/SVP_NNN/inc/utils.h b/samples/contribute/ACT/SVP_NNN/inc/utils.h index 195800f61877fb14772af798e8fdea75e42f4d6f..d7bb1d238f8830df655f004911dcdfa98ae30f6b 100755 --- a/samples/contribute/ACT/SVP_NNN/inc/utils.h +++ b/samples/contribute/ACT/SVP_NNN/inc/utils.h @@ -17,14 +17,15 @@ #ifndef UTILS_H #define UTILS_H +#include #include #include #include "acl/svp_acl.h" #include "acl/svp_acl_mdl.h" -#define INFO_LOG(fmt, ...) fprintf(stdout, "[INFO] " fmt "\n", ##__VA_ARGS__) -#define WARN_LOG(fmt, ...) fprintf(stdout, "[WARN] " fmt "\n", ##__VA_ARGS__) -#define ERROR_LOG(fmt, ...) fprintf(stdout, "[ERROR] " fmt "\n", ##__VA_ARGS__) +#define INFO_LOG(fmt, ...) fprintf(stderr, "[INFO] " fmt "\n", ##__VA_ARGS__) +#define WARN_LOG(fmt, ...) fprintf(stderr, "[WARN] " fmt "\n", ##__VA_ARGS__) +#define ERROR_LOG(fmt, ...) fprintf(stderr, "[ERROR] " fmt "\n", ##__VA_ARGS__) #ifdef _WIN32 #define S_ISREG(m) (((m) & 0170000) == (0100000)) diff --git a/samples/contribute/ACT/SVP_NNN/inc/worker_types.h b/samples/contribute/ACT/SVP_NNN/inc/worker_types.h new file mode 100644 index 0000000000000000000000000000000000000000..7a931291d47eb164b8b5e1791e419e5d8777ee12 --- /dev/null +++ b/samples/contribute/ACT/SVP_NNN/inc/worker_types.h @@ -0,0 +1,68 @@ +#ifndef WORKER_TYPES_H +#define WORKER_TYPES_H + +#include +#include +#include + +static const uint32_t WORKER_PROTOCOL_MAGIC = 0x53565031U; // "SVP1" +static const uint16_t WORKER_PROTOCOL_VERSION = 1U; + +enum WorkerStatusCode { + WORKER_STATUS_OK = 0, + WORKER_STATUS_ERROR = 1, +}; + +enum WorkerErrorCode { + WORKER_ERROR_NONE = 0, + WORKER_ERROR_BAD_REQUEST = 1, + WORKER_ERROR_INPUT_COUNT = 2, + WORKER_ERROR_INPUT_SIZE = 3, + WORKER_ERROR_ACL_NOT_READY = 4, + WORKER_ERROR_MODEL_NOT_READY = 5, + WORKER_ERROR_INFERENCE_FAILED = 6, + WORKER_ERROR_OUTPUT_PARSE_FAILED = 7, + WORKER_ERROR_ALLOC_FAILED = 8, + WORKER_ERROR_PROTOCOL_IO = 9, +}; + +enum WorkerElementType { + WORKER_ELEM_UNKNOWN = 0, + WORKER_ELEM_FLOAT32 = 1, + WORKER_ELEM_FLOAT16 = 2, + WORKER_ELEM_INT8 = 3, + WORKER_ELEM_UINT8 = 4, + WORKER_ELEM_INT32 = 5, + WORKER_ELEM_INT64 = 6, +}; + +struct WorkerTensor { + uint32_t output_index = 0; + uint32_t elem_type = WORKER_ELEM_UNKNOWN; + std::vector dims; + std::vector data; +}; + +struct InferenceResponse { + uint32_t request_id = 0; + bool success = false; + uint32_t latency_us = 0; + int32_t error_code = WORKER_ERROR_NONE; + std::string error_msg; + std::vector outputs; +}; + +inline InferenceResponse MakeErrorResponse( + uint32_t request_id, + int32_t error_code, + const std::string& error_msg) +{ + InferenceResponse response; + response.request_id = request_id; + response.success = false; + response.error_code = error_code; + response.error_msg = error_msg; + return response; +} + +#endif diff --git a/samples/contribute/ACT/SVP_NNN/script/model_test.py b/samples/contribute/ACT/SVP_NNN/script/model_test.py old mode 100755 new mode 100644 index 53c7db4f529df11d158818ac0295912dfa572cea..c2c415e65b37bf82e9a39cfb292c7b114c43a4fe --- a/samples/contribute/ACT/SVP_NNN/script/model_test.py +++ b/samples/contribute/ACT/SVP_NNN/script/model_test.py @@ -1,114 +1,258 @@ """ -ACTWrapper.py - -加载 act om 模型并推理 +ACT worker client for the SVP_NNN binary protocol. """ -import numpy as np -import subprocess +import argparse import os -import re import struct -import json +import subprocess import sys -import time -import argparse +import threading +from typing import Iterable, List, Sequence, Tuple + +import numpy as np + + +PROTOCOL_MAGIC = 0x53565031 +PROTOCOL_VERSION = 1 + +REQUEST_HEADER_STRUCT = struct.Struct(" bytes: + chunks: List[bytes] = [] + remaining = size + while remaining > 0: + chunk = stream.read(remaining) + if not chunk: + raise RuntimeError("worker stream closed unexpectedly") + chunks.append(chunk) + remaining -= len(chunk) + return b"".join(chunks) + + +def _dtype_from_elem_type(elem_type: int): + if elem_type == WORKER_ELEM_FLOAT32: + return np.float32 + if elem_type == WORKER_ELEM_FLOAT16: + return np.float16 + if elem_type == WORKER_ELEM_INT8: + return np.int8 + if elem_type == WORKER_ELEM_UINT8: + return np.uint8 + if elem_type == WORKER_ELEM_INT32: + return np.int32 + if elem_type == WORKER_ELEM_INT64: + return np.int64 + raise RuntimeError(f"unsupported worker element type: {elem_type}") class ACT3403Policy: - def __init__(self, cpp_executable): + def __init__(self, cpp_executable: str, model_path: str): super().__init__() self.cpp_executable = cpp_executable - self.cpp_dir = os.path.dirname(cpp_executable) # 获取二进制所在目录 - # self.cpp_process = self.model_init() - - def predict(self, batch) -> tuple: - input_arr = batch - start_time = time.perf_counter() - cpp_outputs = self.run_cpp_and_get_float_output(input_arr) - - if cpp_outputs is not None: - data = cpp_outputs.get(2) - if data is not None: - action = [] - for i in range(0, len(data), 8): - action.append(data[i:i+6]) - action = np.array(action, dtype=np.float32) - action = action.reshape([1, 100, 6]) - return action - return None + self.cpp_dir = os.path.dirname(cpp_executable) + self._worker_env = os.environ.copy() + self._worker_env["SVP_MODEL_PATH"] = model_path + self._request_id = 0 + self._process = None + self._stderr_thread = None + self._start_process() - def run_cpp_and_get_float_output(self, input_arrays): - # 启动C++进程,通过管道通信 - process = subprocess.Popen( - self.cpp_executable, + def _start_process(self): + self._process = subprocess.Popen( + [self.cpp_executable], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - text=False, # 二进制模式通信 - cwd=self.cpp_dir + cwd=self.cpp_dir, + env=self._worker_env, + text=False, + bufsize=0, ) + self._stderr_thread = threading.Thread( + target=self._drain_stderr, + args=(self._process.stderr,), + daemon=True, + ) + self._stderr_thread.start() + def _drain_stderr(self, pipe): try: - # 发送输入数据 - for i, arr in enumerate(input_arrays): - data_bytes = arr.tobytes() # 获取原始二进制数据 - data_size = struct.pack(' str: + if self._process is None: + return "worker process is not running" + + return_code = self._process.poll() + stderr_text = "" + try: + if self._process.stderr is not None: + stderr_text = self._process.stderr.read().decode("utf-8", errors="replace").strip() + except Exception: + stderr_text = "" - # 检查进程是否异常退出 - if process.returncode != 0: - return None + msg = f"worker exited unexpectedly" + if return_code is not None: + msg += f" (returncode={return_code})" + if stderr_text: + msg += f"\nworker stderr:\n{stderr_text}" + return msg - # 解析推理时间和输出(保持原有逻辑) - re.search(r"INFERENCE_TIME:(\d+\.\d+)", stdout.decode('utf-8')) + def __enter__(self): + return self - pattern = r"FLOAT_OUTPUT_START (\d+) (\d+)\n(.*?)\nFLOAT_OUTPUT_END \1" - matches = re.findall(pattern, stdout.decode('utf-8'), re.DOTALL) + def __exit__(self, exc_type, exc, tb): + self.close() + + def _next_request_id(self) -> int: + self._request_id += 1 + return self._request_id + + def _write_request(self, input_arrays: Sequence[np.ndarray]) -> int: + if self._process is None or self._process.stdin is None: + raise RuntimeError("worker process is not running") + if self._process.poll() is not None: + raise RuntimeError(self._worker_exit_message()) + + request_id = self._next_request_id() + header = REQUEST_HEADER_STRUCT.pack( + PROTOCOL_MAGIC, + PROTOCOL_VERSION, + len(input_arrays), + request_id, + 0, + ) + try: + self._process.stdin.write(header) - float_outputs = {} - for idx, count, data_str in matches: - float_data = np.array(list(map(float, data_str.split())), dtype=np.float32) - float_outputs[int(idx)] = float_data + for idx, arr in enumerate(input_arrays): + contiguous = np.ascontiguousarray(arr) + payload = contiguous.tobytes() + entry = INPUT_ENTRY_STRUCT.pack(idx, len(payload), 0) + self._process.stdin.write(entry) + self._process.stdin.write(payload) - return float_outputs + self._process.stdin.flush() + except BrokenPipeError as exc: + raise RuntimeError(self._worker_exit_message()) from exc + return request_id + + def _read_response(self, expected_request_id: int): + if self._process is None or self._process.stdout is None: + raise RuntimeError("worker process is not running") + if self._process.poll() is not None: + raise RuntimeError(self._worker_exit_message()) + + header_bytes = _read_exact(self._process.stdout, RESPONSE_HEADER_STRUCT.size) + ( + magic, + version, + status, + request_id, + output_count, + latency_us, + error_code, + error_msg_size, + ) = RESPONSE_HEADER_STRUCT.unpack(header_bytes) + + if magic != PROTOCOL_MAGIC or version != PROTOCOL_VERSION: + raise RuntimeError( + f"unexpected response header: magic=0x{magic:x}, version={version}" + ) + if request_id != expected_request_id: + raise RuntimeError( + f"mismatched response id: expected {expected_request_id}, got {request_id}" + ) + + outputs = {} + for _ in range(output_count): + entry_bytes = _read_exact(self._process.stdout, OUTPUT_ENTRY_STRUCT.size) + output_index, elem_type, elem_count, byte_size, dim_count, _reserved = OUTPUT_ENTRY_STRUCT.unpack(entry_bytes) + dims = [ + DIM_STRUCT.unpack(_read_exact(self._process.stdout, DIM_STRUCT.size))[0] + for _ in range(dim_count) + ] + payload = _read_exact(self._process.stdout, byte_size) + dtype = _dtype_from_elem_type(elem_type) + data = np.frombuffer(payload, dtype=dtype, count=elem_count) + if dims: + data = data.reshape(tuple(int(dim) for dim in dims)) + outputs[output_index] = data.copy() + + error_msg = "" + if error_msg_size: + error_msg = _read_exact(self._process.stdout, error_msg_size).decode( + "utf-8", errors="replace" + ) + + if status != WORKER_STATUS_OK: + raise RuntimeError( + f"worker inference failed (error_code={error_code}): {error_msg or 'unknown error'}" + ) + + return outputs, latency_us + + def predict(self, batch: Sequence[np.ndarray]) -> np.ndarray: + request_id = self._write_request(batch) + outputs, _latency_us = self._read_response(request_id) + if 2 not in outputs: + raise RuntimeError("worker response does not contain output index 2") + action = outputs[2] + if action.ndim == 2: + action = action[np.newaxis, ...] + return action.astype(np.float32, copy=False) - except BrokenPipeError: - process.kill() - return None - except Exception as e: - process.kill() - return None def read_bin_file(file_path, dtype=np.float32): - """ - 读取bin文件并解析为numpy数组 - Args: - file_path: bin文件路径(如"data.bin") - dtype: 数据类型(如np.float32、np.int64,默认float32) - Returns: - numpy数组:解析后的数值数组 - """ try: - # 以二进制只读模式打开文件 - with open(file_path, 'rb') as f: - # 读取所有二进制数据并解析为numpy数组 - # frombuffer:将二进制字节流转为数组,dtype指定数据类型 + with open(file_path, "rb") as f: data = np.frombuffer(f.read(), dtype=dtype) return data - except FileNotFoundError: print(f"错误:未找到文件 {file_path}") return None @@ -116,21 +260,28 @@ def read_bin_file(file_path, dtype=np.float32): print(f"读取bin文件失败:{str(e)}") return None + if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument('--image_list', type=str, required=True) + parser.add_argument("--image_list", type=str, required=True) + parser.add_argument( + "--model_path", + type=str, + default="../model/act_distill_fp32_for_mindcmd_simp_release.om", + help="Path to OM model file for worker process", + ) args = parser.parse_args() - image_list = [item for item in args.image_list.split(';')] + image_list = [item for item in args.image_list.split(";")] batch = [] for bin_path in image_list: batch.append(read_bin_file(bin_path)) om_model_path = "../out/main" - model = ACT3403Policy(om_model_path) - action = model.predict(batch)[0] + with ACT3403Policy(om_model_path, args.model_path) as model: + action = model.predict(batch)[0] action = np.around(action, 4) - with open('result.txt', 'w', encoding='utf-8') as f: + with open("result.txt", "w", encoding="utf-8") as f: for num in action: f.write(str(num)) - f.write("\n") \ No newline at end of file + f.write("\n") diff --git a/samples/contribute/ACT/SVP_NNN/src/CMakeLists.txt b/samples/contribute/ACT/SVP_NNN/src/CMakeLists.txt index 85a97703075d0d180697f1d6be75a5db7c5c6d08..25a0101c1d4d0e60661f99b263259b362d691dd1 100755 --- a/samples/contribute/ACT/SVP_NNN/src/CMakeLists.txt +++ b/samples/contribute/ACT/SVP_NNN/src/CMakeLists.txt @@ -18,25 +18,128 @@ set(CMAKE_CXX_FLAGS_DEBUG "-fPIC -O0 -g -Wall") set(CMAKE_CXX_FLAGS_RELEASE "-fPIC -O2 -Wall -s") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g") # 开启调试符号 -set(CMAKE_BUILD_TYPE Debug) # 确保使用调试模式 +if (NOT CMAKE_BUILD_TYPE) + set(CMAKE_BUILD_TYPE Debug CACHE STRING "Build type" FORCE) +endif() set(CMAKE_SKIP_RPATH TRUE) -# Header path -include_directories( -$ENV{DDK_PATH}/acllib/include/ - ../inc/ +# Locate DDK/Ascend toolkit paths across old and new directory layouts. +set(_ddk_root_candidates + "$ENV{DDK_PATH}" + "$ENV{ASCEND_TOOLKIT_HOME}" + "$ENV{ASCEND_HOME_PATH}" + "/usr/local/Ascend/ascend-toolkit/latest" + "/usr/local/Ascend/ascend-toolkit/6.10.t01spc030b600" ) -# add host lib path +set(DDK_ROOT "") +foreach(_candidate IN LISTS _ddk_root_candidates) + if(_candidate AND EXISTS "${_candidate}") + set(DDK_ROOT "${_candidate}") + break() + endif() +endforeach() + +if(NOT DDK_ROOT) + message(FATAL_ERROR "Cannot find Ascend toolkit root. Please set DDK_PATH or ASCEND_TOOLKIT_HOME.") +endif() + +set(_include_candidates + "${DDK_ROOT}/acllib/include" + "${DDK_ROOT}/x86_64-linux/acllib/include" +) +set(DDK_INCLUDE_DIR "") +foreach(_dir IN LISTS _include_candidates) + if(EXISTS "${_dir}/acl/svp_acl.h") + set(DDK_INCLUDE_DIR "${_dir}") + break() + endif() +endforeach() + +if(NOT DDK_INCLUDE_DIR) + message(FATAL_ERROR "Cannot find svp_acl headers under ${DDK_ROOT}.") +endif() + +set(LIB_PATH "") +set(LIB_PATHS "") if (target MATCHES "Simulator_Function" OR target MATCHES "Simulator_Instruction" OR target MATCHES "Simulator_Performance") - set(ENV{NPU_HOST_LIB} $ENV{DDK_PATH}/toolkit/tools/sim/lib) + set(_sim_candidates + "${DDK_ROOT}/toolkit/tools/sim/lib" + "${DDK_ROOT}/x86_64-linux/toolkit/tools/sim/lib" + ) + foreach(_dir IN LISTS _sim_candidates) + if(EXISTS "${_dir}") + set(LIB_PATH "${_dir}") + list(APPEND LIB_PATHS "${_dir}") + break() + endif() + endforeach() else() - set(ENV{NPU_HOST_LIB} $ENV{DDK_PATH}/acllib/lib64/stub) + set(_stub_dir_candidates "${DDK_ROOT}/acllib/lib64/stub") + file(GLOB _all_stub_dirs "${DDK_ROOT}/x86_64-linux/acllib/lib64_*/stub") + + if(CMAKE_CXX_COMPILER MATCHES "musl") + foreach(_dir IN LISTS _all_stub_dirs) + if(_dir MATCHES "linux-musl/stub$" OR _dir MATCHES "ohos-musl-clang/stub$") + list(APPEND _stub_dir_candidates "${_dir}") + endif() + endforeach() + else() + foreach(_dir IN LISTS _all_stub_dirs) + if(_dir MATCHES "linux-gnu/stub$") + list(APPEND _stub_dir_candidates "${_dir}") + endif() + endforeach() + endif() + + list(APPEND _stub_dir_candidates ${_all_stub_dirs}) + list(REMOVE_DUPLICATES _stub_dir_candidates) + + foreach(_dir IN LISTS _stub_dir_candidates) + if(EXISTS "${_dir}/libsvp_acl.so" + AND EXISTS "${_dir}/libot_osal.so" + AND EXISTS "${_dir}/libot_base.so" + AND EXISTS "${_dir}/libot_sys.so" + AND EXISTS "${_dir}/libot_irq.so" + AND EXISTS "${_dir}/libss_mpi.so" + AND EXISTS "${_dir}/libss_voice_engine.so" + AND EXISTS "${_dir}/libss_upvqe.so" + AND EXISTS "${_dir}/libss_dnvqe.so") + set(LIB_PATH "${_dir}") + break() + endif() + endforeach() + + if(LIB_PATH) + list(APPEND LIB_PATHS "${LIB_PATH}") + endif() + + set(_common_stub_candidates + "${DDK_ROOT}/acllib/lib64/stub" + "${DDK_ROOT}/x86_64-linux/acllib/lib64/stub" + ) + foreach(_dir IN LISTS _common_stub_candidates) + if(EXISTS "${_dir}/libsecurec.so" OR EXISTS "${_dir}/libprotobuf-c.so.1") + list(APPEND LIB_PATHS "${_dir}") + endif() + endforeach() + list(REMOVE_DUPLICATES LIB_PATHS) +endif() + +if(NOT LIB_PATHS) + message(FATAL_ERROR "Cannot find required stub libraries under ${DDK_ROOT}.") endif() -set(LIB_PATH $ENV{NPU_HOST_LIB}) -MESSAGE(NPU_HOST_LIB=$ENV{NPU_HOST_LIB}) -link_directories(${LIB_PATH}) + +include_directories( + ${DDK_INCLUDE_DIR} + ../inc/ +) + +message(STATUS "Using Ascend toolkit root: ${DDK_ROOT}") +message(STATUS "Using Ascend include dir: ${DDK_INCLUDE_DIR}") +message(STATUS "Using Ascend lib dirs: ${LIB_PATHS}") +link_directories(${LIB_PATHS}) FILE(GLOB SAMPLE_SRC_FILE *.cpp) add_executable(main ${SAMPLE_SRC_FILE}) @@ -51,4 +154,3 @@ else() endif() install(TARGETS main DESTINATION ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}) - diff --git a/samples/contribute/ACT/SVP_NNN/src/main.cpp b/samples/contribute/ACT/SVP_NNN/src/main.cpp index 74c8f492b764220a7eea1fade74dd413160b3d45..04ddfd08befaafc870d222402671efdbb16e8140 100755 --- a/samples/contribute/ACT/SVP_NNN/src/main.cpp +++ b/samples/contribute/ACT/SVP_NNN/src/main.cpp @@ -7,86 +7,139 @@ * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. */ -#include #include +#include +#include +#include "acl/svp_acl.h" +#include "protocol.h" #include "sample_process.h" #include "utils.h" -#include -#include + using namespace std; +namespace { + +static const uint16_t kExpectedInputCount = 3; + +void FreeInputBuffers(const vector& input_datas) +{ + for (size_t i = 0; i < input_datas.size(); ++i) { + if (input_datas[i] != nullptr) { + svp_acl_rt_free(const_cast(input_datas[i])); + } + } +} + +} // namespace + int main() { + std::ios::sync_with_stdio(false); + cin.tie(nullptr); + // 初始化推理环境(只执行一次) SampleProcess sample; if (sample.InitResource() != SUCCESS) { - cerr << "Init resource failed" << endl; + ERROR_LOG("Init resource failed"); return -1; } // 加载模型(只执行一次) if (sample.LoadModel() != SUCCESS) { - cerr << "Load model failed" << endl; - sample.DestroyResource(); + ERROR_LOG("Load model failed"); return -1; } - // 循环处理多次输入 + // 循环处理多次请求 while (true) { - vector input_datas; - vector input_sizes; - const int INPUT_COUNT = 3; - - // 读取输入数据(保持原有逻辑) - bool readSuccess = true; - for (int i = 0; i < INPUT_COUNT; ++i) { - uint32_t data_size; - cin.read(reinterpret_cast(&data_size), sizeof(data_size)); - if (!cin.good()) { - cerr << "Read input " << i << " size failed" << endl; - readSuccess = false; + RequestHeader requestHeader; + string protocolError; + const ReadHeaderStatus headerStatus = ReadRequestHeader(cin, requestHeader, protocolError); + if (headerStatus == READ_HEADER_EOF) { + break; + } + if (headerStatus == READ_HEADER_ERROR) { + ERROR_LOG("Read request header failed: %s", protocolError.c_str()); + break; + } + + if (requestHeader.magic != WORKER_PROTOCOL_MAGIC + || requestHeader.version != WORKER_PROTOCOL_VERSION) { + ERROR_LOG( + "Invalid protocol header: magic=0x%x version=%u", + requestHeader.magic, + requestHeader.version); + break; + } + + vector input_datas(kExpectedInputCount, nullptr); + vector input_sizes(kExpectedInputCount, 0U); + bool requestOk = true; + + if (requestHeader.input_count != kExpectedInputCount) { + requestOk = false; + protocolError = "unexpected input_count"; + } + + for (uint16_t i = 0; i < requestHeader.input_count; ++i) { + InputEntryHeader inputHeader; + if (!ReadInputEntryHeader(cin, inputHeader, protocolError)) { + ERROR_LOG("Read input header failed: %s", protocolError.c_str()); + requestOk = false; break; } - void* data = nullptr; - svp_acl_error ret = svp_acl_rt_malloc(&data, data_size, SVP_ACL_MEM_MALLOC_NORMAL_ONLY); - if (ret != SVP_ACL_SUCCESS || data == nullptr) { - cerr << "Malloc buffer for input " << i << " failed" << endl; - readSuccess = false; + if (inputHeader.input_index >= kExpectedInputCount || input_datas[inputHeader.input_index] != nullptr) { + requestOk = false; + protocolError = "invalid or duplicated input_index"; break; } - cin.read(reinterpret_cast(data), data_size); - if (!cin.good()) { - cerr << "Read input " << i << " data failed" << endl; - svp_acl_rt_free(data); - readSuccess = false; - break; + void* data = nullptr; + if (inputHeader.byte_size > 0) { + svp_acl_error ret = svp_acl_rt_malloc(&data, inputHeader.byte_size, SVP_ACL_MEM_MALLOC_NORMAL_ONLY); + if (ret != SVP_ACL_SUCCESS || data == nullptr) { + protocolError = "failed to allocate device buffer for input"; + requestOk = false; + break; + } + + if (!ReadExact(cin, reinterpret_cast(data), inputHeader.byte_size, protocolError)) { + svp_acl_rt_free(data); + data = nullptr; + requestOk = false; + break; + } } - input_datas.push_back(data); - input_sizes.push_back(data_size); + input_datas[inputHeader.input_index] = data; + input_sizes[inputHeader.input_index] = inputHeader.byte_size; } - // 检查是否读取失败(比如到达输入末尾) - if (!readSuccess) { - // 释放已分配的内存 - for (auto ptr : input_datas) svp_acl_rt_free(ptr); - break; - } + if (!requestOk) { + FreeInputBuffers(input_datas); - // 设置输入并执行推理 - sample.SetInputDatas(input_datas, input_sizes); - if (sample.Process() != SUCCESS) { - cerr << "Inference failed" << endl; - } else { - cout << "3-input inference success" << endl; // 注意这里修正了原代码的数字错误(5->3) + InferenceResponse response = MakeErrorResponse( + requestHeader.request_id, + WORKER_ERROR_BAD_REQUEST, + protocolError.empty() ? "bad request frame" : protocolError); + string writeError; + if (!WriteResponseFrame(cout, response, writeError)) { + ERROR_LOG("Write error response failed: %s", writeError.c_str()); + break; + } + continue; } - // 释放当前批次的输入内存 - for (auto data : input_datas) svp_acl_rt_free(data); - } + InferenceResponse response = sample.ProcessOnce( + input_datas, + input_sizes, + requestHeader.request_id); - // 最后释放所有资源 - sample.DestroyResource(); + string writeError; + if (!WriteResponseFrame(cout, response, writeError)) { + ERROR_LOG("Write response failed: %s", writeError.c_str()); + break; + } + } return 0; -} \ No newline at end of file +} diff --git a/samples/contribute/ACT/SVP_NNN/src/model_process.cpp b/samples/contribute/ACT/SVP_NNN/src/model_process.cpp index bfcc99b6bab99b2f80a9ee5516571b587e76c8cf..09df527f6762f5854c559912015363e6399e7bc2 100755 --- a/samples/contribute/ACT/SVP_NNN/src/model_process.cpp +++ b/samples/contribute/ACT/SVP_NNN/src/model_process.cpp @@ -17,34 +17,59 @@ #include "model_process.h" +#include #include #include #include #include #include "utils.h" +#include "worker_types.h" using namespace std; static const int BYTE_BIT_NUM = 8; // 1 byte = 8 bit +namespace { + +uint32_t MapElementType(svp_acl_data_type dataType) +{ + switch (dataType) { + case SVP_ACL_FLOAT: + return WORKER_ELEM_FLOAT32; + case SVP_ACL_FLOAT16: + return WORKER_ELEM_FLOAT16; + case SVP_ACL_INT8: + return WORKER_ELEM_INT8; + case SVP_ACL_UINT8: + return WORKER_ELEM_UINT8; + case SVP_ACL_INT32: + return WORKER_ELEM_INT32; + case SVP_ACL_INT64: + return WORKER_ELEM_INT64; + default: + return WORKER_ELEM_UNKNOWN; + } +} + +} // namespace + ModelProcess::ModelProcess() { } ModelProcess::~ModelProcess() { - Unload(); - DestroyDesc(); - DestroyInput(); - DestroyOutput(); + // Resource ownership is managed explicitly by SampleProcess::DestroyResource(). + // Avoid calling ACL-related cleanup here because SampleProcess finalizes ACL + // before this member destructor runs. } void ModelProcess::DestroyResource() { - Unload(); - DestroyDesc(); DestroyInput(); DestroyOutput(); + DestroyDesc(); + Unload(); } Result ModelProcess::LoadModelFromFileWithMem(const std::string& modelPath) @@ -52,10 +77,45 @@ Result ModelProcess::LoadModelFromFileWithMem(const std::string& modelPath) uint32_t fileSize = 0; modelMemPtr_ = Utils::ReadBinFile(modelPath, fileSize); modelMemSize_ = fileSize; + if (modelMemPtr_ == nullptr || modelMemSize_ == 0) { + ERROR_LOG("read model file failed, model file is %s", modelPath.c_str()); + return FAILED; + } + + svp_acl_mdl_config_handle* handle = svp_acl_mdl_create_config_handle(); + if (handle != nullptr) { + const size_t loadType = SVP_ACL_MDL_LOAD_FROM_MEM; + svp_acl_error cfgRet = svp_acl_mdl_set_config_opt( + handle, SVP_ACL_MDL_LOAD_TYPE_SIZET, &loadType, sizeof(loadType)); + if (cfgRet == SVP_ACL_SUCCESS) { + cfgRet = svp_acl_mdl_set_config_opt( + handle, SVP_ACL_MDL_MEM_ADDR_PTR, &modelMemPtr_, sizeof(modelMemPtr_)); + } + if (cfgRet == SVP_ACL_SUCCESS) { + cfgRet = svp_acl_mdl_set_config_opt( + handle, SVP_ACL_MDL_MEM_SIZET, &modelMemSize_, sizeof(modelMemSize_)); + } + if (cfgRet == SVP_ACL_SUCCESS) { + cfgRet = svp_acl_mdl_load_with_config(handle, &modelId_); + } + (void)svp_acl_mdl_destroy_config_handle(handle); + + if (cfgRet == SVP_ACL_SUCCESS) { + loadFlag_ = true; + INFO_LOG("load model %s success by config(mem), modelId=%u", modelPath.c_str(), modelId_); + return SUCCESS; + } + WARN_LOG("load model by config(mem) failed, ret=%d, fallback to load_from_mem", static_cast(cfgRet)); + } else { + WARN_LOG("create model config handle failed, fallback to load_from_mem"); + } + svp_acl_error ret = svp_acl_mdl_load_from_mem(static_cast(modelMemPtr_), modelMemSize_, &modelId_); if (ret != SVP_ACL_SUCCESS) { svp_acl_rt_free(modelMemPtr_); - ERROR_LOG("load model from file failed, model file is %s", modelPath.c_str()); + modelMemPtr_ = nullptr; + modelMemSize_ = 0; + ERROR_LOG("load model from file failed, model file is %s, ret=%d", modelPath.c_str(), static_cast(ret)); return FAILED; } @@ -285,6 +345,14 @@ size_t ModelProcess::GetInputNum() const { return svp_acl_mdl_get_num_inputs(modelDesc_); } +size_t ModelProcess::GetOutputNum() const +{ + if (modelDesc_ == nullptr) { + return 0; + } + return svp_acl_mdl_get_num_outputs(modelDesc_); +} + // 新增:获取指定索引输入的参数(大小、stride、维度) Result ModelProcess::GetInputStrideParam(int index, size_t& buf_size, size_t& stride, svp_acl_mdl_io_dims& dims) const { if (modelDesc_ == nullptr || index < 0 || static_cast(index) >= GetInputNum()) { @@ -311,7 +379,15 @@ Result ModelProcess::CreateInputFromData(const std::vector& input_d // 初始化输入数据集 if (input_ != nullptr) { DestroyInput(); } input_ = svp_acl_mdl_create_dataset(); - if (input_ == nullptr) { ERROR_LOG("Create input dataset failed"); return FAILED; } + if (input_ == nullptr) { + ERROR_LOG("Create input dataset failed"); + for (size_t i = 0; i < input_datas.size(); ++i) { + if (input_datas[i] != nullptr) { + svp_acl_rt_free(const_cast(input_datas[i])); + } + } + return FAILED; + } // 为每个输入创建缓冲区并绑定数据 for (size_t i = 0; i < input_datas.size(); ++i) { @@ -322,6 +398,22 @@ Result ModelProcess::CreateInputFromData(const std::vector& input_d // 获取当前输入的参数 if (GetInputStrideParam(i, buf_size, stride, dims) != SUCCESS) { ERROR_LOG("Get input %zu param failed", i); + DestroyInput(); + for (size_t rest = i; rest < input_datas.size(); ++rest) { + if (input_datas[rest] != nullptr) { + svp_acl_rt_free(const_cast(input_datas[rest])); + } + } + return FAILED; + } + if (i >= input_sizes.size() || input_sizes[i] > buf_size) { + ERROR_LOG("Input %zu size mismatch, input=%zu expected<=%zu", i, i < input_sizes.size() ? input_sizes[i] : 0U, buf_size); + DestroyInput(); + for (size_t rest = i; rest < input_datas.size(); ++rest) { + if (input_datas[rest] != nullptr) { + svp_acl_rt_free(const_cast(input_datas[rest])); + } + } return FAILED; } @@ -330,6 +422,12 @@ Result ModelProcess::CreateInputFromData(const std::vector& input_d const_cast(input_datas[i]), buf_size, stride); if (input_buf == nullptr) { ERROR_LOG("Create input %zu buffer failed", i); + DestroyInput(); + for (size_t rest = i; rest < input_datas.size(); ++rest) { + if (input_datas[rest] != nullptr) { + svp_acl_rt_free(const_cast(input_datas[rest])); + } + } return FAILED; } @@ -337,6 +435,12 @@ Result ModelProcess::CreateInputFromData(const std::vector& input_d if (svp_acl_mdl_add_dataset_buffer(input_, input_buf) != SVP_ACL_SUCCESS) { ERROR_LOG("Add input %zu buffer to dataset failed", i); svp_acl_destroy_data_buffer(input_buf); + DestroyInput(); + for (size_t rest = i; rest < input_datas.size(); ++rest) { + if (input_datas[rest] != nullptr) { + svp_acl_rt_free(const_cast(input_datas[rest])); + } + } return FAILED; } } @@ -397,6 +501,102 @@ Result ModelProcess::CreateInputFromData(const void* data, size_t data_size) return CreateInput(device_buf, bufSize, stride); } +Result ModelProcess::GetPackedOutputData( + size_t index, + std::vector& packed, + std::vector& dims, + uint32_t& elem_type) const +{ + packed.clear(); + dims.clear(); + elem_type = WORKER_ELEM_UNKNOWN; + + if (output_ == nullptr || modelDesc_ == nullptr) { + ERROR_LOG("output dataset is null"); + return FAILED; + } + if (index >= svp_acl_mdl_get_dataset_num_buffers(output_)) { + ERROR_LOG("output index %zu out of range", index); + return FAILED; + } + + svp_acl_data_buffer* dataBuffer = svp_acl_mdl_get_dataset_buffer(output_, index); + if (dataBuffer == nullptr) { + ERROR_LOG("output[%zu] dataBuffer nullptr invalid", index); + return FAILED; + } + + uint8_t* outData = static_cast(svp_acl_get_data_buffer_addr(dataBuffer)); + size_t outSize = svp_acl_get_data_buffer_size(dataBuffer); + if (outData == nullptr || outSize == 0) { + ERROR_LOG("output[%zu] data invalid, size=%zu", index, outSize); + return FAILED; + } + + size_t bufSize = 0; + size_t stride = 0; + svp_acl_mdl_io_dims ioDims; + if (GetOutputStrideParam(static_cast(index), bufSize, stride, ioDims) != SUCCESS) { + ERROR_LOG("Get output %zu stride param failed", index); + return FAILED; + } + if (ioDims.dim_count == 0) { + ERROR_LOG("output[%zu] dims invalid", index); + return FAILED; + } + + svp_acl_data_type dataType = svp_acl_mdl_get_output_data_type(modelDesc_, index); + const size_t elemByteSize = svp_acl_data_type_size(dataType) / BYTE_BIT_NUM; + if (elemByteSize == 0) { + ERROR_LOG("output[%zu] elem byte size invalid", index); + return FAILED; + } + + elem_type = MapElementType(dataType); + if (elem_type == WORKER_ELEM_UNKNOWN) { + ERROR_LOG("output[%zu] data type unsupported", index); + return FAILED; + } + + dims.reserve(ioDims.dim_count); + uint64_t rowCount = 1; + for (size_t i = 0; i < ioDims.dim_count; ++i) { + dims.push_back(ioDims.dims[i]); + if (i + 1 < ioDims.dim_count) { + rowCount *= static_cast(std::max(ioDims.dims[i], 1)); + } + } + + const uint64_t lastDim = static_cast(std::max(ioDims.dims[ioDims.dim_count - 1], 0)); + const uint64_t rowBytes = lastDim * elemByteSize; + const uint64_t totalBytes = rowCount * rowBytes; + if (totalBytes == 0) { + packed.clear(); + return SUCCESS; + } + if (stride < rowBytes) { + ERROR_LOG("output[%zu] stride(%zu) smaller than rowBytes(%llu)", index, stride, + static_cast(rowBytes)); + return FAILED; + } + + packed.resize(static_cast(totalBytes)); + for (uint64_t row = 0; row < rowCount; ++row) { + const size_t srcOffset = static_cast(row * stride); + const size_t dstOffset = static_cast(row * rowBytes); + if (srcOffset + static_cast(rowBytes) > outSize || dstOffset + static_cast(rowBytes) > packed.size()) { + ERROR_LOG("output[%zu] row copy overflow detected", index); + packed.clear(); + dims.clear(); + elem_type = WORKER_ELEM_UNKNOWN; + return FAILED; + } + std::memcpy(&packed[dstOffset], outData + srcOffset, static_cast(rowBytes)); + } + + return SUCCESS; +} + void ModelProcess::DumpModelOutputResult() const { stringstream ss; @@ -411,48 +611,29 @@ void ModelProcess::DumpModelOutputResult() const } void ModelProcess::OutputModelResult() const { - if (output_ == nullptr) { + if (output_ == nullptr || modelDesc_ == nullptr) { ERROR_LOG("Output dataset is null, cannot output result"); return; } - // 获取输出数量 - size_t outputNum = svp_acl_mdl_get_num_outputs(modelDesc_); - INFO_LOG("Total output count: %zu", outputNum); - - // 遍历每个输出 - for (size_t i = 1; i < outputNum; ++i) { - // 获取当前输出缓冲区 - svp_acl_data_buffer* dataBuffer = svp_acl_mdl_get_dataset_buffer(output_, i); - if (dataBuffer == nullptr) { - ERROR_LOG("Output[%zu] buffer is null", i); - continue; - } - - // 获取输出数据地址和大小 - int8_t* outputData = static_cast(svp_acl_get_data_buffer_addr(dataBuffer)); - size_t outputSize = svp_acl_get_data_buffer_size(dataBuffer); - if (outputData == nullptr || outputSize == 0) { - ERROR_LOG("Output[%zu] data is invalid (size: %zu)", i, outputSize); + INFO_LOG("Total output count: %zu", GetOutputNum()); + for (size_t i = 0; i < GetOutputNum(); ++i) { + std::vector packed; + std::vector dims; + uint32_t elemType = WORKER_ELEM_UNKNOWN; + if (GetPackedOutputData(i, packed, dims, elemType) != SUCCESS) { continue; } - - // 打印当前输出的基本信息 - INFO_LOG("\nOutput[%zu] (size: %zu bytes):", i, outputSize); - INFO_LOG("----------------------------------------"); - - // 打印全部输出数据(根据实际数据类型调整,此处假设为float) - // 注意:需根据模型输出的实际数据类型(如int32_t、float等)修改指针类型 - size_t dataCount = outputSize / sizeof(float); // 假设输出为float类型 - float* floatData = reinterpret_cast(outputData); - - // 打印输出数据的数量 - printf("Total data count: %zu\n", dataCount); - std::cout << "FLOAT_OUTPUT_START " << i << " " << dataCount << std::endl; - for (size_t j = 0; j < dataCount; ++j) { - std::cout << floatData[j] << " "; + std::ostringstream oss; + oss << "Output[" << i << "] elem_type=" << elemType << " dims=["; + for (size_t dimIdx = 0; dimIdx < dims.size(); ++dimIdx) { + if (dimIdx != 0) { + oss << ", "; + } + oss << dims[dimIdx]; } - std::cout << std::endl << "FLOAT_OUTPUT_END " << i << std::endl; + oss << "] bytes=" << packed.size(); + INFO_LOG("%s", oss.str().c_str()); } } @@ -481,7 +662,6 @@ Result ModelProcess::Execute() return FAILED; } executeNum_++; - INFO_LOG("model execute success"); return SUCCESS; } @@ -585,11 +765,6 @@ void ModelProcess::Unload() ERROR_LOG("unload model failed, modelId is %u", modelId_); } - if (modelDesc_ != nullptr) { - (void)svp_acl_mdl_destroy_desc(modelDesc_); - modelDesc_ = nullptr; - } - if (modelMemPtr_ != nullptr) { svp_acl_rt_free(modelMemPtr_); modelMemPtr_ = nullptr; diff --git a/samples/contribute/ACT/SVP_NNN/src/protocol.cpp b/samples/contribute/ACT/SVP_NNN/src/protocol.cpp new file mode 100644 index 0000000000000000000000000000000000000000..95af93bf1d805a0040cb4a8bcbd226df513442f7 --- /dev/null +++ b/samples/contribute/ACT/SVP_NNN/src/protocol.cpp @@ -0,0 +1,205 @@ +#include "protocol.h" + +#include +#include +#include +#include + +namespace { + +bool ReadBytes(std::istream& in, char* data, size_t size, std::string& error_msg) +{ + if (size == 0) { + return true; + } + in.read(data, static_cast(size)); + if (!in.good()) { + error_msg = "failed to read bytes from worker input stream"; + return false; + } + return true; +} + +bool WriteBytes(std::ostream& out, const char* data, size_t size, std::string& error_msg) +{ + if (size == 0) { + return true; + } + out.write(data, static_cast(size)); + if (!out.good()) { + error_msg = "failed to write bytes to worker output stream"; + return false; + } + return true; +} + +bool ReadUint16LE(std::istream& in, uint16_t& value, std::string& error_msg) +{ + unsigned char buf[2]; + if (!ReadBytes(in, reinterpret_cast(buf), sizeof(buf), error_msg)) { + return false; + } + value = static_cast(buf[0]) + | (static_cast(buf[1]) << 8); + return true; +} + +bool ReadUint32LE(std::istream& in, uint32_t& value, std::string& error_msg) +{ + unsigned char buf[4]; + if (!ReadBytes(in, reinterpret_cast(buf), sizeof(buf), error_msg)) { + return false; + } + value = static_cast(buf[0]) + | (static_cast(buf[1]) << 8) + | (static_cast(buf[2]) << 16) + | (static_cast(buf[3]) << 24); + return true; +} + +bool WriteUint16LE(std::ostream& out, uint16_t value, std::string& error_msg) +{ + unsigned char buf[2]; + buf[0] = static_cast(value & 0xFFU); + buf[1] = static_cast((value >> 8) & 0xFFU); + return WriteBytes(out, reinterpret_cast(buf), sizeof(buf), error_msg); +} + +bool WriteUint32LE(std::ostream& out, uint32_t value, std::string& error_msg) +{ + unsigned char buf[4]; + buf[0] = static_cast(value & 0xFFU); + buf[1] = static_cast((value >> 8) & 0xFFU); + buf[2] = static_cast((value >> 16) & 0xFFU); + buf[3] = static_cast((value >> 24) & 0xFFU); + return WriteBytes(out, reinterpret_cast(buf), sizeof(buf), error_msg); +} + +bool WriteInt32LE(std::ostream& out, int32_t value, std::string& error_msg) +{ + return WriteUint32LE(out, static_cast(value), error_msg); +} + +bool WriteUint64LE(std::ostream& out, uint64_t value, std::string& error_msg) +{ + unsigned char buf[8]; + buf[0] = static_cast(value & 0xFFULL); + buf[1] = static_cast((value >> 8) & 0xFFULL); + buf[2] = static_cast((value >> 16) & 0xFFULL); + buf[3] = static_cast((value >> 24) & 0xFFULL); + buf[4] = static_cast((value >> 32) & 0xFFULL); + buf[5] = static_cast((value >> 40) & 0xFFULL); + buf[6] = static_cast((value >> 48) & 0xFFULL); + buf[7] = static_cast((value >> 56) & 0xFFULL); + return WriteBytes(out, reinterpret_cast(buf), sizeof(buf), error_msg); +} + +} // namespace + +ReadHeaderStatus ReadRequestHeader(std::istream& in, RequestHeader& header, std::string& error_msg) +{ + header = RequestHeader(); + + const int first = in.peek(); + if (first == std::char_traits::eof()) { + if (in.eof()) { + return READ_HEADER_EOF; + } + error_msg = "failed to peek worker input stream"; + return READ_HEADER_ERROR; + } + + if (!ReadUint32LE(in, header.magic, error_msg)) { + return READ_HEADER_ERROR; + } + if (!ReadUint16LE(in, header.version, error_msg)) { + return READ_HEADER_ERROR; + } + if (!ReadUint16LE(in, header.input_count, error_msg)) { + return READ_HEADER_ERROR; + } + if (!ReadUint32LE(in, header.request_id, error_msg)) { + return READ_HEADER_ERROR; + } + if (!ReadUint32LE(in, header.flags, error_msg)) { + return READ_HEADER_ERROR; + } + + return READ_HEADER_OK; +} + +bool ReadInputEntryHeader(std::istream& in, InputEntryHeader& header, std::string& error_msg) +{ + header = InputEntryHeader(); + return ReadUint32LE(in, header.input_index, error_msg) + && ReadUint32LE(in, header.byte_size, error_msg) + && ReadUint32LE(in, header.reserved, error_msg); +} + +bool ReadExact(std::istream& in, char* data, size_t size, std::string& error_msg) +{ + return ReadBytes(in, data, size, error_msg); +} + +bool WriteResponseFrame(std::ostream& out, const InferenceResponse& response, std::string& error_msg) +{ + const uint16_t status = response.success ? WORKER_STATUS_OK : WORKER_STATUS_ERROR; + const uint32_t output_count = static_cast(response.outputs.size()); + const uint32_t error_msg_size = static_cast(response.error_msg.size()); + + if (!WriteUint32LE(out, WORKER_PROTOCOL_MAGIC, error_msg) + || !WriteUint16LE(out, WORKER_PROTOCOL_VERSION, error_msg) + || !WriteUint16LE(out, status, error_msg) + || !WriteUint32LE(out, response.request_id, error_msg) + || !WriteUint32LE(out, output_count, error_msg) + || !WriteUint32LE(out, response.latency_us, error_msg) + || !WriteInt32LE(out, response.error_code, error_msg) + || !WriteUint32LE(out, error_msg_size, error_msg)) { + return false; + } + + for (size_t i = 0; i < response.outputs.size(); ++i) { + const WorkerTensor& tensor = response.outputs[i]; + const uint32_t elem_count = tensor.data.empty() ? 0U : static_cast( + tensor.data.size() / (tensor.elem_type == WORKER_ELEM_FLOAT16 ? 2U : + tensor.elem_type == WORKER_ELEM_FLOAT32 ? 4U : + tensor.elem_type == WORKER_ELEM_INT32 ? 4U : + tensor.elem_type == WORKER_ELEM_INT64 ? 8U : 1U)); + const uint32_t byte_size = static_cast(tensor.data.size()); + const uint32_t dim_count = static_cast(tensor.dims.size()); + + if (!WriteUint32LE(out, tensor.output_index, error_msg) + || !WriteUint32LE(out, tensor.elem_type, error_msg) + || !WriteUint32LE(out, elem_count, error_msg) + || !WriteUint32LE(out, byte_size, error_msg) + || !WriteUint32LE(out, dim_count, error_msg) + || !WriteUint32LE(out, 0U, error_msg)) { + return false; + } + + for (size_t dim_idx = 0; dim_idx < tensor.dims.size(); ++dim_idx) { + if (!WriteUint64LE(out, static_cast(tensor.dims[dim_idx]), error_msg)) { + return false; + } + } + + if (!WriteBytes( + out, + reinterpret_cast(tensor.data.data()), + tensor.data.size(), + error_msg)) { + return false; + } + } + + if (!WriteBytes(out, response.error_msg.data(), response.error_msg.size(), error_msg)) { + return false; + } + + out.flush(); + if (!out.good()) { + error_msg = "failed to flush worker output stream"; + return false; + } + return true; +} diff --git a/samples/contribute/ACT/SVP_NNN/src/sample_process.cpp b/samples/contribute/ACT/SVP_NNN/src/sample_process.cpp index 4982bfc375ae8a184d09e1e3f9ca8f2c59b4dedb..b69b8718602ee25c0b34396f4575b28f215c4dea 100755 --- a/samples/contribute/ACT/SVP_NNN/src/sample_process.cpp +++ b/samples/contribute/ACT/SVP_NNN/src/sample_process.cpp @@ -20,24 +20,35 @@ #include "acl/svp_acl.h" #include "utils.h" #include +#include +#include +#include using namespace std; +namespace { + +static const size_t kExpectedInputCount = 3; +static const size_t kActionOutputIndex = 2; + +} // namespace + SampleProcess::SampleProcess() { } SampleProcess::~SampleProcess() { - // 销毁模型资源 - modelProcess_.DestroyResource(); DestroyResource(); } Result SampleProcess::InitResource() { // ACL init - const char* aclConfigPath = "../src/acl.json"; + const char* aclConfigPath = std::getenv("SVP_ACL_CONFIG_PATH"); + if (!(aclConfigPath != nullptr && aclConfigPath[0] != '\0' && access(aclConfigPath, R_OK) == 0)) { + aclConfigPath = nullptr; + } svp_acl_error ret = svp_acl_init(aclConfigPath); if (ret != SVP_ACL_SUCCESS) { ERROR_LOG("acl init failed"); @@ -99,10 +110,15 @@ Result SampleProcess::LoadModel() { return SUCCESS; } - const string omModelPath = "../model/act_distill_fp32_for_mindcmd_simp_release.om"; - Result ret = modelProcess_.LoadModelFromFileWithMem(omModelPath.c_str()); + const auto load_start = std::chrono::high_resolution_clock::now(); + + const char* modelPathEnv = std::getenv("SVP_MODEL_PATH"); + const std::string omModelPath = (modelPathEnv != nullptr && modelPathEnv[0] != '\0') + ? std::string(modelPathEnv) + : "../model/act_distill_fp32_for_mindcmd_simp_release.om"; + Result ret = modelProcess_.LoadModelFromFileWithMem(omModelPath); if (ret != SUCCESS) { - ERROR_LOG("execute LoadModelFromFileWithMem failed"); + ERROR_LOG("execute LoadModelFromFileWithMem failed, model path: %s", omModelPath.c_str()); return FAILED; } @@ -118,53 +134,90 @@ Result SampleProcess::LoadModel() { return FAILED; } + const auto load_end = std::chrono::high_resolution_clock::now(); + const double model_load_ms = std::chrono::duration_cast( + load_end - load_start).count() / 1000.0; + INFO_LOG("[PERF] model_load_ms=%.3f model_path=%s", model_load_ms, omModelPath.c_str()); + isModelLoaded_ = true; return SUCCESS; } // 修改Process方法,只处理单次推理 Result SampleProcess::Process() { - if (!isInited_ || !isModelLoaded_) { - ERROR_LOG("Resource or model not initialized"); - return FAILED; + InferenceResponse response = ProcessOnce(input_datas_, input_sizes_, 0U); + return response.success ? SUCCESS : FAILED; +} + +InferenceResponse SampleProcess::ProcessOnce( + const std::vector& input_datas, + const std::vector& input_sizes, + uint32_t request_id) +{ + if (!isInited_) { + ERROR_LOG("resource not initialized"); + return MakeErrorResponse(request_id, WORKER_ERROR_ACL_NOT_READY, "resource not initialized"); + } + if (!isModelLoaded_) { + ERROR_LOG("model not initialized"); + return MakeErrorResponse(request_id, WORKER_ERROR_MODEL_NOT_READY, "model not initialized"); + } + if (input_datas.size() != kExpectedInputCount || input_sizes.size() != kExpectedInputCount) { + std::ostringstream oss; + oss << "expected " << kExpectedInputCount << " inputs but got " << input_datas.size(); + ERROR_LOG("%s", oss.str().c_str()); + return MakeErrorResponse(request_id, WORKER_ERROR_INPUT_COUNT, oss.str()); } - // 创建输入(使用已加载的模型) - Result ret = modelProcess_.CreateInputFromData(input_datas_, input_sizes_); - if (ret != SUCCESS) { - ERROR_LOG("Create multi-input failed"); - return FAILED; + Result ret = modelProcess_.CreateInputFromData(input_datas, input_sizes); + if (ret != SUCCESS) { + ERROR_LOG("Create multi-input failed"); + return MakeErrorResponse(request_id, WORKER_ERROR_INPUT_SIZE, "Create multi-input failed"); } ret = modelProcess_.CreateTaskBufAndWorkBuf(); if (ret != SUCCESS) { ERROR_LOG("CreateTaskBufAndWorkBuf failed"); - return FAILED; + modelProcess_.DestroyInput(); + return MakeErrorResponse(request_id, WORKER_ERROR_INFERENCE_FAILED, "CreateTaskBufAndWorkBuf failed"); } - // 记录推理开始时间 auto start = std::chrono::high_resolution_clock::now(); ret = modelProcess_.Execute(); if (ret != SUCCESS) { ERROR_LOG("execute inference failed"); modelProcess_.DestroyInput(); - return FAILED; + return MakeErrorResponse(request_id, WORKER_ERROR_INFERENCE_FAILED, "execute inference failed"); } - // 记录推理结束时间 auto end = std::chrono::high_resolution_clock::now(); - double elapsed_ms = std::chrono::duration(end - start).count(); - std::cout << "INFERENCE_TIME:" << elapsed_ms << std::endl; + uint32_t elapsed_us = static_cast( + std::chrono::duration_cast(end - start).count()); + INFO_LOG("[PERF] request_id=%u model_infer_ms=%.3f", request_id, elapsed_us / 1000.0); - // 输出结果 - modelProcess_.OutputModelResult(); - modelProcess_.DumpModelOutputResult(); + WorkerTensor actionTensor; + ret = modelProcess_.GetPackedOutputData( + kActionOutputIndex, + actionTensor.data, + actionTensor.dims, + actionTensor.elem_type); + actionTensor.output_index = static_cast(kActionOutputIndex); - // 释放当前输入缓冲区(保留模型资源) modelProcess_.DestroyInput(); - return SUCCESS; + if (ret != SUCCESS) { + ERROR_LOG("extract output failed"); + return MakeErrorResponse(request_id, WORKER_ERROR_OUTPUT_PARSE_FAILED, "extract output failed"); + } + + InferenceResponse response; + response.request_id = request_id; + response.success = true; + response.latency_us = elapsed_us; + response.error_code = WORKER_ERROR_NONE; + response.outputs.push_back(actionTensor); + return response; } // 新增:保存输入文件路径 @@ -180,6 +233,15 @@ void SampleProcess::SetInputDatas(const std::vector& input_datas, void SampleProcess::DestroyResource() { + if (isModelLoaded_) { + modelProcess_.DestroyResource(); + isModelLoaded_ = false; + } + + if (!isInited_) { + return; + } + svp_acl_error ret; // 1. 先销毁流 if (stream_ != nullptr) { @@ -219,4 +281,6 @@ void SampleProcess::DestroyResource() } } INFO_LOG("end to finalize acl"); + + isInited_ = false; } diff --git a/samples/contribute/ACT/SVP_NNN/src/utils.cpp b/samples/contribute/ACT/SVP_NNN/src/utils.cpp index 70b297d283b3268c5080a004237a6e13686d2f0d..3d7157f585fe9cfb6dfe7f0f3328be87db27222a 100755 --- a/samples/contribute/ACT/SVP_NNN/src/utils.cpp +++ b/samples/contribute/ACT/SVP_NNN/src/utils.cpp @@ -86,15 +86,24 @@ void* Utils::ReadBinFile(const std::string& fileName, uint32_t &fileSize) } binFile.seekg(0, binFile.beg); void* binFileBufferData = nullptr; - svp_acl_error ret = svp_acl_rt_malloc(&binFileBufferData, binFileBufferLen, SVP_ACL_MEM_MALLOC_NORMAL_ONLY); - if (ret != SVP_ACL_SUCCESS) { - ERROR_LOG("malloc device buffer failed. size is %u", binFileBufferLen); + svp_acl_error ret = svp_acl_rt_malloc( + &binFileBufferData, + static_cast(binFileBufferLen), + SVP_ACL_MEM_MALLOC_NORMAL_ONLY); + if (ret != SVP_ACL_SUCCESS || binFileBufferData == nullptr) { + ERROR_LOG("malloc device buffer failed. size is %u, ret=%d", binFileBufferLen, static_cast(ret)); binFile.close(); return nullptr; } - InitData(static_cast(binFileBufferData), binFileBufferLen); + InitData(static_cast(binFileBufferData), static_cast(binFileBufferLen)); binFile.read(static_cast(binFileBufferData), binFileBufferLen); + if (!binFile) { + ERROR_LOG("read file %s into device buffer failed", fileName.c_str()); + svp_acl_rt_free(binFileBufferData); + binFile.close(); + return nullptr; + } binFile.close(); fileSize = static_cast(binFileBufferLen); return binFileBufferData; @@ -135,8 +144,8 @@ void* Utils::ReadBinFileWithStride(const std::string& fileName, const svp_acl_md } size_t bufferSize = loopTimes * stride; svp_acl_error ret = svp_acl_rt_malloc(&binFileBufferData, bufferSize, SVP_ACL_MEM_MALLOC_NORMAL_ONLY); - if (ret != SVP_ACL_SUCCESS) { - ERROR_LOG("malloc device buffer failed. size is %u", binFileBufferLen); + if (ret != SVP_ACL_SUCCESS || binFileBufferData == nullptr) { + ERROR_LOG("malloc device buffer failed. size is %u, ret=%d", binFileBufferLen, static_cast(ret)); binFile.close(); return nullptr; }