From e1ac6250cd397fb4d107a0afe95b6883beccdd01 Mon Sep 17 00:00:00 2001 From: huanruizhi Date: Wed, 6 Jan 2021 15:52:44 +0800 Subject: [PATCH] lazy recompile mode func && DynamicGetNext --- tf_adapter/kernels/geop_npu.cc | 166 +++++++++++++----- tf_adapter/kernels/geop_npu.h | 10 +- .../optimizers/om_partition_subgraphs_pass.cc | 27 ++- .../optimizers/om_partition_subgraphs_pass.h | 3 +- 4 files changed, 150 insertions(+), 56 deletions(-) diff --git a/tf_adapter/kernels/geop_npu.cc b/tf_adapter/kernels/geop_npu.cc index 692dfa3ab..4448b7e66 100644 --- a/tf_adapter/kernels/geop_npu.cc +++ b/tf_adapter/kernels/geop_npu.cc @@ -53,6 +53,7 @@ limitations under the License. #include "tensorflow/core/framework/attr_value_util.h" #include "tensorflow/core/framework/node_def_util.h" #include "tensorflow/core/framework/tensor_shape.h" +#include "tensorflow/core/framework/types.h" #include "tensorflow/core/graph/graph.h" #include "tensorflow/core/graph/node_builder.h" #include "tensorflow/core/lib/strings/str_util.h" @@ -75,6 +76,53 @@ namespace tensorflow { Status FunctionalizeControlFlow(Graph *graph, FunctionLibraryDefinition *library); namespace { inline string ToString(ge::Status status) { return ::ge::StatusFactory::Instance()->GetErrDesc(status); } +Status CopyDataToHost(int index, void *tensor_ptr, const size_t total_bytes, + const std::vector &total_outputs, const ge::OutputTensorInfo &output) { + if (output.placement == 1 && output.data == nullptr) { + void *dst_ptr = tensor_ptr; + size_t left_size = total_bytes; + for (auto output_desc : total_outputs) { + void *src_ptr = static_cast(&output_desc); + auto ret = memcpy_s(dst_ptr, sizeof(int64_t), src_ptr, sizeof(int64_t)); + if (ret != EOK) { + LOG(ERROR) << "[GEOP] Outputs mem copy failed, index:" << index << ", errret:" << ret + << ", dst_ptr:" << (uintptr_t) dst_ptr << ", dst_size:" << sizeof(int64_t) + << ", src_ptr:" << (uintptr_t) src_ptr << ", src_size:" << sizeof(int64_t); + return errors::Internal("Outputs 
mem copy failed, index:", index); + } + left_size -= sizeof(int64_t); + dst_ptr = static_cast(static_cast(dst_ptr) + sizeof(int64_t)); + } + if (left_size != 0) { return errors::Internal("Some outputs are not copy to memory."); } + } else { + void *dst_ptr = tensor_ptr; + void *src_ptr = static_cast(output.data.get()); + size_t left_size = total_bytes; + while (left_size > SECUREC_MEM_MAX_LEN) { + auto err = memcpy_s(dst_ptr, SECUREC_MEM_MAX_LEN, src_ptr, SECUREC_MEM_MAX_LEN); + if (err != EOK) { + LOG(ERROR) << "[GEOP] Outputs mem copy failed, index:" << index << ", errret:" << err + << ", dst_ptr:" << (uintptr_t) dst_ptr << ", dst_size:" << SECUREC_MEM_MAX_LEN + << ", src_ptr:" << (uintptr_t) src_ptr << ", src_size:" << SECUREC_MEM_MAX_LEN; + return errors::InvalidArgument("Outputs mem copy failed, index:", index); + } + left_size -= SECUREC_MEM_MAX_LEN; + + dst_ptr = static_cast(static_cast(dst_ptr) + SECUREC_MEM_MAX_LEN); + src_ptr = static_cast(static_cast(src_ptr) + SECUREC_MEM_MAX_LEN); + } + REQUIRES_NOT_NULL(dst_ptr); + REQUIRES_NOT_NULL(src_ptr); + auto err = memcpy_s(dst_ptr, left_size, src_ptr, left_size); + if (err != EOK) { + LOG(ERROR) << "[GEOP] Outputs mem copy failed, index:" << index << ", errret:" << err + << ", dst_ptr:" << (uintptr_t)dst_ptr << ", dst_size:" << left_size + << ", src_ptr:" << (uintptr_t)src_ptr << ", src_size:" << left_size; + return errors::InvalidArgument("Outputs mem copy failed, index:", index); + } + } + return Status::OK(); +} Status BuildOutputTensorInfo(OpKernelContext *ctx, std::vector &outputs) { // ctx is not nullptr int num_outputs = ctx->num_outputs(); @@ -89,9 +137,26 @@ Status BuildOutputTensorInfo(OpKernelContext *ctx, std::vector dims; std::string dim_string; + int64_t shape_size = 1; + std::vector total_outputs; + total_outputs.push_back(output.placement); + total_outputs.push_back(output.dims.size()); + for (int64_t dim : output.dims) { + shape_size *= dim; dims.push_back(dim); dim_string = dim_string + " " + 
std::to_string(dim); + total_outputs.push_back(dim); + } + if (output.placement == 1) { + dims.clear(); + total_outputs.push_back(output.length); + total_outputs.push_back(reinterpret_cast(output.dev_data)); + LOG(INFO) << "[GEOP] output length: " << output.length << " output dev_data: " + << reinterpret_cast(output.dev_data); + int64_t dtype_bytes = outputs[i].length / shape_size; + int64_t output_bytes = total_outputs.size() * sizeof(int64_t) / dtype_bytes; + dims.push_back(output_bytes); } TensorShape out_shape(dims); Tensor *tensor = nullptr; @@ -99,7 +164,7 @@ Status BuildOutputTensorInfo(OpKernelContext *ctx, std::vectorTotalBytes(); void *tensor_ptr = DMAHelper::base(tensor); - if (total_bytes != static_cast(output.length)) { + if (output.placement != 1 && total_bytes != static_cast(output.length)) { LOG(ERROR) << "[GEOP] Outputs len mismatched, index:" << i << ", alloc output:" << total_bytes << ", while GE return:" << output.length; return errors::InvalidArgument("Outputs num mismatched, index:", i, ", alloc output:", total_bytes, @@ -112,33 +177,10 @@ Status BuildOutputTensorInfo(OpKernelContext *ctx, std::vector(output.data.get()); - size_t left_size = total_bytes; - while (left_size > SECUREC_MEM_MAX_LEN) { - auto err = memcpy_s(dst_ptr, SECUREC_MEM_MAX_LEN, src_ptr, SECUREC_MEM_MAX_LEN); - if (err != EOK) { - LOG(ERROR) << "[GEOP] Outputs mem copy failed, index:" << i << ", errret:" << err - << ", dst_ptr:" << (uintptr_t) dst_ptr << ", dst_size:" << SECUREC_MEM_MAX_LEN - << ", src_ptr:" << (uintptr_t) src_ptr << ", src_size:" << SECUREC_MEM_MAX_LEN; - return errors::InvalidArgument("Outputs mem copy failed, index:", i); - } - left_size -= SECUREC_MEM_MAX_LEN; - - dst_ptr = static_cast(static_cast(dst_ptr) + SECUREC_MEM_MAX_LEN); - src_ptr = static_cast(static_cast(src_ptr) + SECUREC_MEM_MAX_LEN); - } - REQUIRES_NOT_NULL(dst_ptr); - REQUIRES_NOT_NULL(src_ptr); - auto err = memcpy_s(dst_ptr, left_size, src_ptr, left_size); - if (err != EOK) { - 
LOG(ERROR) << "[GEOP] Outputs mem copy failed, index:" << i << ", errret:" << err - << ", dst_ptr:" << (uintptr_t)dst_ptr << ", dst_size:" << left_size - << ", src_ptr:" << (uintptr_t)src_ptr << ", src_size:" << left_size; - return errors::InvalidArgument("Outputs mem copy failed, index:", i); - } + Status s = CopyDataToHost(i, tensor_ptr, total_bytes, total_outputs, output); + if (!s.ok()) { + LOG(ERROR) << s.error_message(); + return s; } } return Status::OK(); @@ -208,10 +250,14 @@ void GeOp::Initialize(OpKernelConstruction *ctx) { OP_REQUIRES_OK(ctx, ctx->GetAttr("_dynamic_graph_execute_mode", &dynamic_graph_execute_mode_)); ctx->GetAttr("_getnext_inputs_shape_range", &getnext_inputs_shape_range_); ctx->GetAttr("_data_inputs_shape_range", &data_inputs_shape_range_); + ctx->GetAttr("_is_dynamic_getnext", &is_dynamic_getnext_); + ctx->GetAttr("_placeholder_index", &placeholder_index_); LOG(INFO) << "[GEOP] dynamic_input: " << dynamic_input_ << ", dynamic_graph_execute_mode: " << dynamic_graph_execute_mode_ << ", getnext_inputs_shape_range: " << getnext_inputs_shape_range_ - << ", data_inputs_shape_range: " << data_inputs_shape_range_; + << ", data_inputs_shape_range: " << data_inputs_shape_range_ + << ", is_dynamic_getnext: " << is_dynamic_getnext_ + << ", placeholder_index: " << placeholder_index_; } // global environment Initialize, invoke once for each process @@ -436,10 +482,10 @@ void GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) { LOG(INFO) << "[GEOP] Begin GeOp::ComputeAsync" << ", kernel_name:" << geop_name << ", num_inputs:" << num_inputs << ", num_outputs:" << ctx->num_outputs(); int64 startTime = InferShapeUtil::GetCurrentTimestap(); + std::vector input_vec; std::vector input_shapes; - for (int i = 0; i < ctx->num_inputs(); i++) { - input_shapes.push_back(ctx->input(i).shape().DebugString()); - } + std::vector inputs; + OP_REQUIRES_OK_ASYNC(ctx, (BuildInputTensorInfo(ctx, input_vec, input_shapes, inputs)), done); // if input shapes 
changed, cache graphs uint32_t cache_graph_id = graph_id_; @@ -474,12 +520,6 @@ void GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) { std::shared_ptr graph = std::make_shared(OpRegistry::Global()); OP_REQUIRES_ASYNC(ctx, graph != nullptr, errors::Internal("create tensorflow graph failed"), done); - std::vector input_vec; - for (uint32_t i = 0; i < num_inputs; i++) { - Tensor input(ctx->input(i)); - input_vec.push_back(input); - } - // Build GraphDef from FunctionDef GraphDef ori_graph_def; OP_REQUIRES_OK_ASYNC(ctx, BuildGraphDef(*flib_def, input_vec, ori_graph_def, is_initialized_graph_), done); @@ -700,8 +740,6 @@ void GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) { << "[ " << (run_end_time - run_start_time) << "us]"; done(); }; - std::vector inputs; - OP_REQUIRES_OK_ASYNC(ctx, (BuildInputTensorInfo(ctx, inputs)), done); LOG(INFO) << "[GEOP] Call ge session RunGraphAsync, kernel_name:" << geop_name << " ,tf session: " << tf_session_ << " ,graph id: " << cache_graph_id; @@ -959,7 +997,38 @@ void GeOp::SetShapesToOutputDesc(const std::vector &input_shapes, } } -Status GeOp::BuildInputTensorInfo(OpKernelContext *ctx, std::vector &inputs) { +void GeOp::AnalyzeInputDesc(void *tensor_ptr, ge::InputTensorInfo &input, + std::vector &input_shapes) { + LOG(INFO) << "[GEOP] start analyze input tensor."; + input.placement = *static_cast(tensor_ptr); + LOG(INFO) << "[GEOP] get input placement: " << input.placement; + tensor_ptr = static_cast(static_cast(tensor_ptr) + sizeof(int64_t)); + int64_t rank = *static_cast(tensor_ptr); + LOG(INFO) << "[GEOP] get input rank: " << rank; + tensor_ptr = static_cast(static_cast(tensor_ptr) + sizeof(int64_t)); + std::vector tmp_dims; + for (int i = 0; i < rank; i++) { + int64_t dim = *static_cast(tensor_ptr); + input.dims.push_back(dim); + tmp_dims.push_back(dim); + tensor_ptr = static_cast(static_cast(tensor_ptr) + sizeof(int64_t)); + LOG(INFO) << "[GEOP] get input dim: " << dim; + } + TensorShape 
input_shape(tmp_dims); + const std::string shape_string = input_shape.DebugString(); + LOG(INFO) << "[GEOP] input shape is: " << shape_string; + input_shapes.push_back(shape_string); + input.length = *static_cast(tensor_ptr); + LOG(INFO) << "[GEOP] get input length: " << input.length; + tensor_ptr = static_cast(static_cast(tensor_ptr) + sizeof(int64_t)); + input.data = reinterpret_cast(*static_cast(tensor_ptr)); + LOG(INFO) << "[GEOP] get input data addr: " << reinterpret_cast(input.data); +} + +Status GeOp::BuildInputTensorInfo(OpKernelContext *ctx, + std::vector &input_vec, + std::vector &input_shapes, + std::vector &inputs) { // ctx is not nullptr int num_inputs = ctx->num_inputs(); @@ -968,7 +1037,6 @@ Status GeOp::BuildInputTensorInfo(OpKernelContext *ctx, std::vectorinput(i)); LOG(INFO) << "[GEOP] Input tensor " << i << " shape: " << tensor.shape().DebugString(); DataType data_type = tensor.dtype(); - size_t total_bytes = tensor.TotalBytes(); void *tensor_ptr = DMAHelper::base(&tensor); ge::InputTensorInfo input; @@ -980,12 +1048,18 @@ Status GeOp::BuildInputTensorInfo(OpKernelContext *ctx, std::vector(type); input.dims.clear(); - for (uint32_t dim : tensor.shape().dim_sizes()) { input.dims.push_back(static_cast(dim)); } - input.data = tensor_ptr; - input.length = static_cast(total_bytes); + if (is_dynamic_getnext_ == "1" && (placeholder_index_.find(std::to_string(i)) == std::string::npos)) { + AnalyzeInputDesc(tensor_ptr, input, input_shapes); + } else { + for (uint32_t dim : tensor.shape().dim_sizes()) { input.dims.push_back(static_cast(dim)); } + input.data = tensor_ptr; + input.length = static_cast(tensor.TotalBytes()); + input_shapes.push_back(tensor.shape().DebugString()); + } + input.data_type = static_cast(type); + input_vec.push_back(tensor); inputs.push_back(input); } return Status::OK(); diff --git a/tf_adapter/kernels/geop_npu.h b/tf_adapter/kernels/geop_npu.h index 85d725e18..1012f4e0e 100644 --- a/tf_adapter/kernels/geop_npu.h 
+++ b/tf_adapter/kernels/geop_npu.h @@ -63,7 +63,10 @@ class GeOp : public AsyncOpKernel { GraphDef &graph_def, bool &is_initialize); // prepare input tensor - Status BuildInputTensorInfo(OpKernelContext *ctx, std::vector &inputs); + Status BuildInputTensorInfo(OpKernelContext *ctx, + std::vector &input_vec, + std::vector &input_shapes, + std::vector &inputs); // prepare output tensor Status BuildOutTensorInfo(OpKernelContext *ctx); @@ -96,6 +99,9 @@ class GeOp : public AsyncOpKernel { Status ChangeInputsShapeDesc(); + void AnalyzeInputDesc(void *tensor_ptr, ge::InputTensorInfo &input, + std::vector &input_shapes); + private: static const std::string INPUT_DESC; static const std::string OUTPUT_DESC; @@ -140,6 +146,8 @@ class GeOp : public AsyncOpKernel { std::string dynamic_graph_execute_mode_; std::string data_inputs_shape_range_; std::string getnext_inputs_shape_range_; + std::string is_dynamic_getnext_; + std::string placeholder_index_; }; } // namespace tensorflow #endif // TENSORFLOW_KERNELS_GEOP_NPU_H_ diff --git a/tf_adapter/optimizers/om_partition_subgraphs_pass.cc b/tf_adapter/optimizers/om_partition_subgraphs_pass.cc index d3375957e..a7d8d6d80 100644 --- a/tf_adapter/optimizers/om_partition_subgraphs_pass.cc +++ b/tf_adapter/optimizers/om_partition_subgraphs_pass.cc @@ -322,7 +322,7 @@ bool IsNpuSupportingNode(Node *node, bool mix_compile_mode, FunctionLibraryDefin } Status FindNpuSupportCandidates(const Graph &graph, OrderedNodeSet *candidates, FunctionLibraryDefinition *func_lib, - bool enableDP, bool mix_compile_mode) { + bool enable_dp, bool mix_compile_mode) { int64 startTime = InferShapeUtil::GetCurrentTimestap(); compile_mode = mix_compile_mode; std::vector sortedNodes; @@ -351,7 +351,7 @@ Status FindNpuSupportCandidates(const Graph &graph, OrderedNodeSet *candidates, } std::sort(sortedNodes.begin(), sortedNodes.end(), NodeCompare()); - LOG(INFO) << "FindNpuSupportCandidates enableDP:" << enableDP << ", mix_compile_mode: " << compile_mode + 
LOG(INFO) << "FindNpuSupportCandidates enable_dp:" << enable_dp << ", mix_compile_mode: " << compile_mode << ", hasMakeIteratorOp:" << hasMakeIteratorOp << ", hasIteratorOp:" << hasIteratorOp; if (hasMakeIteratorOp && hasIteratorOp) { @@ -367,7 +367,7 @@ Status FindNpuSupportCandidates(const Graph &graph, OrderedNodeSet *candidates, if (!node->IsOp()) { // Ship Sink/Source nodes. continue; } - if (enableDP + if (enable_dp && (node->type_string() == "Iterator" || node->type_string() == "IteratorV2" || node->type_string() == "IteratorGetNext")) { if (node->type_string() == "IteratorGetNext") { @@ -657,11 +657,14 @@ std::vector string_split(const string &str, const string &pattern) { } Status MarkForPartition(std::unique_ptr *graphIn, int &clusterNum, bool mix_compile_mode, int graph_num, - FunctionLibraryDefinition *func_lib, std::map pass_options) { + FunctionLibraryDefinition *func_lib, std::map pass_options, + std::map &graph_options) { Graph *graph = graphIn->get(); - bool enableDP = pass_options["enable_dp"] == "1"; + bool enable_dp = pass_options["enable_dp"] == "1"; + bool is_set_lazy_recompile = graph_options["dynamic_input"] == "1" && + graph_options["dynamic_graph_execute_mode"] == "lazy_recompile"; OrderedNodeSet npuSupportCandidates; - TF_RETURN_IF_ERROR(FindNpuSupportCandidates(*graph, &npuSupportCandidates, func_lib, enableDP, mix_compile_mode)); + TF_RETURN_IF_ERROR(FindNpuSupportCandidates(*graph, &npuSupportCandidates, func_lib, enable_dp, mix_compile_mode)); TF_RETURN_IF_ERROR(AddRelationalConst(*graph, &npuSupportCandidates)); std::map> cluster_map; @@ -710,6 +713,10 @@ Status MarkForPartition(std::unique_ptr *graphIn, int &clusterNum, bool m REQUIRES_NOT_NULL(src); REQUIRES_NOT_NULL(dst); if (!src->IsOp() || !dst->IsOp()) { continue; } + if (is_set_lazy_recompile && (src->type_string() == "IteratorGetNext") && enable_dp) { + graph_options["is_dynamic_getnext"] = "1"; + continue; + } int src_index = cluster_map[src]->index; int dst_index = 
cluster_map[dst]->index; @@ -900,7 +907,7 @@ Status MarkForPartition(std::unique_ptr *graphIn, int &clusterNum, bool m } } if (clusterNum > 1) { - if (mix_compile_mode) { + if (mix_compile_mode || is_set_lazy_recompile) { TF_RETURN_IF_ERROR(MergeSubgraphsInNewWay(sortedCluster, npuSupportCandidates, clusterToMerge)); } else { TF_RETURN_IF_ERROR(MergeSubgraphs(sortedCluster, npuSupportCandidates, clusterToMerge)); @@ -1243,6 +1250,9 @@ Status OMSplitter::Subgraph::RecordArg(const Edge *edge, const std::unordered_ma args_.push_back(arg); argDatetypes_.push_back(dtype); } + if (srcNode->name().find("_arg_") != std::string::npos) { + graph_options_["placeholder_index"] += std::to_string(argIndex); + } Node *dstNode = edge->dst(); Node *dstImage = nodeImages.at(dstNode); int dstSlot = edge->dst_input(); @@ -1943,7 +1953,8 @@ Status OMPartitionSubgraphsPass::ProcessGraph(std::unique_ptr *graph, Fun int subgraphNum = 0; TF_RETURN_IF_ERROR( - OMSplitter::MarkForPartition(graph, subgraphNum, mix_compile_mode, graph_num, func_lib, pass_options)); + OMSplitter::MarkForPartition(graph, subgraphNum, mix_compile_mode, graph_num, + func_lib, pass_options, graph_options)); LOG(INFO) << "OMPartition subgraph_" << std::to_string(graph_num) << " markForPartition success."; if (subgraphNum < 1) { LOG(INFO) << "subgraphNum is " << subgraphNum; diff --git a/tf_adapter/optimizers/om_partition_subgraphs_pass.h b/tf_adapter/optimizers/om_partition_subgraphs_pass.h index 6c91e3ba3..6239f81b3 100644 --- a/tf_adapter/optimizers/om_partition_subgraphs_pass.h +++ b/tf_adapter/optimizers/om_partition_subgraphs_pass.h @@ -37,7 +37,8 @@ namespace tensorflow { namespace OMSplitter { Status MarkForPartition(const GraphOptimizationPassOptions &options, int &clusterNum, bool mix_compile_mode, int graph_num, FunctionLibraryDefinition *func_lib, - std::map pass_options); + std::map pass_options, + std::map &graph_options); // Transformation that finds subgraphs whose nodes are marked with // 
'groupAttribute', splits those subgraphs into functions, and replaces -- Gitee