diff --git a/tf_adapter/kernels/geop_npu.cc b/tf_adapter/kernels/geop_npu.cc index b01b4ae778dfb96cd80f0d9b4caec944f0a6ad69..4b71ffcb2270449e8513e19c424c6ee1b81809b9 100644 --- a/tf_adapter/kernels/geop_npu.cc +++ b/tf_adapter/kernels/geop_npu.cc @@ -54,6 +54,7 @@ limitations under the License. #include "tensorflow/core/framework/attr_value_util.h" #include "tensorflow/core/framework/node_def_util.h" #include "tensorflow/core/framework/tensor_shape.h" +#include "tensorflow/core/framework/types.h" #include "tensorflow/core/graph/graph.h" #include "tensorflow/core/graph/node_builder.h" #include "tensorflow/core/lib/strings/str_util.h" @@ -76,6 +77,62 @@ namespace tensorflow { Status FunctionalizeControlFlow(Graph *graph, FunctionLibraryDefinition *library); namespace { inline string ToString(ge::Status status) { return ::ge::StatusFactory::Instance()->GetErrDesc(status); } +Status CopyDataToHost(int index, void *tensor_ptr, const size_t total_bytes, + const std::vector &total_outputs, const ge::OutputTensorInfo &output) { + if (output.placement == 1 && output.data == nullptr) { + void *dst_ptr = tensor_ptr; + size_t left_size = total_bytes; + for (auto output_desc : total_outputs) { + void *src_ptr = static_cast(&output_desc); + auto ret = memcpy_s(dst_ptr, sizeof(int64_t), src_ptr, sizeof(int64_t)); + if (ret != EOK) { + ADP_LOG(ERROR) << "[GEOP] Outputs mem copy failed, index:" << index << ", errret:" << ret + << ", dst_ptr:" << (uintptr_t) dst_ptr << ", dst_size:" << sizeof(int64_t) + << ", src_ptr:" << (uintptr_t) src_ptr << ", src_size:" << sizeof(int64_t); + LOG(ERROR) << "[GEOP] Outputs mem copy failed, index:" << index << ", errret:" << ret + << ", dst_ptr:" << (uintptr_t) dst_ptr << ", dst_size:" << sizeof(int64_t) + << ", src_ptr:" << (uintptr_t) src_ptr << ", src_size:" << sizeof(int64_t); + return errors::Internal("Outputs mem copy failed, index:", index); + } + left_size -= sizeof(int64_t); + dst_ptr = static_cast(static_cast(dst_ptr) + sizeof(int64_t)); + } + if (left_size != 0) { return errors::Internal("Some outputs are not copy to memory."); } + } else { + void *dst_ptr = tensor_ptr; + void *src_ptr = static_cast(output.data.get()); + size_t left_size = total_bytes; + while (left_size > SECUREC_MEM_MAX_LEN) { + auto ret = memcpy_s(dst_ptr, SECUREC_MEM_MAX_LEN, src_ptr, SECUREC_MEM_MAX_LEN); + if (ret != EOK) { + ADP_LOG(ERROR) << "[GEOP] Outputs mem copy failed, index:" << index << ", errret:" << ret + << ", dst_ptr:" << (uintptr_t) dst_ptr << ", dst_size:" << SECUREC_MEM_MAX_LEN + << ", src_ptr:" << (uintptr_t) src_ptr << ", src_size:" << SECUREC_MEM_MAX_LEN; + LOG(ERROR) << "[GEOP] Outputs mem copy failed, index:" << index << ", errret:" << ret + << ", dst_ptr:" << (uintptr_t) dst_ptr << ", dst_size:" << SECUREC_MEM_MAX_LEN + << ", src_ptr:" << (uintptr_t) src_ptr << ", src_size:" << SECUREC_MEM_MAX_LEN; + return errors::InvalidArgument("Outputs mem copy failed, index:", index); + } + left_size -= SECUREC_MEM_MAX_LEN; + + dst_ptr = static_cast(static_cast(dst_ptr) + SECUREC_MEM_MAX_LEN); + src_ptr = static_cast(static_cast(src_ptr) + SECUREC_MEM_MAX_LEN); + } + REQUIRES_NOT_NULL(dst_ptr); + REQUIRES_NOT_NULL(src_ptr); + auto ret = memcpy_s(dst_ptr, left_size, src_ptr, left_size); + if (ret != EOK) { + ADP_LOG(ERROR) << "[GEOP] Outputs mem copy failed, index:" << index << ", errret:" << ret + << ", dst_ptr:" << (uintptr_t)dst_ptr << ", dst_size:" << left_size + << ", src_ptr:" << (uintptr_t)src_ptr << ", src_size:" << left_size; + LOG(ERROR) << "[GEOP] Outputs mem copy failed, index:" << index << ", errret:" << ret + << ", dst_ptr:" << (uintptr_t)dst_ptr << ", dst_size:" << left_size + << ", src_ptr:" << (uintptr_t)src_ptr << ", src_size:" << left_size; + return errors::InvalidArgument("Outputs mem copy failed, index:", index); + } + } + return Status::OK(); +} Status BuildOutputTensorInfo(OpKernelContext *ctx, std::vector &outputs) { // ctx is not nullptr int num_outputs = ctx->num_outputs(); @@ -91,9 +148,26 @@ Status BuildOutputTensorInfo(OpKernelContext *ctx, std::vector dims; std::string dim_string; + int64_t shape_size = 1; + std::vector total_outputs; + total_outputs.push_back(output.placement); + total_outputs.push_back(output.dims.size()); + for (int64_t dim : output.dims) { + shape_size *= dim; dims.push_back(dim); dim_string = dim_string + " " + std::to_string(dim); + total_outputs.push_back(dim); + } + if (output.placement == 1) { + dims.clear(); + total_outputs.push_back(output.length); + total_outputs.push_back(reinterpret_cast(output.dev_data)); + ADP_LOG(INFO) << "[GEOP] output length: " << output.length << " output dev_data: " + << reinterpret_cast(output.dev_data); + int64_t dtype_bytes = outputs[i].length / shape_size; + int64_t output_bytes = total_outputs.size() * sizeof(int64_t) / dtype_bytes; + dims.push_back(output_bytes); } TensorShape out_shape(dims); Tensor *tensor = nullptr; @@ -101,7 +175,7 @@ Status BuildOutputTensorInfo(OpKernelContext *ctx, std::vectorTotalBytes(); void *tensor_ptr = DMAHelper::base(tensor); - if (total_bytes != static_cast(output.length)) { + if (output.placement != 1 && total_bytes != static_cast(output.length)) { ADP_LOG(ERROR) << "[GEOP] Outputs len mismatched, index:" << i << ", alloc output:" << total_bytes << ", while GE return:" << output.length; LOG(ERROR) << "[GEOP] Outputs len mismatched, index:" << i << ", alloc output:" << total_bytes @@ -116,40 +190,11 @@ Status BuildOutputTensorInfo(OpKernelContext *ctx, std::vector(output.data.get()); - size_t left_size = total_bytes; - while (left_size > SECUREC_MEM_MAX_LEN) { - REQUIRES_NOT_NULL(dst_ptr); - auto err = memcpy_s(dst_ptr, SECUREC_MEM_MAX_LEN, src_ptr, SECUREC_MEM_MAX_LEN); - if (err != EOK) { - ADP_LOG(ERROR) << "[GEOP] Outputs mem copy failed, index:" << i << ", errret:" << err - << ", dst_ptr:" << (uintptr_t) dst_ptr << ", dst_size:" << SECUREC_MEM_MAX_LEN - << ", src_ptr:" << (uintptr_t) src_ptr << ", src_size:" << SECUREC_MEM_MAX_LEN; - LOG(ERROR) << "[GEOP] Outputs mem copy failed, index:" << i << ", errret:" << err - << ", dst_ptr:" << (uintptr_t) dst_ptr << ", dst_size:" << SECUREC_MEM_MAX_LEN - << ", src_ptr:" << (uintptr_t) src_ptr << ", src_size:" << SECUREC_MEM_MAX_LEN; - return errors::InvalidArgument("Outputs mem copy failed, index:", i); - } - left_size -= SECUREC_MEM_MAX_LEN; - - dst_ptr = static_cast(static_cast(dst_ptr) + SECUREC_MEM_MAX_LEN); - src_ptr = static_cast(static_cast(src_ptr) + SECUREC_MEM_MAX_LEN); - } - REQUIRES_NOT_NULL(dst_ptr); - REQUIRES_NOT_NULL(src_ptr); - auto err = memcpy_s(dst_ptr, left_size, src_ptr, left_size); - if (err != EOK) { - ADP_LOG(ERROR) << "[GEOP] Outputs mem copy failed, index:" << i << ", errret:" << err - << ", dst_ptr:" << (uintptr_t)dst_ptr << ", dst_size:" << left_size - << ", src_ptr:" << (uintptr_t)src_ptr << ", src_size:" << left_size; - LOG(ERROR) << "[GEOP] Outputs mem copy failed, index:" << i << ", errret:" << err - << ", dst_ptr:" << (uintptr_t)dst_ptr << ", dst_size:" << left_size - << ", src_ptr:" << (uintptr_t)src_ptr << ", src_size:" << left_size; - return errors::InvalidArgument("Outputs mem copy failed, index:", i); - } + Status s = CopyDataToHost(i, tensor_ptr, total_bytes, total_outputs, output); + if (!s.ok()) { + ADP_LOG(ERROR) << s.error_message(); + LOG(ERROR) << s.error_message(); + return s; } } return Status::OK(); @@ -222,6 +267,8 @@ void GeOp::Initialize(OpKernelConstruction *ctx) { OP_REQUIRES_OK(ctx, ctx->GetAttr("_dynamic_graph_execute_mode", &dynamic_graph_execute_mode_)); ctx->GetAttr("_getnext_inputs_shape_range", &getnext_inputs_shape_range_); ctx->GetAttr("_data_inputs_shape_range", &data_inputs_shape_range_); + ctx->GetAttr("_is_dynamic_getnext", &is_dynamic_getnext_); + ctx->GetAttr("_placeholder_index", &placeholder_index_); } ADP_LOG(INFO) << "[GEOP] dynamic_input: " << dynamic_input_ << ", dynamic_graph_execute_mode: " << dynamic_graph_execute_mode_ @@ -460,10 +507,10 @@ void GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) { ADP_LOG(INFO) << "[GEOP] Begin GeOp::ComputeAsync" << ", kernel_name:" << geop_name << ", num_inputs:" << num_inputs << ", num_outputs:" << ctx->num_outputs(); int64 startTime = InferShapeUtil::GetCurrentTimestap(); + std::vector input_vec; std::vector input_shapes; - for (int i = 0; i < ctx->num_inputs(); i++) { - input_shapes.push_back(ctx->input(i).shape().DebugString()); - } + std::vector inputs; + OP_REQUIRES_OK_ASYNC(ctx, (BuildInputTensorInfo(ctx, input_vec, input_shapes, inputs)), done); // if input shapes changed, cache graphs uint32_t cache_graph_id = graph_id_; @@ -499,12 +546,6 @@ void GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) { std::shared_ptr graph = std::make_shared(OpRegistry::Global()); OP_REQUIRES_ASYNC(ctx, graph != nullptr, errors::Internal("create tensorflow graph failed"), done); - std::vector input_vec; - for (uint32_t i = 0; i < num_inputs; i++) { - Tensor input(ctx->input(i)); - input_vec.push_back(input); - } - // Build GraphDef from FunctionDef GraphDef ori_graph_def; OP_REQUIRES_OK_ASYNC(ctx, BuildGraphDef(*flib_def, input_vec, ori_graph_def, is_initialized_graph_), done); @@ -747,8 +788,6 @@ void GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) { << ctx->op_kernel().name() << "[ " << (run_end_time - run_start_time) << "us]"; done(); }; - std::vector inputs; - OP_REQUIRES_OK_ASYNC(ctx, (BuildInputTensorInfo(ctx, inputs)), done); ADP_LOG(INFO) << "[GEOP] Call ge session RunGraphAsync, kernel_name:" << geop_name << " ,tf session: " << tf_session_ << " ,graph id: " << cache_graph_id; @@ -1021,7 +1060,38 @@ void GeOp::SetShapesToOutputDesc(const std::vector &input_shapes, } } -Status GeOp::BuildInputTensorInfo(OpKernelContext *ctx, std::vector &inputs) { +void GeOp::AnalyzeInputDesc(void *tensor_ptr, ge::InputTensorInfo &input, Tensor tensor, + std::vector &input_shapes) { + ADP_LOG(INFO) << "[GEOP] start analyze input tensor."; + input.placement = *static_cast(tensor_ptr); + ADP_LOG(INFO) << "[GEOP] get input placement: " << input.placement; + tensor_ptr = static_cast(static_cast(tensor_ptr) + sizeof(int64_t)); + int64_t rank = *static_cast(tensor_ptr); + ADP_LOG(INFO) << "[GEOP] get input rank: " << rank; + tensor_ptr = static_cast(static_cast(tensor_ptr) + sizeof(int64_t)); + std::vector tmp_dims; + for (int i = 0; i < rank; i++) { + int64_t dim = *static_cast(tensor_ptr); + input.dims.push_back(dim); + tmp_dims.push_back(dim); + tensor_ptr = static_cast(static_cast(tensor_ptr) + sizeof(int64_t)); + ADP_LOG(INFO) << "[GEOP] get input dim: " << dim; + } + TensorShape input_shape(tmp_dims); + Tensor tmp_tensor(tensor.dtype(), input_shape); + ADP_LOG(INFO) << "[GEOP] input shape is: " << tmp_tensor.shape().DebugString(); + input_shapes.push_back(tmp_tensor.shape().DebugString()); + input.length = *static_cast(tensor_ptr); + ADP_LOG(INFO) << "[GEOP] get input length: " << input.length; + tensor_ptr = static_cast(static_cast(tensor_ptr) + sizeof(int64_t)); + input.data = reinterpret_cast(*static_cast(tensor_ptr)); + ADP_LOG(INFO) << "[GEOP] get input data addr: " << reinterpret_cast(input.data); +} + +Status GeOp::BuildInputTensorInfo(OpKernelContext *ctx, + std::vector &input_vec, + std::vector &input_shapes, + std::vector &inputs) { // ctx is not nullptr int num_inputs = ctx->num_inputs(); @@ -1030,7 +1100,6 @@ Status GeOp::BuildInputTensorInfo(OpKernelContext *ctx, std::vectorinput(i)); ADP_LOG(INFO) << "[GEOP] Input tensor " << i << " shape: " << tensor.shape().DebugString(); DataType data_type = tensor.dtype(); - size_t total_bytes = tensor.TotalBytes(); void *tensor_ptr = DMAHelper::base(&tensor); ge::InputTensorInfo input; @@ -1043,12 +1112,17 @@ Status GeOp::BuildInputTensorInfo(OpKernelContext *ctx, std::vector(type); input.dims.clear(); - for (uint32_t dim : tensor.shape().dim_sizes()) { input.dims.push_back(static_cast(dim)); } - input.data = tensor_ptr; - input.length = static_cast(total_bytes); - + if (is_dynamic_getnext_ == "1" && (placeholder_index_.find(std::to_string(i)) == std::string::npos)) { + AnalyzeInputDesc(tensor_ptr, input, tensor, input_shapes); + } else { + for (uint32_t dim : tensor.shape().dim_sizes()) { input.dims.push_back(static_cast(dim)); } + input.data = tensor_ptr; + input.length = static_cast(tensor.TotalBytes()); + input_shapes.push_back(tensor.shape().DebugString()); + } + input.data_type = static_cast(type); + input_vec.push_back(tensor); inputs.push_back(input); } return Status::OK(); diff --git a/tf_adapter/kernels/geop_npu.h b/tf_adapter/kernels/geop_npu.h index 23c307606f59e440d20ca4d6493521902682e1fc..31803f26a37186d1a95693da3bbbc4a2fd03cd7e 100644 --- a/tf_adapter/kernels/geop_npu.h +++ b/tf_adapter/kernels/geop_npu.h @@ -63,7 +63,10 @@ class GeOp : public AsyncOpKernel { GraphDef &graph_def, bool &is_initialize); // prepare input tensor - Status BuildInputTensorInfo(OpKernelContext *ctx, std::vector &inputs); + Status BuildInputTensorInfo(OpKernelContext *ctx, + std::vector &input_vec, + std::vector &input_shapes, + std::vector &inputs); // prepare output tensor Status BuildOutTensorInfo(OpKernelContext *ctx); @@ -96,6 +99,9 @@ class GeOp : public AsyncOpKernel { Status ChangeInputsShapeDesc(); + void AnalyzeInputDesc(void *tensor_ptr, ge::InputTensorInfo &input, Tensor tensor, + std::vector &input_shapes); + private: static const std::string INPUT_DESC; static const std::string OUTPUT_DESC; @@ -141,6 +147,8 @@ class GeOp : public AsyncOpKernel { std::string data_inputs_shape_range_; std::string getnext_inputs_shape_range_; bool need_compile_graph_first_; + std::string is_dynamic_getnext_; + std::string placeholder_index_; }; } // namespace tensorflow #endif // TENSORFLOW_KERNELS_GEOP_NPU_H_ diff --git a/tf_adapter/optimizers/om_partition_subgraphs_pass.cc b/tf_adapter/optimizers/om_partition_subgraphs_pass.cc index be10980f1f994343d49c132615f2e2f0a63708be..7778b1f05d03ed2f5014f3c1b7dd93fbf2511f31 100644 --- a/tf_adapter/optimizers/om_partition_subgraphs_pass.cc +++ b/tf_adapter/optimizers/om_partition_subgraphs_pass.cc @@ -661,11 +661,14 @@ std::vector string_split(const string &str, const string &pattern) { } Status MarkForPartition(std::unique_ptr *graphIn, int &clusterNum, bool mix_compile_mode, int graph_num, - FunctionLibraryDefinition *func_lib, std::map pass_options) { + FunctionLibraryDefinition *func_lib, std::map pass_options, + std::map &graph_options) { Graph *graph = graphIn->get(); - bool enableDP = pass_options["enable_dp"] == "1"; + bool enable_dp = pass_options["enable_dp"] == "1"; + bool is_set_lazy_recompile = graph_options["dynamic_input"] == "1" && + graph_options["dynamic_graph_execute_mode"] == "lazy_recompile"; OrderedNodeSet npuSupportCandidates; - TF_RETURN_IF_ERROR(FindNpuSupportCandidates(*graph, &npuSupportCandidates, func_lib, enableDP, mix_compile_mode)); + TF_RETURN_IF_ERROR(FindNpuSupportCandidates(*graph, &npuSupportCandidates, func_lib, enable_dp, mix_compile_mode)); TF_RETURN_IF_ERROR(AddRelationalConst(*graph, &npuSupportCandidates)); std::map> cluster_map; @@ -715,6 +718,10 @@ Status MarkForPartition(std::unique_ptr *graphIn, int &clusterNum, bool m REQUIRES_NOT_NULL(src); REQUIRES_NOT_NULL(dst); if (!src->IsOp() || !dst->IsOp()) { continue; } + if (is_set_lazy_recompile && (src->type_string() == "IteratorGetNext") && enable_dp) { + graph_options["is_dynamic_getnext"] = "1"; + continue; + } int src_index = cluster_map[src]->index; int dst_index = cluster_map[dst]->index; @@ -905,7 +912,7 @@ Status MarkForPartition(std::unique_ptr *graphIn, int &clusterNum, bool m } } if (clusterNum > 1) { - if (mix_compile_mode) { + if (mix_compile_mode || is_set_lazy_recompile) { TF_RETURN_IF_ERROR(MergeSubgraphsInNewWay(sortedCluster, npuSupportCandidates, clusterToMerge)); } else { TF_RETURN_IF_ERROR(MergeSubgraphs(sortedCluster, npuSupportCandidates, clusterToMerge)); @@ -1251,6 +1258,9 @@ Status OMSplitter::Subgraph::RecordArg(const Edge *edge, const std::unordered_ma args_.push_back(arg); argDatetypes_.push_back(dtype); } + if (srcNode->name().find("_arg_") != std::string::npos) { + graph_options_["placeholder_index"] += std::to_string(argIndex); + } Node *dstNode = edge->dst(); Node *dstImage = nodeImages.at(dstNode); int dstSlot = edge->dst_input(); @@ -1971,7 +1981,8 @@ Status OMPartitionSubgraphsPass::ProcessGraph(std::unique_ptr *graph, Fun int subgraphNum = 0; TF_RETURN_IF_ERROR( - OMSplitter::MarkForPartition(graph, subgraphNum, mix_compile_mode, graph_num, func_lib, pass_options)); + OMSplitter::MarkForPartition(graph, subgraphNum, mix_compile_mode, graph_num, + func_lib, pass_options, graph_options)); ADP_LOG(INFO) << "OMPartition subgraph_" << std::to_string(graph_num) << " markForPartition success."; if (subgraphNum < 1) { ADP_LOG(INFO) << "subgraphNum is " << subgraphNum; diff --git a/tf_adapter/optimizers/om_partition_subgraphs_pass.h b/tf_adapter/optimizers/om_partition_subgraphs_pass.h index 7a72061bd66fbfa6a2fa8e95ee740a0ad393db77..a08f4af946ada796e8aa5a96017f24bba3fdc569 100644 --- a/tf_adapter/optimizers/om_partition_subgraphs_pass.h +++ b/tf_adapter/optimizers/om_partition_subgraphs_pass.h @@ -37,7 +37,8 @@ namespace tensorflow { namespace OMSplitter { Status MarkForPartition(const GraphOptimizationPassOptions &options, int &clusterNum, bool mix_compile_mode, int graph_num, FunctionLibraryDefinition *func_lib, - std::map pass_options); + std::map pass_options, + std::map &graph_options); // Transformation that finds subgraphs whose nodes are marked with // 'groupAttribute', splits those subgraphs into functions, and replaces