diff --git a/convert_tf2npu/ast_impl.py b/convert_tf2npu/ast_impl.py
index d1eb1b9322d53ddeb9c3eb15704a1f267ed99a08..bd59bd504fa9a82d589957d29acadfa75be26676 100644
--- a/convert_tf2npu/ast_impl.py
+++ b/convert_tf2npu/ast_impl.py
@@ -13,8 +13,9 @@
 # limitations under the License.
 # ============================================================================
 import ast
-import util_global
 import copy
+import pasta
+import util_global
 from util import log_success_report
 from util import log_migration_report
 from util import log_msg
@@ -31,7 +32,8 @@ def import_from(node):
         if "keras" in values:
             util_global.set_value('is_keras_net', True)
         if "horovod" in values:
-            util_global.set_value('is_hvd_net', True)
+            util_global.set_value('has_hccl_api', True)
+            util_global.set_value('need_conver', True)


 def ast_import(node):
@@ -41,7 +43,8 @@ def ast_import(node):
         if "keras" in values:
             util_global.set_value('is_keras_net', True)
         if "horovod" in values:
-            util_global.set_value('is_hvd_net', True)
+            util_global.set_value('has_hccl_api', True)
+            util_global.set_value('need_conver', True)

 def ast_function_def(node):
     log_success_report(getattr(node, "lineno", "None"), node.name)
@@ -60,6 +63,8 @@ def ast_if(node):
     if isinstance(node.test, ast.Compare):
         if len(node.test.comparators) == 1 and isinstance(node.test.comparators[0], ast.Str):
             if node.test.comparators[0].s == "__main__":
+                util_global.set_value("is_main_file", False)
+                util_global.set_value("has_main_func", True)
                 if util_global.get_value("is_keras_net", False):
                     log_msg(getattr(node, "lineno", "None"), " add keras session npu config")
                     close_sess_call = ast.Call(func=ast.Name(id="close_session", ctx=ast.Load()),
@@ -67,11 +72,9 @@ def ast_if(node):
                     keras_sess_assign = ast.Assign(targets=[ast.Name(id="npu_keras_sess", ctx=ast.Store())],
                                                    value=ast.Call(func=ast.Name(id="set_keras_session_npu_config", ctx=ast.Load()),
                                                                   args=[], keywords=[]))
-                    try_node = ast.Try(body=[keras_sess_assign, node.body], handlers=[], orelse=[],
-                                       finalbody=[ast.Expr(value=close_sess_call)])
-                    node.body = [try_node]
+                    node.body = [keras_sess_assign] + node.body + [ast.Expr(value=close_sess_call)]
                     util_global.set_value('need_conver', True)
-                if util_global.get_value("is_hvd_net", False):
+                if util_global.get_value("has_hccl_api", False):
                     log_msg(getattr(node, "lineno", "None"), " add npu resource init api")
                     close_sess_call = ast.Call(func=ast.Name(id="close_session", ctx=ast.Load()),
                                                args=[ast.Name(id="npu_sess", ctx=ast.Load())], keywords=[])
@@ -82,9 +85,7 @@ def ast_if(node):
                     shutdown_call = ast.Call(func=ast.Name(id="shutdown_resource", ctx=ast.Load()),
                                              args=[ast.Name(id="npu_sess", ctx=ast.Load()),
                                                    ast.Name(id="npu_shutdown", ctx=ast.Load())],
                                              keywords=[])
-                    try_node = ast.Try(body=[init_assign, node.body], handlers=[], orelse=[],
-                                       finalbody=[ast.Expr(value=shutdown_call), ast.Expr(value=close_sess_call)])
-                    node.body = [try_node]
+                    node.body = [init_assign] + node.body + [ast.Expr(value=shutdown_call), ast.Expr(value=close_sess_call)]
                     util_global.set_value('need_conver', True)
     return node
@@ -159,7 +160,6 @@ def ast_call(node):
         node.args = []
         node.keywords = []
         util_global.set_value('need_conver', True)
-        util_global.set_value('insert_empty_hook', True)
         return node
     if isinstance(node.func, ast.Attribute) and node.func.attr == "DistributedOptimizer":
         log_success_report(getattr(node, "lineno", "None"), 'DistributedOptimizer')
@@ -168,6 +168,7 @@ def ast_call(node):
         log_success_report(getattr(node, "lineno", "None"), 'shard')
         node.args = [ast.Call(func=ast.Name(id='get_rank_size', ctx=ast.Load()), args=[], keywords=[]),
                      ast.Call(func=ast.Name(id='get_rank_id', ctx=ast.Load()), args=[], keywords=[])]
+        util_global.set_value("has_hccl_api", True)
         util_global.set_value('need_conver', True)
     if isinstance(node.func, ast.Attribute) and node.func.attr == 'dropout':
         if isinstance(node.func.value, ast.Attribute) and node.func.value.attr == 'nn':
@@ -188,11 +189,11 @@ def ast_call(node):
                 if ((isinstance(keyword.value, ast.NameConstant) and keyword.value.value != True) or
                         (not isinstance(keyword.value, ast.NameConstant))):
                     log_success_report(getattr(node, "lineno", "None"), node.func.attr)
-                    keyword.value = ast.NameConstant(value=True)
+                    keyword.value = pasta.parse('True')
                     util_global.set_value('need_conver', True)
         if not exist:
             log_success_report(getattr(node, "lineno", "None"), node.func.attr)
-            keyword = ast.keyword(arg='drop_remainder', value=ast.NameConstant(value=True))
+            keyword = ast.keyword(arg='drop_remainder', value=pasta.parse('True'))
             node.keywords.insert(0, keyword)
             util_global.set_value('need_conver', True)
     if (isinstance(node.func, ast.Attribute) and isinstance(node.func.value, ast.Name) and
@@ -203,8 +204,10 @@ def ast_call(node):
     if isinstance(node.func, ast.Attribute) and (node.func.attr == "get_distribution_strategy" or
            node.func.attr == "MirroredStrategy" or node.func.attr == "MultiWorkerMirroredStrategy"):
         log_success_report(getattr(node, "lineno", "None"), node.func.attr)
-        node.func = ast.Attribute(value=ast.Name(id="npu_strategy", ctx=ast.Load()),
+        new_func = ast.Attribute(value=ast.Name(id="npu_strategy", ctx=ast.Load()),
                                   attr="NPUStrategy", ctx=ast.Load())
+        ast.copy_location(new_func, node.func)
+        node.func = new_func
         node.keywords = []
         node.args = []
         util_global.set_value('need_conver', True)
@@ -278,7 +281,7 @@ def ast_call(node):
             if (keyword.arg == 'eval_on_tpu') or (keyword.arg == 'use_tpu') or (keyword.arg == 'export_to_tpu'):
                 if (not isinstance(keyword.value, ast.NameConstant)) or (isinstance(keyword.value, ast.NameConstant) and (keyword.value.value != False)):
                     log_success_report(getattr(node, 'lineno', 'None'), 'TPUEstimator(' + keyword.arg + '=*)')
-                    keyword.value = ast.NameConstant(value=False)
+                    keyword.value = pasta.parse('False')
                     util_global.set_value('need_conver', True)
             if add_eval_on_tpu and (keyword.arg == 'eval_on_tpu'):
                 add_eval_on_tpu = False
@@ -288,15 +291,15 @@ def ast_call(node):
                 add_export_to_tpu = False
         if add_eval_on_tpu:
             log_success_report(getattr(node, 'lineno', 'None'), 'TPUEstimator(eval_on_tpu=*)')
-            node.keywords.append(ast.keyword(arg='eval_on_tpu', value=ast.NameConstant(value=False)))
+            node.keywords.append(ast.keyword(arg='eval_on_tpu', value=pasta.parse('False')))
             util_global.set_value('need_conver', True)
         if add_use_tpu:
             log_success_report(getattr(node, 'lineno', 'None'), 'TPUEstimator(use_tpu=*)')
-            node.keywords.append(ast.keyword(arg='use_tpu', value=ast.NameConstant(value=False)))
+            node.keywords.append(ast.keyword(arg='use_tpu', value=pasta.parse('False')))
             util_global.set_value('need_conver', True)
         if add_export_to_tpu:
             log_success_report(getattr(node, 'lineno', 'None'), 'TPUEstimator(export_to_tpu=*)')
-            node.keywords.append(ast.keyword(arg='export_to_tpu', value=ast.NameConstant(value=False)))
+            node.keywords.append(ast.keyword(arg='export_to_tpu', value=pasta.parse('False')))
             util_global.set_value('need_conver', True)
     if isinstance(node.func, ast.Attribute) and (node.func.attr == 'VirtualDeviceConfiguration'):
         log_success_report(getattr(node, 'lineno', 'None'), 'VirtualDeviceConfiguration')
@@ -338,9 +341,9 @@ def ast_call(node):
                 compile_ops = keyword
                 break
         if compile_ops:
-            compile_ops.value = ast.NameConstant(value=False)
+            compile_ops.value = pasta.parse('False')
         else:
-            node.keywords.append(ast.keyword(arg='compile_ops', value=ast.NameConstant(value=False)))
+            node.keywords.append(ast.keyword(arg='compile_ops', value=pasta.parse('False')))
         return node
     for estimator in util_global.get_value('Estimators', []):
         if (isinstance(node.func, ast.Attribute) and (node.func.attr == estimator)) \
@@ -352,20 +355,14 @@ def ast_call(node):
                     config = keyword
                     break
             if config:
-                config.value = ast.Call(
-                    func=ast.Name(id='npu_run_config_init', ctx=ast.Load()),
-                    args=[],
-                    keywords=[
-                        ast.keyword(arg='run_config', value=config.value)
-                    ]
-                )
+                new_value = ast.Call(func=ast.Name(id='npu_run_config_init', ctx=ast.Load()),
+                                     args=[],
+                                     keywords=[ast.keyword(arg='run_config', value=config.value)])
+                ast.copy_location(new_value, config.value)
+                config.value = new_value
             else:
-                node.keywords.append(
-                    ast.keyword(
-                        arg='config',
-                        value=ast.Call(func=ast.Name(id='npu_run_config_init', ctx=ast.Load()), args=[], keywords=[])
-                    )
-                )
+                node.keywords.append(ast.keyword(arg='config',
+                                                 value=pasta.parse('npu_run_config_init()')))
             util_global.set_value('need_conver', True)
             return node
     for estimator_func in util_global.get_value('EstimatorFunc', []):
@@ -385,53 +382,51 @@ def ast_call(node):
             if not input_fn:
                 break
             if not hooks:
-                node.keywords.append(
-                    ast.keyword(arg='hooks', value=ast.Call(func=ast.Name(id='npu_hooks_append', ctx=ast.Load()), args=[], keywords=[])))
+                node.keywords.append(ast.keyword(arg='hooks', value=pasta.parse('npu_hooks_append()')))
             elif isinstance(hooks, ast.keyword):
-                hooks.value = ast.Call(func=ast.Name(id='npu_hooks_append', ctx=ast.Load()), args=[], keywords=[
-                    ast.keyword(arg='hooks_list', value=hooks.value)])
+                new_value = ast.Call(func=ast.Name(id='npu_hooks_append', ctx=ast.Load()), args=[],
+                                     keywords=[ast.keyword(arg='hooks_list', value=hooks.value)])
+                ast.copy_location(new_value, hooks.value)
+                hooks.value = new_value
             else:
-                node.keywords.append(
-                    ast.keyword(arg='hooks', value=ast.Call(func=ast.Name(id='npu_hooks_append', ctx=ast.Load()), args=[], keywords=[ast.keyword(arg='hooks_list', value=hooks)])))
+                node.keywords.append(ast.keyword(arg='hooks',
+                                                 value=ast.Call(func=ast.Name(id='npu_hooks_append', ctx=ast.Load()),
+                                                                args=[], keywords=[ast.keyword(arg='hooks_list', value=hooks)])))
             util_global.set_value('need_conver', True)
             return node
     if isinstance(node.func, ast.Attribute) and (node.func.attr == 'compile'):
-        opt_map = {"adadelta": "tf.keras.optimizers.Adadelta",
-                   "adagrad": "tf.keras.optimizers.Adagrad",
-                   "adam": "tf.keras.optimizers.Adam",
-                   "adamax": "tf.keras.optimizers.Adamax",
-                   "ftrl": "tf.keras.optimizers.Ftrl",
-                   "nadam": "tf.keras.optimizers.Nadam",
-                   "rmsprop": "tf.keras.optimizers.RMSprop",
-                   "sgd": "tf.keras.optimizers.SGD"}
+        opt_map = {"adadelta": "tf.keras.optimizers.Adadelta()",
+                   "adagrad": "tf.keras.optimizers.Adagrad()",
+                   "adam": "tf.keras.optimizers.Adam()",
+                   "adamax": "tf.keras.optimizers.Adamax()",
+                   "ftrl": "tf.keras.optimizers.Ftrl()",
+                   "nadam": "tf.keras.optimizers.Nadam()",
+                   "rmsprop": "tf.keras.optimizers.RMSprop()",
+                   "sgd": "tf.keras.optimizers.SGD()"}
         for keyword in node.keywords:
             if keyword.arg == "optimizer":
                 log_success_report(getattr(node, 'lineno', 'None'), 'KerasDistributeOptimizer')
-                opt_func_name = ast.Name(id="npu_keras_optimizer", ctx=ast.Load())
                 if isinstance(keyword.value, ast.Str):
-                    keras_opt = opt_map[keyword.value.s].split(".")
-                    tf_opt_func = ast.Attribute(value=ast.Attribute(value=ast.Attribute(value=ast.Name(id=keras_opt[0], ctx=ast.Load()),
-                                                attr=keras_opt[1], ctx=ast.Load()), attr=keras_opt[2], ctx=ast.Load()),
-                                                attr=keras_opt[3], ctx=ast.Load())
-                    keyword.value = ast.Call(func=opt_func_name, args=[ast.Call(func=tf_opt_func, args=[], keywords=[])], keywords=[])
+                    keras_opt = opt_map[keyword.value.s]
+                    npu_keras_opt = "npu_keras_optimizer(" + keras_opt + ")"
+                    keyword.value = pasta.parse(npu_keras_opt)
                 util_global.set_value('need_conver', True)
-                util_global.set_value('insert_npu_keras_opt_func', True)
         return node
     if isinstance(node.func, ast.Attribute) and isinstance(node.func.value, ast.Attribute):
         if (node.func.attr.find("Optimizer") != -1) and (node.func.attr != 'ScipyOptimizerInterface'):
             log_msg(getattr(node, "lineno", "None"), "add NPUDistributedOptimizer()")
-            node = ast.Call(func=ast.Name(id="npu_tf_optimizer", ctx=ast.Load()), args=[node], keywords=[])
+            new_node = ast.Call(func=ast.Name(id="npu_tf_optimizer", ctx=ast.Load()), args=[node], keywords=[])
+            ast.copy_location(new_node, node)
             util_global.set_value('need_conver', True)
-            util_global.set_value('insert_npu_tf_opt_func', True)
-            return node
+            return new_node
     if isinstance(node.func, ast.Attribute):
         opt_list = ["Adadelta", "Adagrad", "Adam", "Adamax", "Ftrl", "Nadam", "RMSprop", "SGD"]
         if node.func.attr in opt_list:
             log_success_report(getattr(node, "lineno", "None"), "KerasDistributeOptimizer")
-            node = ast.Call(func=ast.Name(id="npu_keras_optimizer", ctx=ast.Load()), args=[node], keywords=[])
+            new_node = ast.Call(func=ast.Name(id="npu_keras_optimizer", ctx=ast.Load()), args=[node], keywords=[])
+            ast.copy_location(new_node, node)
             util_global.set_value('need_conver', True)
-            util_global.set_value('insert_npu_keras_opt_func', True)
-            return node
+            return new_node
     if (isinstance(node.func, ast.Attribute) and (node.func.attr == 'MonitoredTrainingSession')) or \
             (isinstance(node.func, ast.Name) and (node.func.id == 'MonitoredTrainingSession')):
         log_success_report(getattr(node, "lineno", "None"), 'MonitoredTrainingSession')
@@ -445,14 +440,15 @@ def ast_call(node):
                 hooks = keyword
                 break
         if not hooks:
-            node.keywords.append(
-                ast.keyword(arg='hooks', value=ast.Call(func=ast.Name(id='npu_hooks_append', ctx=ast.Load()), args=[], keywords=[])))
+            node.keywords.append(ast.keyword(arg='hooks', value=pasta.parse('npu_hooks_append()')))
         elif isinstance(hooks, ast.keyword):
-            hooks.value = ast.Call(func=ast.Name(id='npu_hooks_append', ctx=ast.Load()), args=[], keywords=[
-                ast.keyword(arg='hooks_list', value=hooks.value)])
+            new_value = ast.Call(func=ast.Name(id='npu_hooks_append', ctx=ast.Load()), args=[],
+                                 keywords=[ast.keyword(arg='hooks_list', value=hooks.value)])
+            ast.copy_location(new_value, hooks.value)
+            hooks.value = new_value
         else:
-            node.keywords.append(
-                ast.keyword(arg='hooks', value=ast.Call(func=ast.Name(id='npu_hooks_append', ctx=ast.Load()), args=[], keywords=[ast.keyword(arg='hooks_list', value=hooks)])))
+            node.keywords.append(ast.keyword(arg='hooks', value=ast.Call(func=ast.Name(id='npu_hooks_append', ctx=ast.Load()),
+                                                                         args=[], keywords=[ast.keyword(arg='hooks_list', value=hooks)])))
         util_global.set_value('need_conver', True)
         return node
     specs = {'TrainSpec': 2, 'EvalSpec': 3}
@@ -469,14 +465,15 @@ def ast_call(node):
                     hooks = keyword
                     break
            if not hooks:
-                node.keywords.append(
-                    ast.keyword(arg='hooks', value=ast.Call(func=ast.Name(id='npu_hooks_append', ctx=ast.Load()), args=[], keywords=[])))
+                node.keywords.append(ast.keyword(arg='hooks', value=pasta.parse('npu_hooks_append()')))
             elif isinstance(hooks, ast.keyword):
-                hooks.value = ast.Call(func=ast.Name(id='npu_hooks_append', ctx=ast.Load()), args=[], keywords=[
+                new_value = ast.Call(func=ast.Name(id='npu_hooks_append', ctx=ast.Load()), args=[], keywords=[
                     ast.keyword(arg='hooks_list', value=hooks.value)])
+                ast.copy_location(new_value, hooks.value)
+                hooks.value = new_value
             else:
-                node.keywords.append(
-                    ast.keyword(arg='hooks', value=ast.Call(func=ast.Name(id='npu_hooks_append', ctx=ast.Load()), args=[], keywords=[ast.keyword(arg='hooks_list', value=hooks)])))
+                node.keywords.append(ast.keyword(arg='hooks', value=ast.Call(func=ast.Name(id='npu_hooks_append', ctx=ast.Load()),
+                                                                             args=[], keywords=[ast.keyword(arg='hooks_list', value=hooks)])))
             util_global.set_value('need_conver', True)
     return node
@@ -508,42 +505,7 @@ def insert_npu_import(r_node):
     r_node.body.insert(import_index, npu_import)
     log_msg(import_index, "from npu_bridge.npu_init import *")

-def insert_npu_tf_opt_func(r_node):
-    n = 0
-    lenline = len(r_node.body)
-
-    while n < lenline and not isinstance(r_node.body[n], ast.ImportFrom) and not isinstance(r_node.body[n], ast.Import):
-        n += 1
-
-    while n < lenline and (isinstance(r_node.body[n], ast.ImportFrom) or isinstance(r_node.body[n], ast.Import)):
-        n += 1
-
-    if n < lenline:
-        npu_func = ast.Name(id="NPUDistributedOptimizer", ctx=ast.Load())
-        assign_target = ast.Name(id="npu_opt", ctx=ast.Store())
-        assign_args = ast.Name(id="opt", ctx=ast.Load())
-        npu_opt = ast.Assign(targets=[assign_target], value=ast.Call(func=npu_func, args=[assign_args], keywords=[]))
-        return_node = ast.Return(value=ast.Name(id='npu_opt', ctx=ast.Load()))
-
-        r_node.body.insert(n, ast.FunctionDef(
-            name='npu_tf_optimizer',
-            args=ast.arguments(
-                args=[
-                    ast.arg(arg='opt', annotation=None)
-                ],
-                vararg=None,
-                kwonlyargs=[],
-                kw_defaults=[],
-                kwarg=None,
-                defaults=[]),
-            body=[
-                npu_opt,
-                return_node
-            ],
-            decorator_list=[],
-            returns=None))
-
-def insert_npu_keras_opt_func(r_node):
+def insert_npu_resource_init(r_node):
     n = 0
     lenline = len(r_node.body)
@@ -554,31 +516,22 @@ def insert_npu_keras_opt_func(r_node):
         n += 1

     if n < lenline:
-        npu_func = ast.Name(id="KerasDistributeOptimizer", ctx=ast.Load())
-        assign_target = ast.Name(id="npu_opt", ctx=ast.Store())
-        assign_args = ast.Name(id="opt", ctx=ast.Load())
-        npu_opt = ast.Assign(targets=[assign_target], value=ast.Call(func=npu_func, args=[assign_args], keywords=[]))
-        return_node = ast.Return(value=ast.Name(id='npu_opt', ctx=ast.Load()))
+        init_assign = ast.Assign(targets=[ast.Tuple(elts=[ast.Name(id="npu_sess", ctx=ast.Store()),
+                                                          ast.Name(id="npu_shutdown", ctx=ast.Store())],
+                                                    ctx=ast.Store())],
+                                 value=ast.Call(func=ast.Name(id="init_resource", ctx=ast.Load()), args=[], keywords=[]))
+        r_node.body.insert(n, init_assign)

-        r_node.body.insert(n, ast.FunctionDef(
-            name='npu_keras_optimizer',
-            args=ast.arguments(
-                args=[
-                    ast.arg(arg='opt', annotation=None)
-                ],
-                vararg=None,
-                kwonlyargs=[],
-                kw_defaults=[],
-                kwarg=None,
-                defaults=[]),
-            body=[
-                npu_opt,
-                return_node
-            ],
-            decorator_list=[],
-            returns=None))
+def insert_npu_resource_shutdown(r_node):
+    shutdown_call = ast.Expr(value=ast.Call(func=ast.Name(id="shutdown_resource", ctx=ast.Load()),
+                                            args=[ast.Name(id="npu_sess", ctx=ast.Load()), ast.Name(id="npu_shutdown", ctx=ast.Load())],
+                                            keywords=[]))
+    close_sess_call = ast.Expr(value=ast.Call(func=ast.Name(id="close_session", ctx=ast.Load()),
+                                              args=[ast.Name(id="npu_sess", ctx=ast.Load())], keywords=[]))
+    r_node.body.append(shutdown_call)
+    r_node.body.append(close_sess_call)

-def insert_empty_hook(r_node):
+def insert_keras_sess_npu_config(r_node):
     n = 0
     lenline = len(r_node.body)
@@ -589,26 +542,15 @@ def insert_empty_hook(r_node):
         n += 1

     if n < lenline:
-        hook_attr = ast.Attribute(value=ast.Attribute(value=ast.Name(id="tf", ctx=ast.Load()), attr="train", ctx=ast.Load()),
-                                  attr="SessionRunHook", ctx=ast.Load())
-        class_def = ast.ClassDef(name="NpuEmptyHook", bases=[hook_attr], keywords=[],
-                                 body=[ast.Pass()], decorator_list=[])
-        r_node.body.insert(n, class_def)
+        keras_sess_assign = ast.Assign(targets=[ast.Name(id="npu_keras_sess", ctx=ast.Store())],
+                                       value=ast.Call(func=ast.Name(id="set_keras_session_npu_config", ctx=ast.Load()),
+                                                      args=[], keywords=[]))
+        r_node.body.insert(n, keras_sess_assign)

-def ast_assign(node):
-    for target in node.targets:
-        if (isinstance(target, ast.Name) and target.id == 'global_jit_level') or (isinstance(target, ast.Attribute) and target.attr == 'global_jit_level'):
-            log_msg(getattr(node, 'lineno', 'None'), 'set global_jit_level=config_pb2.OptimizerOptions.OFF')
-            util_global.set_value('need_conver', True)
-            global_jit_level_assign_node = ast.Assign(
-                targets=node.targets,
-                ctx=ast.Load(),
-                value=ast.Attribute(attr='OFF', ctx=ast.Load(),
-                                    value=ast.Attribute(attr='OptimizerOptions', ctx=ast.Load(),
-                                                        value=ast.Name(id='config_pb2', ctx=ast.Load()))))
-            node = ast.If(test=ast.NameConstant(value=True), body=[global_jit_level_assign_node], orelse=[])
-            return node
-    return node
+def insert_keras_sess_close(r_node):
+    close_sess_call = ast.Expr(value=ast.Call(func=ast.Name(id="close_session", ctx=ast.Load()),
+                                              args=[ast.Name(id="npu_keras_sess", ctx=ast.Load())], keywords=[]))
+    r_node.body.append(close_sess_call)

 # Format printing for locate
 def node_tree(node:str):
diff --git a/convert_tf2npu/conver.py b/convert_tf2npu/conver.py
index 957c9adcbbbf48e01ed519aa0e77961b28b8ce4d..fb00b69ed4a7d5726ba9df44d27330696b54b13e 100644
--- a/convert_tf2npu/conver.py
+++ b/convert_tf2npu/conver.py
@@ -36,7 +36,7 @@ def conver():

     for path, dir_list, file_list in conver_path:
         for file_name in file_list:
-            out_path_dst = abs_join(dst_path_new, path.split(dst_path)[1])
+            out_path_dst = abs_join(dst_path_new, path.split(util_global.get_value('input'))[1])
             file_path = os.path.join(path, file_name).replace('\\', '/')
             content = "Begin conver file: " + file_path
             print(content)
diff --git a/convert_tf2npu/conver_by_ast.py b/convert_tf2npu/conver_by_ast.py
index 0bdf175dc3f8189f6f47b64174c8775f78f70826..3788b6cc3f364cce411673eb6ac9872af3a2963a 100644
--- a/convert_tf2npu/conver_by_ast.py
+++ b/convert_tf2npu/conver_by_ast.py
@@ -15,25 +15,13 @@
 import os
 import sys
 import ast
-import astunparse
+import pasta
 import util_global
 from file_op import write_output_after_conver
 from file_op import write_report_after_conver
 from file_op import scan_file
-from util import log_success_report
-from util import log_migration_report
-from ast_impl import attribute
-from ast_impl import node_tree
-from ast_impl import insert_npu_import
-from ast_impl import insert_npu_tf_opt_func
-from ast_impl import insert_npu_keras_opt_func
-from ast_impl import insert_empty_hook
-from ast_impl import import_from
-from ast_impl import ast_import
-from ast_impl import ast_function_def
-from ast_impl import ast_call
-from ast_impl import ast_assign
-from ast_impl import ast_if
+from util import *
+from ast_impl import *
 from visit_by_ast import get_tf_api

 class ConverByAst(ast.NodeTransformer):
@@ -75,11 +63,6 @@ class ConverByAst(ast.NodeTransformer):
         return node

     def visit_Assign(self, node):
-        for target in node.targets:
-            if (isinstance(target, ast.Name) and target.id == 'global_jit_level') or (isinstance(target, ast.Attribute) and target.attr == 'global_jit_level'):
-                return ast_assign(node)
-
-        ast_assign(node)
         self.generic_visit(node)
         return node
@@ -90,16 +73,16 @@ class ConverByAst(ast.NodeTransformer):

 def conver_ast(path, out_path_dst, file_name):
     util_global.set_value('need_conver', False)
-    util_global.set_value('insert_estimator_add_hook_func', False)
-    util_global.set_value('insert_npu_tf_opt_func', False)
-    util_global.set_value('insert_npu_keras_opt_func', False)
-    util_global.set_value('insert_empty_hook', False)
     util_global.set_value('is_keras_net', False)
-    util_global.set_value('is_hvd_net', False)
+    util_global.set_value('has_hccl_api', False)
+    util_global.set_value('is_main_file', False)
+    util_global.set_value('has_main_func', False)
+    if os.path.join(path, file_name) == util_global.get_value('main', ""):
+        util_global.set_value('is_main_file', True)
     with open(os.path.join(path, file_name), "r", encoding='utf-8') as file:
         source = file.read()
     try:
-        r_node = ast.parse(source)
+        r_node = pasta.parse(source)
     except Exception as e:
         print(repr(e))
         return
@@ -116,13 +99,17 @@ def conver_ast(path, out_path_dst, file_name):

     if util_global.get_value('need_conver', False):
         insert_npu_import(r_node)
-        if util_global.get_value('insert_npu_tf_opt_func', False):
-            insert_npu_tf_opt_func(r_node)
-        if util_global.get_value('insert_npu_keras_opt_func', False):
-            insert_npu_keras_opt_func(r_node)
-        if util_global.get_value('insert_empty_hook', False):
-            insert_empty_hook(r_node)
-        dst_content = astunparse.unparse(r_node)
+        if not util_global.get_value('has_main_func', False) and (util_global.get_value('has_hccl_api', False)
+                or util_global.get_value('is_keras_net', False)):
+            log_warning('the keras/horovod network or the script using dataset.shard has no main function, '
+                        'please set the -m or --main parameter')
+        if util_global.get_value('is_main_file', False) and util_global.get_value('has_hccl_api', False):
+            insert_npu_resource_init(r_node)
+            insert_npu_resource_shutdown(r_node)
+        if util_global.get_value('is_main_file', False) and util_global.get_value('is_keras_net', False):
+            insert_keras_sess_npu_config(r_node)
+            insert_keras_sess_close(r_node)
+        dst_content = pasta.dump(r_node)
         write_output_after_conver(os.path.join(util_global.get_value('output'), out_path_dst, file_name), dst_content)

     if file_name.endswith("a.py"):
diff --git a/convert_tf2npu/file_op.py b/convert_tf2npu/file_op.py
index 41db221134b10ea23b690b9cf9ee138741e09398..fc4264d30a193b6cfb424c163df811e4ac71450b 100644
--- a/convert_tf2npu/file_op.py
+++ b/convert_tf2npu/file_op.py
@@ -17,6 +17,7 @@ import shutil
 import util_global
 import pandas as pd
 from visit_by_ast import get_tf_enume
+from visit_by_ast import get_unsupport_api

 def before_clear():
     exit_folder = os.path.exists(util_global.get_value('output'))
@@ -131,6 +132,18 @@ def scan_file(path, file_name, api, lineno):
                 support_type.append(api_support[api_name.index(class_name)])
                 migrate_advice.append(api_advice[api_name.index(class_name)])

+    # record unsupported api
+    (unsupport, lineno) = get_unsupport_api(os.path.join(path, file_name))
+    for i in range(len(unsupport)):
+        name = unsupport[i]
+        module = name.split('.')[0]
+        script_name.append(file_name)
+        code_api.append(name)
+        code_line.append(lineno[i])
+        code_module.append(module)
+        support_type.append('不支持(无迁移方案,建议用户不使用)')
+        migrate_advice.append('第三方非TF官网API,暂不支持')
+
     analyse_result = pd.DataFrame({'脚本文件名': script_name, '代码行': code_line,
                                    '模块名': code_module, 'API名': code_api,
                                    '工具迁移API支持度': support_type, '说明': migrate_advice})
diff --git a/convert_tf2npu/main.py b/convert_tf2npu/main.py
index e890640e916facf3163c78f9651f717f3d603168..0afe005e13c7fb1abdce52125d44b6308c86e586 100644
--- a/convert_tf2npu/main.py
+++ b/convert_tf2npu/main.py
@@ -20,40 +20,45 @@ from file_op import before_clear
 from conver import conver

 def para_check_and_set(argv):
-    input = "input"
-    list = "tf1.15_api_support_list.xlsx"
+    input_dir = "npu_input"
+    support_list = "tf1.15_api_support_list.xlsx"
     output = "output" + util_global.get_value('timestap')
     report = "report" + util_global.get_value('timestap')
     report_suffix = report
+    main_file = ""

     try:
-        opts, args = getopt.getopt(argv, "hi:l:o:r:", ["help", "input=", "list=", "output=", "report="])
+        opts, args = getopt.getopt(argv, "hi:l:o:r:m:", ["help", "input=", "list=", "output=", "report=", "main="])
     except getopt.GetoptError:
         print('Parameter error, please check.')
-        print('    main.py -i <input> -l <list> -o <output> -r <report>')
-        print('or: main.py --input=<input> --list=<list> --output=<output> --report=<report>')
-        print('-i or --input: The source script to be converted, Default value: input/')
+        print('    this tool only supports converting tf-1.15 scripts.')
+        print('    main.py -i <input> -l <list> -o <output> -r <report> -m <main>')
+        print('or: main.py --input=<input> --list=<list> --output=<output> --report=<report> --main=<main>')
+        print('-i or --input: The source script to be converted.')
         print('-l or --list: The list of supported api, Default value: tf1.15_api_support_list.xlsx')
-        print('-o or --output: The destination script after converted, Default value: output/')
-        print('-r or --report: Conversion report, Default value: report/')
+        print('-o or --output: The destination script after converted, Default value: output_npu_***/')
+        print('-r or --report: Conversion report, Default value: report_npu_***/')
+        print('-m or --main: the executed entry *.py file, default: None')
         sys.exit(2)
     for opt, arg in opts:
         if opt in ("-h", "--help"):
-            print('    main.py -i <input> -l <list> -o <output> -r <report>')
-            print('or: main.py --input=<input> --list=<list> --output=<output> --report=<report>')
-            print('-i or --input: The source script to be converted, Default value: input/')
+            print('    this tool only supports converting tf-1.15 scripts.')
+            print('    main.py -i <input> -l <list> -o <output> -r <report> -m <main>')
+            print('or: main.py --input=<input> --list=<list> --output=<output> --report=<report> --main=<main>')
+            print('-i or --input: The source script to be converted')
             print('-l or --list: The list of supported api, Default value: tf1.15_api_support_list.xlsx')
-            print('-o or --output: The destination script after converted, Default value: output/')
-            print('-r or --report: Conversion report, Default value: report/')
+            print('-o or --output: The destination script after converted, Default value: output_npu_***/')
+            print('-r or --report: Conversion report, Default value: report_npu_***/')
+            print('-m or --main: the executed entry *.py file, default: None')
             sys.exit()
         elif opt in ("-i", "--input"):
-            input = os.path.abspath(arg)
-            if str(input).endswith('/'):
-                input = input[0:len(input)-1]
-            input = input.replace('\\', '/')
+            input_dir = os.path.abspath(arg)
+            if str(input_dir).endswith('/'):
+                input_dir = input_dir[0:len(input_dir)-1]
+            input_dir = input_dir.replace('\\', '/')
         elif opt in ("-l", "--list"):
-            list = arg
+            support_list = arg
         elif opt in ("-o", "--output"):
            output = os.path.abspath(arg)
            if str(output).endswith('/'):
@@ -65,15 +70,30 @@ def para_check_and_set(argv):
                 report = report[0:len(report)-1]
             report = os.path.join(report, report_suffix)
             report = report.replace('\\', '/')
+        elif opt in ("-m", "--main"):
+            if os.path.isfile(arg):
+                main_file = os.path.abspath(arg)
+                main_path = os.path.dirname(main_file)
+                file = os.path.basename(main_file)
+                main_path = main_path.replace('\\', '/')
+                main_file = os.path.join(main_path, file)
+            else:
+                raise ValueError("--main arg must be an existing file")

-    if input+'/' in output+'/' or input+'/' in report+'/':
+    if input_dir == "npu_input":
+        raise ValueError("Please check -i or --input.")
+
+
+    if input_dir + '/' in output + '/' or input_dir + '/' in report + '/':
         print("<output> or <report> could not be the subdirectory of <input>, please try another option.")
         sys.exit(2)

-    util_global.set_value('input', input)
-    util_global.set_value('list', list)
+    util_global.set_value('input', input_dir)
+    util_global.set_value('list', support_list)
     util_global.set_value('output', output)
     util_global.set_value('report', report)
+    util_global.set_value('main', main_file)
+

 if __name__ == "__main__":
     util_global._init()
diff --git a/convert_tf2npu/main_win.py b/convert_tf2npu/main_win.py
index c553ae4f0f9e260f84ebe5174ca2697076da2a72..de619d4d4192b1595661267e133f95b6de4398bc 100644
--- a/convert_tf2npu/main_win.py
+++ b/convert_tf2npu/main_win.py
@@ -29,20 +29,20 @@ class Analyse(object):
         tk.Entry(self.root, textvariable=self.script_path, width=30).grid(row=0, column=1, padx=10, pady=10)
         tk.Button(self.root, text="路径选择", command=self.select_script_path).grid(row=0, column=2)

-        self.api_list = tk.StringVar()
-        tk.Label(self.root, text="API支持度清单:").grid(row=1, stick=tk.E)
-        tk.Entry(self.root, textvariable=self.api_list, width=30).grid(row=1, column=1, padx=10, pady=10)
-        tk.Button(self.root, text="文件选择", command=self.select_api_file).grid(row=1, column=2)
-
         self.output_path = tk.StringVar()
-        tk.Label(self.root, text="输出迁移脚本路径:").grid(row=2, stick=tk.W)
-        tk.Entry(self.root, textvariable=self.output_path, width=30).grid(row=2, column=1, padx=10, pady=10)
-        tk.Button(self.root, text="路径选择", command=self.select_output_path).grid(row=2, column=2)
+        tk.Label(self.root, text="输出迁移脚本路径:").grid(row=1, stick=tk.W)
+        tk.Entry(self.root, textvariable=self.output_path, width=30).grid(row=1, column=1, padx=10, pady=10)
+        tk.Button(self.root, text="路径选择", command=self.select_output_path).grid(row=1, column=2)

         self.report_path = tk.StringVar()
-        tk.Label(self.root, text="输出分析报告路径:").grid(row=3, stick=tk.W)
text="输出分析报告路径:").grid(row=3, stick=tk.W) - tk.Entry(self.root, textvariable=self.report_path, width=30).grid(row=3, column=1, padx=10, pady=10) - tk.Button(self.root, text="路径选择", command=self.select_report_path).grid(row=3, column=2) + tk.Label(self.root, text="输出分析报告路径:").grid(row=2, stick=tk.W) + tk.Entry(self.root, textvariable=self.report_path, width=30).grid(row=2, column=1, padx=10, pady=10) + tk.Button(self.root, text="路径选择", command=self.select_report_path).grid(row=2, column=2) + + self.main_file = tk.StringVar() + tk.Label(self.root, text="执行入口脚本:").grid(row=3, stick=tk.E) + tk.Entry(self.root, textvariable=self.main_file, width=30).grid(row=3, column=1, padx=10, pady=10) + tk.Button(self.root, text="文件选择", command=self.select_main_file).grid(row=3, column=2) tk.Button(self.root, text="开始分析", command=self.analyse).grid(row=5, column=2, padx=10, pady=10) tk.Button(self.root, text="退出", command=exit).grid(row=5, column=1, padx=10, pady=10, stick=tk.E) @@ -70,36 +70,58 @@ class Analyse(object): path_ = askdirectory() self.output_path.set(path_) - def select_api_file(self): - file_ = askopenfilename() - self.api_list.set(file_) + def select_main_file(self): + main_file_ = askopenfilename() + self.main_file.set(main_file_) def analyse(self): # verify input arguments if self.script_path.get() == '': print('Parameter error, please select the folder of source script to be converted.') return - if self.api_list.get() == '': - print('Parameter error, please select the list of supported api.') - return # generate command - if self.output_path.get() == '' and self.report_path.get() == '': - call_main_py = 'python main.py -i ' + self.script_path.get() + \ - ' -l ' + self.api_list.get() - elif self.output_path.get() == '': - call_main_py = 'python main.py -i ' + self.script_path.get() + \ - ' -l ' + self.api_list.get() + \ - ' -r ' + self.report_path.get() - elif self.report_path.get() == '': - call_main_py = 'python main.py -i ' + self.script_path.get() + \ - ' -l ' + self.api_list.get() + \ - ' -o ' + self.output_path.get() + support_list = "tf1.15_api_support_list.xlsx" + + if self.main_file.get() == '': + if self.output_path.get() == '' and self.report_path.get() == '': + call_main_py = 'python main.py -i ' + self.script_path.get() + \ + ' -l ' + support_list + elif self.output_path.get() == '': + call_main_py = 'python main.py -i ' + self.script_path.get() + \ + ' -l ' + support_list + \ + ' -r ' + self.report_path.get() + elif self.report_path.get() == '': + call_main_py = 'python main.py -i ' + self.script_path.get() + \ + ' -l ' + support_list + \ + ' -o ' + self.output_path.get() + else: + call_main_py = 'python main.py -i ' + self.script_path.get() + \ + ' -l ' + support_list + \ + ' -o ' + self.output_path.get() + \ + ' -r ' + self.report_path.get() else: - call_main_py = 'python main.py -i ' + self.script_path.get() + \ - ' -l ' + self.api_list.get() + \ - ' -o ' + self.output_path.get() + \ - ' -r ' + self.report_path.get() + if self.output_path.get() == '' and self.report_path.get() == '': + call_main_py = 'python main.py -i ' + self.script_path.get() + \ + ' -l ' + support_list + \ + ' -m ' + self.main_file.get() + elif self.output_path.get() == '': + call_main_py = 'python main.py -i ' + self.script_path.get() + \ + ' -l ' + support_list + \ + ' -r ' + self.report_path.get() + \ + ' -m ' + self.main_file.get() + elif self.report_path.get() == '': + call_main_py = 'python main.py -i ' + self.script_path.get() + \ + ' -l ' + support_list + \ + ' -o ' + self.output_path.get() + 
+                               ' -m ' + self.main_file.get()
+            else:
+                call_main_py = 'python main.py -i ' + self.script_path.get() + \
+                               ' -l ' + support_list + \
+                               ' -o ' + self.output_path.get() + \
+                               ' -r ' + self.report_path.get() + \
+                               ' -m ' + self.main_file.get()
+
         os.system(call_main_py)
         self.hide()
@@ -113,14 +135,14 @@ class Analyse(object):
         if self.report_path.get() == '':
             self.report_path.set(os.getcwd())

-        report_root_dir = self.report_path.get()
-        report_dir = []
-        for item in os.listdir(report_root_dir):
+        report_dir = self.report_path.get()
+        latest = []
+        for item in os.listdir(report_dir):
             if 'report_npu' in item:
-                report_dir.append(item)
-        report_dir.sort()
+                latest.append(item)
+        latest.sort()

-        report_path = os.path.join(report_root_dir, report_dir[-1], 'api_analysis_report.xlsx')
+        report_path = os.path.join(report_dir, latest[-1], 'api_analysis_report.xlsx')
         if not os.path.exists(report_path):
             print("No api analysis report generated.")
             return
@@ -163,10 +185,4 @@ if __name__ == '__main__':
     root = tk.Tk()
     root.geometry('425x210')
     app = Analyse(root)
-    root.mainloop()
-
-
-
-
-
-
+    root.mainloop()
\ No newline at end of file
diff --git a/convert_tf2npu/util.py b/convert_tf2npu/util.py
index 6bd0029461325c4c9f5f901404c9906e728fbeaf..1034e227a125327583aa4abd29f42950c0f51859 100644
--- a/convert_tf2npu/util.py
+++ b/convert_tf2npu/util.py
@@ -27,6 +27,11 @@ def log_info(lineno, msg, file):
     print(content)
     write_conver_report(content, file)

+def log_warning(msg):
+    content = "************" + msg + "************"
+    print(content)
+    write_conver_report(content, util_global.get_value('report_file')[0])
+
 def log_success_report(lineno, msg):
     content = (util_global.get_value('path', '') + ':' + str(lineno) +
                ' change ' + util_global.get_value(msg)[1] +
diff --git a/convert_tf2npu/visit_by_ast.py b/convert_tf2npu/visit_by_ast.py
index 30eea719e0753cdcd69d70171262a832fdaa2dcd..badcf7021355655bcb770f96a76463cf9994b8e5 100644
--- a/convert_tf2npu/visit_by_ast.py
+++ b/convert_tf2npu/visit_by_ast.py
@@ -64,6 +64,36 @@ class VisitAttr(ast.NodeVisitor):
         self._in_attr = False
         self.generic_visit(node)

+class VisitUnsupportImport(ast.NodeVisitor):
+    def __init__(self):
+        self.imports = []
+        self.modules = []
+        self.unsupport = ['cupy']
+
+    def visit_ImportFrom(self, node):
+        if node.module != None:
+            self.modules = node.module.split('.')
+            for value in node.names:
+                if isinstance(value, ast.alias):
+                    classes = value.name.split('.')
+                    # from module import unsupported classes
+                    if self.modules[0] in self.unsupport:
+                        self.imports.append(classes[0])
+        self.generic_visit(node)
+
+    def visit_Import(self, node):
+        for value in node.names:
+            if isinstance(value, ast.alias):
+                self.modules = value.name.split('.')
+                if self.modules[0] in self.unsupport:
+                    # import unsupported module as alias
+                    if value.asname != None:
+                        self.imports.append(value.asname)
+                    # import unsupported module
+                    else:
+                        self.imports.append(self.modules[0])
+        self.generic_visit(node)
+
 def get_tf_api(file_name):
     with open(file_name, 'r', encoding='utf-8') as file:
         source = file.read()
@@ -98,6 +128,20 @@ def get_tf_enume(file_name, enume_list):
             lineno.append(visitor.linenos[i])
     return api, lineno

+def get_unsupport_api(file_name):
+    with open(file_name, 'r', encoding='utf-8') as file:
+        source = file.read()
+    tree = ast.parse(source)
+    visitor = VisitCall()
+    visitor.visit(tree)
+    unsupportor = VisitUnsupportImport()
+    unsupportor.visit(tree)

-
-
+    # get unsupport api
+    api = []
+    lineno = []
+    for i in range(len(visitor.calls)):
+        if visitor.calls[i].split('.')[0] in unsupportor.imports:
+            api.append(visitor.calls[i])
+            lineno.append(visitor.linenos[i])
+    return api, lineno
\ No newline at end of file
diff --git a/tf_adapter/kernels/geop_npu.cc b/tf_adapter/kernels/geop_npu.cc
index 03e5424fdc2bdd60f6f0a3415a23057621089c18..e403a01b14d067c9b41ba2bb1cf5c088fa3a4280 100644
--- a/tf_adapter/kernels/geop_npu.cc
+++ b/tf_adapter/kernels/geop_npu.cc
@@ -516,7 +516,7 @@ void GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) {

   char *need_print = getenv("PRINT_MODEL");
   if (need_print != nullptr && strcmp("1", need_print) == 0) {
-    string tmpmodel_path = "TF_";
+    string tmpmodel_path = GetDumpPath() + "TF_";
     string tmodel_path = tmpmodel_path + geop_name.c_str() + ".pbtxt";
     Status status_out = WriteTextProto(Env::Default(), tmodel_path, ori_graph_def);
   }
@@ -591,7 +591,7 @@ void GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) {

     char *need_print = getenv("PRINT_MODEL");
     if (need_print != nullptr && strcmp("1", need_print) == 0) {
-      string tmpmodel_path = "TF_Subgraph_";
+      string tmpmodel_path = GetDumpPath() + "TF_Subgraph_";
       string tmodel_path = tmpmodel_path + graph.c_str() + ".pbtxt";
       Status status_out = WriteTextProto(Env::Default(), tmodel_path, *graph_def_out.get());
     }
diff --git a/tf_adapter/optimizers/add_input_pass.cc b/tf_adapter/optimizers/add_input_pass.cc
index 377840f3513776e3b0726973850df953f37e5a60..85daf921094c9d7642661a8aa40305014413c076 100644
--- a/tf_adapter/optimizers/add_input_pass.cc
+++ b/tf_adapter/optimizers/add_input_pass.cc
@@ -103,7 +103,7 @@ Status AddInputPass::Run(const GraphOptimizationPassOptions &options) {
   if (need_print != nullptr && strcmp("1", need_print) == 0) {
     GraphDef ori_graph_def;
     graph->get()->ToGraphDef(&ori_graph_def);
-    string ori_model_path = "BeforeSubGraph_Add_Input_";
+    string ori_model_path = GetDumpPath() + "BeforeSubGraph_Add_Input_";
     string omodel_path = ori_model_path + std::to_string(graph_num) + ".pbtxt";
     Status status_out = WriteTextProto(Env::Default(), omodel_path, ori_graph_def);
   }
diff --git a/tf_adapter/optimizers/dp_tf_ge_conversion_pass.cc b/tf_adapter/optimizers/dp_tf_ge_conversion_pass.cc
index bc39bf4c108fde7a575043cfec1b1601e4bf825f..a714c38d47045990ea55dad012d294834d79f779 100644
--- a/tf_adapter/optimizers/dp_tf_ge_conversion_pass.cc
+++ b/tf_adapter/optimizers/dp_tf_ge_conversion_pass.cc
@@ -557,7 +557,7 @@ bool DpTfToGEConversionPassImpl::RunPass(std::unique_ptr<Graph> *g, FunctionLibr
   if (nullptr != need_print && strcmp("1", need_print) == 0) {
     GraphDef before_graphdef;
     (*g)->ToGraphDef(&before_graphdef);
-    string pre_model_path = "BeforeSubGraph_dp_";
+    string pre_model_path = GetDumpPath() + "BeforeSubGraph_dp_";
     string pmodel_path = pre_model_path + std::to_string(graph_run_num_) + ".pbtxt";
     TF_DO_CHECK_OK(WriteTextProto(Env::Default(), pmodel_path, before_graphdef), ERROR);
   }
diff --git a/tf_adapter/optimizers/get_attr_optimize_pass.cc b/tf_adapter/optimizers/get_attr_optimize_pass.cc
index 37fe9c075dbbe75ef2b9d6d0640d59b0574c74c4..8a639f01e5d3e2bc7fb11b4785336bf2f55aafbf 100644
--- a/tf_adapter/optimizers/get_attr_optimize_pass.cc
+++ b/tf_adapter/optimizers/get_attr_optimize_pass.cc
@@ -87,7 +87,7 @@ Status GetAttrOptimizePass::Run(const GraphOptimizationPassOptions &options) {
   if (need_print != nullptr && strcmp("1", need_print) == 0) {
     GraphDef ori_graph_def;
     options.graph->get()->ToGraphDef(&ori_graph_def);
-    string ori_model_path = "BeforeGetAttrOptimize_";
+    string ori_model_path = GetDumpPath() + "BeforeGetAttrOptimize_";
     string omodel_path = ori_model_path + std::to_string(graph_num) + ".pbtxt";
     Status status_out = WriteTextProto(Env::Default(), omodel_path, ori_graph_def);
   }
diff --git a/tf_adapter/optimizers/mark_start_node_pass.cc b/tf_adapter/optimizers/mark_start_node_pass.cc
index 5d317a66bc730c81755f59ff8d447476f27fd125..85e7820f94a6118b309f8503cee9a940ddfa729a 100644
--- a/tf_adapter/optimizers/mark_start_node_pass.cc
+++ b/tf_adapter/optimizers/mark_start_node_pass.cc
@@ -105,7 +105,7 @@ Status MarkStartNodePass::Run(const GraphOptimizationPassOptions &options) {
   if (need_print != nullptr && strcmp("1", need_print) == 0) {
     GraphDef ori_graph_def;
     graph->get()->ToGraphDef(&ori_graph_def);
-    string ori_model_path = "BeforeMarkStartNodeAttr_";
+    string ori_model_path = GetDumpPath() + "BeforeMarkStartNodeAttr_";
     string omodel_path = ori_model_path + std::to_string(graph_num) + ".pbtxt";
     Status status_out = WriteTextProto(Env::Default(), omodel_path, ori_graph_def);
   }
diff --git a/tf_adapter/optimizers/om_partition_subgraphs_pass.cc b/tf_adapter/optimizers/om_partition_subgraphs_pass.cc
index 87f73041d3026012282d565dda7c4fbabc0cb4b7..411a86dcc6a2c627151e86fda3e43e22cd7c404f 100644
--- a/tf_adapter/optimizers/om_partition_subgraphs_pass.cc
+++ b/tf_adapter/optimizers/om_partition_subgraphs_pass.cc
@@ -1889,7 +1889,7 @@ Status OMPartitionSubgraphsPass::ProcessGraph(std::unique_ptr<Graph> *graph, Fun
   if (need_print != nullptr && strcmp("1", need_print) == 0) {
     GraphDef ori_graph_def;
     graph->get()->ToGraphDef(&ori_graph_def);
-    string ori_model_path = "BeforeSubGraph_";
+    string ori_model_path = GetDumpPath() + "BeforeSubGraph_";
     string omodel_path = ori_model_path + std::to_string(graph_num) + ".pbtxt";
     Status status_out = WriteTextProto(Env::Default(), omodel_path, ori_graph_def);
   }
@@ -1910,6 +1910,12 @@ Status OMPartitionSubgraphsPass::ProcessGraph(std::unique_ptr<Graph> *graph, Fun

     if (node->type_string() == "IteratorGetNext") {
       include_getnext = true;
+      for (auto output_type: node->output_types()) {
+        if (output_type == DT_STRING && pass_options["enable_dp"] == "0") {
+          ADP_LOG(WARNING) << "Dataset outputs have string output_type, please set enable_data_pre_proc=True.";
+          LOG(WARNING) << "Dataset outputs have string output_type, please set enable_data_pre_proc=True.";
+        }
+      }
       if (is_set_dynamic_config) { getnext_node_count++; }
     } else if (node->type_string() == "_UnaryOpsComposition") {
       ADP_LOG(INFO) << "begin split _UnaryOpsComposition.";
diff --git a/tf_adapter/optimizers/weight_update_sharding_pass.cc b/tf_adapter/optimizers/weight_update_sharding_pass.cc
index 86266063567587b559a534eb8ddd6bcf755fc11f..21fa48ca1fb1c5471b83539c0a6931c4f6cdb099 100644
--- a/tf_adapter/optimizers/weight_update_sharding_pass.cc
+++ b/tf_adapter/optimizers/weight_update_sharding_pass.cc
@@ -88,7 +88,7 @@ Status WeightUpdateShardingPass::Run(const GraphOptimizationPassOptions &options
   if (need_print != nullptr && strcmp("1", need_print) == 0) {
     GraphDef ori_graph_def;
     graphIn->ToGraphDef(&ori_graph_def);
-    string ori_model_path = "BeforeWeightUpdateSharding_";
+    string ori_model_path = GetDumpPath() + "BeforeWeightUpdateSharding_";
     string omodel_path = ori_model_path + std::to_string(graph_num) + ".pbtxt";
     Status status_out = WriteTextProto(Env::Default(), omodel_path, ori_graph_def);
   }
diff --git a/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py b/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py
index 02a5dd576f7196a23c9828792996b696f7ef5999..7601919d09f165a6ab7ca21e7dd85cd785875b16 100644
--- a/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py
+++ b/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py
@@ -1,14 +1,17 @@
-import tensorflow as tf
+import os
 from six.moves import queue as Queue
+import time
 import threading
+
+import tensorflow as tf
+from tensorflow.core.protobuf import config_pb2
+from tensorflow.python.ops import summary_ops_v2 as contrib_summary
+from tensorflow.python.platform import tf_logging as logging
 from tensorflow.python.training import session_run_hook
 from tensorflow.python.training import basic_session_run_hooks
-from tensorflow.python.platform import tf_logging as logging
+
 from npu_bridge.estimator import npu_ops
 from npu_bridge.hccl import hccl_ops
-from tensorflow.python.ops import summary_ops_v2 as contrib_summary
-from tensorflow.core.protobuf import config_pb2
-import time
 from npu_bridge.estimator.npu import util

 # Constant
@@ -63,14 +66,20 @@ class NPUBroadcastGlobalVariablesHook(session_run_hook.SessionRunHook):
         self._root_rank = root_rank
         self._index = index
         self._bcast_op = None
+        rank_size = os.getenv('RANK_SIZE', "1")
+        if rank_size.isdigit():
+            self._rank_size = int(rank_size)
+        else:
+            self._rank_size = 1

     def begin(self):
-        if not self._bcast_op or self._bcast_op.graph != tf.get_default_graph():
+        if not self._bcast_op or self._bcast_op.graph != tf.get_default_graph() and self._rank_size > 1:
             self._bcast_op = broadcast_global_variables(self._root_rank, self._index)

     def after_create_session(self, session, coord):
-        logging.info("NPUBroadcastGlobalVariablesHook run...")
-        session.run(self._bcast_op)
+        if self._rank_size > 1:
+            logging.info("NPUBroadcastGlobalVariablesHook run...")
+            session.run(self._bcast_op)


 class NPUCheckpointSaverHook(basic_session_run_hooks.CheckpointSaverHook):
diff --git a/tf_adapter/python/npu_bridge/estimator/npu/util.py b/tf_adapter/python/npu_bridge/estimator/npu/util.py
index 270464981acc5fa3b532edb3b42740d1ad8996fe..aac6086778ff8310892b2dcfea61bc3c5cd152aa 100644
--- a/tf_adapter/python/npu_bridge/estimator/npu/util.py
+++ b/tf_adapter/python/npu_bridge/estimator/npu/util.py
@@ -430,14 +430,14 @@ def npu_compile(sess, *fetches):
     sess.run(fetches)

 def global_dict_init():
-  global _global_dict
-  _global_dict = {}
+    global _global_dict
+    _global_dict = {}

 def set_value(key, value):
-  _global_dict[key] = value
+    _global_dict[key] = value

 def get_value(key, def_value = None):
-  try:
-    return _global_dict[key]
-  except KeyError:
-    return def_value
\ No newline at end of file
+    try:
+        return _global_dict[key]
+    except KeyError:
+        return def_value
\ No newline at end of file
diff --git a/tf_adapter/python/npu_bridge/npu_init.py b/tf_adapter/python/npu_bridge/npu_init.py
index c79b0e8348be5d515b0e97535c244e484ab2fc0b..cd1d648cfc869451258dc7d1b4f5aedccfc32a13 100644
--- a/tf_adapter/python/npu_bridge/npu_init.py
+++ b/tf_adapter/python/npu_bridge/npu_init.py
@@ -28,6 +28,7 @@ from tensorflow.core.protobuf import config_pb2
 from tensorflow.core.protobuf import rewriter_config_pb2
 from tensorflow.core.protobuf.rewriter_config_pb2 import RewriterConfig
 from tensorflow.python.client import session
+from tensorflow.python.training import session_run_hook

 from hccl.manage.api import create_group
 from hccl.manage.api import destroy_group
@@ -90,9 +91,13 @@ def npu_run_config_init(run_config=None):
         run_config.__dict__['_session_config'] = npu_config_proto(run_config.session_config)
     return run_config

-def set_keras_session_npu_config():
+def set_keras_session_npu_config(config=None):
     from tensorflow.python.keras import backend
-    config = config_pb2.ConfigProto(allow_soft_placement=True, log_device_placement=False)
+    if config is None:
+        config = config_pb2.ConfigProto(allow_soft_placement=True, log_device_placement=False)
+    else:
+        if not isinstance(config, config_pb2.ConfigProto):
+            raise ValueError("config must be config_pb2.ConfigProto type")
     custom_op = config.graph_options.rewrite_options.custom_optimizers.add()
     custom_op.name = "NpuOptimizer"
     config.graph_options.rewrite_options.remapping = RewriterConfig.OFF
@@ -131,4 +136,15 @@ def get_npu_local_rank_id():
     return util.get_value("npu_local_rank_id", 0)

 def get_npu_rank_size():
-    return util.get_value("npu_rank_size", 1)
\ No newline at end of file
+    return util.get_value("npu_rank_size", 1)
+
+class NpuEmptyHook(session_run_hook.SessionRunHook):
+    pass
+
+def npu_keras_optimizer(opt):
+    npu_opt = KerasDistributeOptimizer(opt)
+    return npu_opt
+
+def npu_tf_optimizer(opt):
+    npu_opt = NPUDistributedOptimizer(opt)
+    return npu_opt
\ No newline at end of file
diff --git a/tf_adapter/python/npu_bridge/tbe/npu_cube_ops.py b/tf_adapter/python/npu_bridge/tbe/npu_cube_ops.py
index d9e877109112222d6337b16ee22927c04ae6f4fd..fcd3fb3f00d6dd6cc50479e204b5b521ae9d267d 100644
--- a/tf_adapter/python/npu_bridge/tbe/npu_cube_ops.py
+++ b/tf_adapter/python/npu_bridge/tbe/npu_cube_ops.py
@@ -117,7 +117,7 @@ def deformable_conv2d(  # pylint: disable=redefined-builtin
         modulated=True,
         name=name)
-    strides_conv = strides
+    strides_conv = list(strides)
     pos_h, pos_w = data_format.find('H'), data_format.find('W')
     strides_conv[pos_h] = kh
     strides_conv[pos_w] = kw
diff --git a/tf_adapter/util/ge_plugin.cc b/tf_adapter/util/ge_plugin.cc
index 64874c4e2708f2473a68e7657d2b5c41bc577788..70b4b09f587c3aa4c13cd99518794c2777b6abd4 100644
--- a/tf_adapter/util/ge_plugin.cc
+++ b/tf_adapter/util/ge_plugin.cc
@@ -212,14 +212,21 @@ void GePlugin::Init(std::map<std::string, std::string> &init_options, bool is_gl
   ADP_LOG(INFO) << "[GePlugin] mstune mode : " << init_options["ge.jobType"]
                 << ", work path : " << init_options["ge.tuningPath"]
                 << ", distribute_config : " << init_options["distribute_config"];
-
-  // Open TsdClient first, then call GEInitialize
-  ADP_LOG(INFO) << "[GePlugin] Open TsdClient and Init tdt host.";
-  int32_t ret = tdt::TdtOutFeedInit(static_cast<uint32_t>(device_id_));
-  if (ret != 0) {
-    std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime));
-    ADP_LOG(FATAL) << "[GePlugin] Tdt host init failed, tdt error code : " << ret;
-    LOG(FATAL) << "[GePlugin] Tdt host init failed, tdt error code : " << ret;
+
+  const char *tdt_uninit_env = std::getenv("ASCEND_TDT_UNINIT");
+  bool tdt_init = true;
+  if (tdt_uninit_env != nullptr && std::atoi(tdt_uninit_env) == 1) {
+    tdt_init = false;
+  }
+  if (tdt_init) {
+    // Open TsdClient first, then call GEInitialize
+    ADP_LOG(INFO) << "[GePlugin] Open TsdClient and Init tdt host.";
+    int32_t ret = tdt::TdtOutFeedInit(static_cast<uint32_t>(device_id_));
+    if (ret != 0) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime));
+      ADP_LOG(FATAL) << "[GePlugin] Tdt host init failed, tdt error code : " << ret;
+      LOG(FATAL) << "[GePlugin] Tdt host init failed, tdt error code : " << ret;
+    }
   }

   // ge Initialize
@@ -279,13 +286,21 @@ void GePlugin::Finalize() {
   // ge finalize
   GeFinalize();

-  ADP_LOG(INFO) << "[GePlugin] Close TsdClient and destroy tdt.";
-  int32_t ret = tdt::TdtOutFeedDestroy();
-  if (ret != 0) {
-    LOG(ERROR) << "[GePlugin] Close tdt host failed.";
-    ADP_LOG(ERROR) << "[GePlugin] Close tdt host failed.";
+  const char *tdt_uninit_env = std::getenv("ASCEND_TDT_UNINIT");
std::getenv("ASCEND_TDT_UNINIT"); + bool tdt_init = true; + if (tdt_uninit_env != nullptr && std::atoi(tdt_uninit_env) == 1) { + tdt_init = false; + } + if (tdt_init) { + ADP_LOG(INFO) << "[GePlugin] Close TsdClient and destroy tdt."; + int32_t ret = tdt::TdtOutFeedDestroy(); + if (ret != 0) { + LOG(ERROR) << "[GePlugin] Close tdt host failed."; + ADP_LOG(ERROR) << "[GePlugin] Close tdt host failed."; + } } isInit_ = false; + } bool GePlugin::IsGlobal() { diff --git a/tf_adapter/util/npu_attrs.cc b/tf_adapter/util/npu_attrs.cc index 2d6a2f1bf2258977d4a2d64fa81b7679ba7fb774..fa8a94e70eeb61d0c0ad320529284ce7ddf88782 100644 --- a/tf_adapter/util/npu_attrs.cc +++ b/tf_adapter/util/npu_attrs.cc @@ -36,6 +36,34 @@ limitations under the License. #include namespace tensorflow { + +std::string GetDumpPath() { + static std::string dump_path = ""; + if (dump_path == "") { + char *npu_collect_path = std::getenv("NPU_COLLECT_PATH"); + if (npu_collect_path != nullptr) { + std::string base_path_str(npu_collect_path); + uint32_t device_id = 0; + GetEnvDeviceID(device_id); + base_path_str += "/graph/" + std::to_string(mmGetPid()) + "_" + std::to_string(device_id) + "/"; + if (mmAccess2(base_path_str.c_str(), M_F_OK) != EN_OK) { + int32_t ret = mmMkdir(base_path_str.c_str(), M_IRUSR | M_IWUSR | M_IXUSR); + if (ret != 0) { + ADP_LOG(WARNING) << "create dump graph dir failed, path:" << base_path_str; + dump_path = "./"; + } else { + dump_path = base_path_str; + } + } else { + dump_path = base_path_str; + } + } else { + dump_path = "./"; + } + } + return dump_path; +} + Status GetEnvDeviceID(uint32_t &device_id) { int64 phy_device_id = -1; int64 logic_device_id = -1; @@ -217,6 +245,21 @@ inline Status CheckDeviceList(std::string local_device_list) { return Status::OK(); } +inline Status checkEnableDp(bool enable_dp) { + const char *tdt_uninit_env = std::getenv("ASCEND_TDT_UNINIT"); + bool tdt_init = true; + if (tdt_uninit_env != nullptr && std::atoi(tdt_uninit_env) == 1) { + tdt_init = false; + } + + if (enable_dp && !tdt_init) { + return errors::InvalidArgument("In the environment, " + "'ASCEND_TDT_UNINIT' must be 0 when parameter 'enable_data_pre_proc' is set to true"); + } else { + return Status::OK(); + } +} + std::map NpuAttrs::GetSessOptions(OpKernelConstruction *ctx) { std::map sess_options; std::string variable_format_optimize = std::to_string(true); @@ -424,7 +467,14 @@ std::map NpuAttrs::GetPassOptions(const GraphOptimizat if (custom_optimizer.name() == "NpuOptimizer") { do_npu_optimizer = true; const auto ¶ms = custom_optimizer.parameter_map(); - if (params.count("enable_data_pre_proc")) { enable_dp = params.at("enable_data_pre_proc").b(); } + if (params.count("enable_data_pre_proc")) { + enable_dp = params.at("enable_data_pre_proc").b(); + Status s = checkEnableDp(enable_dp); + if (!s.ok()) { + ADP_LOG(ERROR) << s.error_message(); + LOG(FATAL) << s.error_message(); + } + } if (params.count("use_off_line")) { use_off_line = params.at("use_off_line").b(); } if (params.count("mix_compile_mode")) { mix_compile_mode = params.at("mix_compile_mode").b(); } if (params.count("iterations_per_loop")) { iterations_per_loop = params.at("iterations_per_loop").i(); } diff --git a/tf_adapter/util/npu_attrs.h b/tf_adapter/util/npu_attrs.h index 3d83f7cc830a931fbe92ca8f3f4502428dcb7974..ec882957496cd1d24f3df89f2c272181d68b7391 100644 --- a/tf_adapter/util/npu_attrs.h +++ b/tf_adapter/util/npu_attrs.h @@ -39,6 +39,7 @@ limitations under the License. 
 // single load all npu mode
 namespace tensorflow {
+std::string GetDumpPath();
 Status GetEnvDeviceID(uint32_t &device_id);
 void Split(const std::string &s, std::vector<std::string> &result, const char *delchar = " ");
 class NpuAttrs {
diff --git a/tf_adapter_2.x/npu_device/core/npu_cache_spec.h b/tf_adapter_2.x/npu_device/core/npu_cache_spec.h
index 893fa9558d6d6f072f1ad53a9802e336a624d134..9259e4e6cf09b9760a8c324eaccacfe12067149f 100644
--- a/tf_adapter_2.x/npu_device/core/npu_cache_spec.h
+++ b/tf_adapter_2.x/npu_device/core/npu_cache_spec.h
@@ -181,6 +181,9 @@ class FuncSpec : public TaskSpec {
   void SetBuilt() const { built_.store(true); }
   bool Built() const { return built_; }

+  void SetNeedLoop() const { need_loop_.store(true); }
+  bool NeedLoop() const { return need_loop_; }
+
   void PruneInputs(int num_inputs, TFE_TensorHandle **inputs, std::vector<TFE_TensorHandle *> &pruned) const {
     prune_func_(num_inputs, inputs, pruned);
   }
@@ -204,6 +207,7 @@ class FuncSpec : public TaskSpec {
   PruneInputsFunc prune_func_;
   const std::map<int, std::shared_ptr<IteratorResourceProvider>> dependent_host_resources_;
   std::atomic_bool mutable built_{false};
+  std::atomic_bool mutable need_loop_{false};
 };

 }  // namespace npu
diff --git a/tf_adapter_2.x/npu_device/core/npu_device.cpp b/tf_adapter_2.x/npu_device/core/npu_device.cpp
index 23986d761af6af7e98e829349cc4d3a5d224780d..7f2c9b747f41824fce6e18be546dc98b3ce39534 100644
--- a/tf_adapter_2.x/npu_device/core/npu_device.cpp
+++ b/tf_adapter_2.x/npu_device/core/npu_device.cpp
@@ -57,6 +57,49 @@ class NpuHostFixedAllocator : public tensorflow::Allocator {
   void DeallocateRaw(void *ptr) override { delete this; }
   std::unique_ptr<tensorflow::IteratorBase> ptr_;
 };
+
+size_t RemoveRedundantHcomControlEdges(tensorflow::Graph *graph) {
+  const static std::string kHcomType = "HcomAllReduce";
+  std::vector<tensorflow::Edge *> edges_to_remove;
+  for (auto edge : graph->edges()) {
+    if (edge->IsControlEdge() && (edge->src()->type_string() == kHcomType || edge->dst()->type_string() == kHcomType)) {
+      edges_to_remove.push_back(edge);
+    }
+  }
+  for (auto edge : edges_to_remove) {
+    graph->RemoveEdge(edge);
+  }
+  return edges_to_remove.size();
+}
+
+bool IsGraphNeedLoop(const std::string &name, const tensorflow::GraphDef &def) {
+  const static std::unordered_set<std::string> kNecessaryOps{"IteratorV2"};
+  const static std::unordered_set<std::string> kTrainKeyOps{"ResourceApplyKerasMomentum"};
+
+  bool contain_necessary_op = false;
+  bool contain_train_key_op = false;
+
+  for (const auto &ndef : def.node()) {
+    if (!contain_train_key_op && kTrainKeyOps.count(ndef.op())) {
+      if (contain_necessary_op) {
+        return true;
+      }
+      contain_train_key_op = true;
+    }
+    if (!contain_necessary_op && kNecessaryOps.count(ndef.op())) {
+      if (contain_train_key_op) {
+        return true;
+      }
+      contain_necessary_op = true;
+    }
+  }
+  return false;
+}
+
+bool IsGraphNeedLoop(const std::string &name, const tensorflow::Graph *graph) {
+  return IsGraphNeedLoop(name, graph->ToGraphDefDebug());
+}
+
 }  // namespace

 void NpuDevice::CreateIteratorProvider(TFE_Context *context, const tensorflow::Tensor *tensor,
@@ -112,14 +155,6 @@ std::string NpuDevice::CreateDevice(const char *name, int device_index,
     return "Failed init graph engine: create new session failed";
   }

-  std::shared_ptr<domi::ModelParser> parser =
-      domi::ModelParserFactory::Instance()->CreateModelParser(domi::FrameworkType::TENSORFLOW);
-  if (parser == nullptr) {
-    return "Failed init graph engine: create tensorflow model parser failed";
-  }
-
-  std::unique_ptr<TF_Status, decltype(&TF_DeleteStatus)> status(TF_NewStatus(), TF_DeleteStatus);
-
   *device = new (std::nothrow) NpuDevice();
   if (*device == nullptr) {
     return "Failed create new npu device instance";
instance"; @@ -133,14 +168,14 @@ std::string NpuDevice::CreateDevice(const char *name, int device_index, } void NpuDevice::ReleaseResource() { - DLOG() << "Start cancel all uncompleted async call"; - CancellationManager()->StartCancel(); - std::vector> thread_guarder; for (auto &iterator_provider : iterator_providers_) { auto provider = iterator_provider.second; thread_guarder.emplace_back(std::async([provider]() { provider->Destroy(); })); } + + DLOG() << "Start cancel all uncompleted async call"; + CancellationManager()->StartCancel(); } void NpuDevice::DeleteDevice(void *device) { @@ -290,6 +325,8 @@ void NpuDevice::FixGraphArgRetvalIndex(tensorflow::Graph *graph) { tensorflow::Status NpuDevice::TransResourceInput2GraphNode( TFE_Context *context, tensorflow::Graph *graph, int num_inputs, TFE_TensorHandle **inputs, std::map> &dependent_host_resources) { + (void)RemoveRedundantHcomControlEdges(graph); + std::set arg_is_variable; std::set arg_is_iterator; @@ -1247,6 +1284,124 @@ void NpuDevice::RunOp(TFE_Context *context, const npu::OpSpec *spec, int num_inp } // 计数-2 } +namespace { +tensorflow::Node *AddVarInitToGraph(TFE_Context *context, std::string name, tensorflow::Tensor tensor, + tensorflow::Graph *graph, TF_Status *status) { + tensorflow::Node *variable; + tensorflow::Node *value; + tensorflow::Node *assign_variable; + + NPU_CTX_REQUIRES_OK_RETURN(status, + tensorflow::NodeBuilder(name, "VarHandleOp") + .Attr("container", "") + .Attr("shared_name", name) + .Attr("dtype", tensor.dtype()) + .Attr("shape", tensor.shape()) + .Finalize(graph, &variable), + assign_variable); + NPU_CTX_REQUIRES_OK_RETURN(status, + tensorflow::NodeBuilder(name + "_v", "Const") + .Attr("value", tensor) + .Attr("dtype", tensor.dtype()) + .Finalize(graph, &value), + assign_variable); + NPU_CTX_REQUIRES_OK_RETURN(status, + tensorflow::NodeBuilder(name + "_op", "AssignVariableOp") + .Input(variable, 0) + .Input(value, 0) + .Attr("dtype", tensor.dtype()) + .Finalize(graph, &assign_variable), + assign_variable); + + AssembleOpDef(variable); + AssembleOpDef(value); + AssembleOpDef(assign_variable); + + AssembleOutputDesc(TensorShapes({kScalarShape}), {tensorflow::DT_RESOURCE}, variable); + AssembleOutputDesc(TensorShapes({tensor.shape()}), {tensor.dtype()}, value); + AssembleInputDesc(TensorShapes({kScalarShape, tensor.shape()}), {tensorflow::DT_RESOURCE, tensor.dtype()}, + assign_variable); + return assign_variable; +} +} // namespace + +void NpuDevice::SetNpuLoopSize(TFE_Context *context, int64_t loop, TF_Status *status) { + static std::atomic_bool initialized{false}; + static std::atomic_int64_t current_loop_size{1}; + static tensorflow::Status init_status = tensorflow::Status::OK(); + static std::uint64_t loop_var_graph_id = 0; + const static std::string kLoopVarName = "npu_runconfig/iterations_per_loop"; + + if (current_loop_size == loop) return; + + LOG(INFO) << "Set npu loop size to " << loop; + + if (!initialized.exchange(true)) { + tensorflow::Graph graph(tensorflow::OpRegistry::Global()); + AddVarInitToGraph(context, "npu_runconfig/loop_cond", tensorflow::Tensor(tensorflow::int64(0)), &graph, status); + if (TF_GetCode(status) != TF_OK) return; + AddVarInitToGraph(context, "npu_runconfig/one", tensorflow::Tensor(tensorflow::int64(1)), &graph, status); + if (TF_GetCode(status) != TF_OK) return; + AddVarInitToGraph(context, "npu_runconfig/zero", tensorflow::Tensor(tensorflow::int64(0)), &graph, status); + if (TF_GetCode(status) != TF_OK) return; + + RunGeGraphPin2CpuAnonymous(context, 
"set_npu_loop_conditions", graph.ToGraphDefDebug(), 0, nullptr, 0, nullptr, + status); + if (TF_GetCode(status) != TF_OK) return; + + tensorflow::Node *variable; + tensorflow::Node *arg; + tensorflow::Node *assign_variable; + + tensorflow::Graph graph2(tensorflow::OpRegistry::Global()); + + NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder(kLoopVarName, "VarHandleOp") + .Attr("container", "") + .Attr("shared_name", kLoopVarName) + .Attr("dtype", tensorflow::DT_INT64) + .Attr("shape", kScalarShape) + .Finalize(&graph2, &variable)); + NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder(kLoopVarName + "_v", "_Arg") + .Attr("T", tensorflow::DT_INT64) + .Attr("index", 0) + .Finalize(&graph2, &arg)); + NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder(kLoopVarName + "_op", "AssignVariableOp") + .Input(variable, 0) + .Input(arg, 0) + .Attr("dtype", tensorflow::DT_INT64) + .Finalize(&graph2, &assign_variable)); + + AssembleOpDef(variable); + AssembleOpDef(arg); + AssembleOpDef(assign_variable); + + AssembleOutputDesc(TensorShapes({kScalarShape}), {tensorflow::DT_RESOURCE}, variable); + AssembleOutputDesc(TensorShapes({kScalarShape}), {tensorflow::DT_INT64}, arg); + AssembleInputDesc(TensorShapes({kScalarShape, kScalarShape}), {tensorflow::DT_RESOURCE, tensorflow::DT_INT64}, + assign_variable); + + loop_var_graph_id = AddGeGraph(context, "set_loop_var", graph2.ToGraphDefDebug(), status); + init_status = status->status; + if (TF_GetCode(status) != TF_OK) return; + } + + status->status = init_status; + if (TF_GetCode(status) != TF_OK) return; + + std::vector inputs(1); + inputs[0] = + tensorflow::wrap(tensorflow::TensorHandle::CreateLocalHandle(tensorflow::Tensor(tensorflow::int64(loop - 1)))); + + RunGeGraphPin2Cpu(context, loop_var_graph_id, inputs.size(), inputs.data(), {}, 0, nullptr, status); + + if (TF_GetCode(status) == TF_OK) { + current_loop_size = loop; + } + for (auto handle : inputs) { + TFE_DeleteTensorHandle(handle); + } +} + void NpuDevice::RunGraph(TFE_Context *context, const npu::FuncSpec *spec, int tf_num_inputs, TFE_TensorHandle **tf_inputs, int *num_outputs, TFE_TensorHandle **outputs, TF_Status *status) { @@ -1279,15 +1434,12 @@ void NpuDevice::RunGraph(TFE_Context *context, const npu::FuncSpec *spec, int tf if (kCustomKernelEnabled) { // TODO:这里根据小循环策略修改值 - int64_t iterations_per_loop = 1; - if (!spec->DependentHostResources().empty()) { - for (auto node : spec->Graph()->op_nodes()) { - if (node->IsWhileNode()) { - iterations_per_loop = kGlobalLoopSize; - break; - } - } + int64_t iterations_per_loop = spec->NeedLoop() ? 
@@ -1563,6 +1715,9 @@ uint64_t NpuDevice::AddGeGraph(TFE_Context *context, uint64_t graph_id, const st
                               parser->ParseProtoWithSubgraph(&def, request_subgraph, ge_compute_graph), graph_id);
   ge::Graph ge_graph = ge::GraphUtils::CreateGraphFromComputeGraph(ge_compute_graph);
+
+  ge_graph.SetNeedIteration(IsGraphNeedLoop(name, def));
+
   NPU_CTX_REQUIRES_GE_OK_RETURN(status, "Graph engine Add graph", GeSession()->AddGraph(graph_id, ge_graph), graph_id);
   return graph_id;
 }
@@ -1704,6 +1859,10 @@ std::shared_ptr NpuDevice::CacheFuncSpec(
   const std::map> &dependent_host_resources, const std::string &reason) {
   auto spec = std::make_shared<npu::FuncSpec>(op_spec, ndef, ge_graph_id, std::move(graph), prune_func,
                                               dependent_host_resources, reason);
+
+  if (spec->Graph() != nullptr && IsGraphNeedLoop(ndef.op(), spec->Graph())) {
+    spec->SetNeedLoop();
+  }
   cached_func_specs_[op] = spec;
   DLOG() << "Cache function op spec " << spec->DebugString();
   return spec;
diff --git a/tf_adapter_2.x/npu_device/core/npu_device.h b/tf_adapter_2.x/npu_device/core/npu_device.h
index ab6d317ab62e57439482c8b648ba9e922fed3cd3..a8413d0cf6d51ecd5b29cddb0f3731c57bbf453d 100644
--- a/tf_adapter_2.x/npu_device/core/npu_device.h
+++ b/tf_adapter_2.x/npu_device/core/npu_device.h
@@ -89,6 +89,8 @@ class NpuDevice {
   void RunOp(TFE_Context *context, const npu::OpSpec *spec, int num_inputs, TFE_TensorHandle **inputs,
              int *num_outputs, TFE_TensorHandle **outputs, TF_Status *status);
 
+  void SetNpuLoopSize(TFE_Context *context, int64_t loop, TF_Status *status);
+
   void RunGraph(TFE_Context *context, const npu::FuncSpec *spec, int num_inputs, TFE_TensorHandle **inputs,
                 int *num_outputs, TFE_TensorHandle **outputs, TF_Status *status);
diff --git a/tf_adapter_2.x/npu_device/core/npu_env.h b/tf_adapter_2.x/npu_device/core/npu_env.h
index 5435faef9e21c861da20e5bfc5f97f7cf55f9fbd..9f3f8bfc862b605dd86f1092f62a529fdea92bc5 100644
--- a/tf_adapter_2.x/npu_device/core/npu_env.h
+++ b/tf_adapter_2.x/npu_device/core/npu_env.h
@@ -44,4 +44,10 @@ const static bool kPerfEnabled = []() -> bool {
   return perf_enabled;
 }();
 
+const static bool kAutoLoopEnabled = []() -> bool {
+  bool loop_enabled = false;
+  tensorflow::ReadBoolFromEnvVar("NPU_EXPERIMENTAL_AUTO_LOOP", false, &loop_enabled);
+  return loop_enabled;
+}();
+
 #endif  // TENSORFLOW_NPU_ENV_H
diff --git a/tf_adapter_2.x/npu_device/core/npu_wrapper.cpp b/tf_adapter_2.x/npu_device/core/npu_wrapper.cpp
index 6c65db344c4cfbddee8abaeb58c9589e081a755e..691638373838d5debb8d2ae4bf9269f0cc66fc0a 100644
--- a/tf_adapter_2.x/npu_device/core/npu_wrapper.cpp
+++ b/tf_adapter_2.x/npu_device/core/npu_wrapper.cpp
@@ -6,6 +6,8 @@
 #include 
 
 #include "Python.h"
+#define PY_MAJOR_VERSION 3
+#define PY_MINOR_VERSION 7
 #include "pybind11/chrono.h"
 #include "pybind11/complex.h"
 #include "pybind11/functional.h"
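
Note: kAutoLoopEnabled follows the same immediately-invoked-lambda pattern as kPerfEnabled above, freezing the flag at static-initialization time. A standalone sketch of the pattern without the TensorFlow ReadBoolFromEnvVar helper (the accepted value set below is an assumption):

    #include <cstdlib>
    #include <cstring>
    #include <iostream>

    // The lambda runs exactly once, before main(), so the flag cannot change
    // while the process is running.
    const static bool kAutoLoopEnabled = []() -> bool {
      const char *v = std::getenv("NPU_EXPERIMENTAL_AUTO_LOOP");
      return v != nullptr && (std::strcmp(v, "1") == 0 || std::strcmp(v, "true") == 0);
    }();

    int main() { std::cout << std::boolalpha << kAutoLoopEnabled << std::endl; }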
diff --git a/tf_adapter_2.x/python/npu_device/_api/distribute/hccl.py b/tf_adapter_2.x/python/npu_device/_api/distribute/hccl.py
index 761c1eb6badf2671536e13d90e8c58344406b59b..7b3415a32d341efac3e9c8a179837f8bb7053944 100644
--- a/tf_adapter_2.x/python/npu_device/_api/distribute/hccl.py
+++ b/tf_adapter_2.x/python/npu_device/_api/distribute/hccl.py
@@ -13,21 +13,26 @@ def _all_reduce(values, reduction, fusion, fusion_id, group):
     mean_reduce = True
     reduction = 'sum'
 
+    topo_guarder = tf.group(values)
     if isinstance(values, Iterable):
         reduced_values = []
         for value in values:
             reduced_value = hccl_ops.allreduce(value, reduction, fusion, fusion_id, group)
-            if mean_reduce:
-                reduced_values.append(tf.divide(reduced_value, tf.cast(workers_num, reduced_value.dtype)))
-            else:
-                reduced_values.append(reduced_value)
+            typed_workers_num = tf.cast(workers_num, reduced_value.dtype)
+            with tf.control_dependencies([topo_guarder]):
+                if mean_reduce:
+                    reduced_values.append(tf.divide(reduced_value, typed_workers_num))
+                else:
+                    reduced_values.append(tf.identity(reduced_value))
         return reduced_values
     else:
         reduced_value = hccl_ops.allreduce(values, reduction, fusion, fusion_id, group)
-        if mean_reduce:
-            return tf.divide(reduced_value, tf.cast(workers_num, reduced_value.dtype))
-        else:
-            return reduced_value
+        typed_workers_num = tf.cast(workers_num, reduced_value.dtype)
+        with tf.control_dependencies([topo_guarder]):
+            if mean_reduce:
+                return tf.divide(reduced_value, typed_workers_num)
+            else:
+                return tf.identity(reduced_value)
 
 
 def all_reduce(values, reduction, fusion=1, fusion_id=-1, group="hccl_world_group"):
diff --git a/tf_adapter_2.x/python/npu_device/npu_device.py b/tf_adapter_2.x/python/npu_device/npu_device.py
index 950c56f1d065dbbe82b5f3d407f836134135546e..1abe6a11c8c16f1b4331f9e144f26a9bcb037b75 100644
--- a/tf_adapter_2.x/python/npu_device/npu_device.py
+++ b/tf_adapter_2.x/python/npu_device/npu_device.py
@@ -50,6 +50,7 @@ def open(ctx=None, device_index=None, global_options=None, session_options=None)
         global_options['ge.exec.deployMode'] = "0"
         global_options['ge.exec.isUseHcom'] = "1"
         global_options['ge.exec.hcclFlag'] = "1"
+        global_options['ge.exec.hcomParallel'] = "1"
         global_options['ge.exec.rankId'] = str(worker_id)
 
     error_message = _npu_device_backends.Open(ctx._handle, NPU, device_index, global_options, session_options)
@@ -122,6 +123,8 @@ class NpuDeviceHandle(object):
             tf_decorated_func = self._hacked_tensorflow_function(*args, **kwargs)(func)
 
             def wrapper(*func_args, **func_kwargs):
+                if not hasattr(self._thread_local, "_entrance_function"):
+                    self._thread_local._entrance_function = None
                 if self._thread_local._entrance_function is not None:
                     return func(*func_args, **func_kwargs)
                 self._thread_local._entrance_function = func.__name__
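
Note: the npu_device.py change lazily creates the thread-local _entrance_function attribute before first use, so worker threads that never entered a decorated function no longer raise AttributeError. A C++ analogue of that per-thread default (illustrative names only, not the adapter's code):

    #include <iostream>
    #include <string>
    #include <thread>

    // Each thread gets its own value-initialized (empty) entrance marker, so
    // no thread observes another thread's state or an uninitialized value.
    thread_local std::string entrance_function;

    void Enter(const std::string &name) {
      if (!entrance_function.empty()) return;  // re-entrant call: already inside
      entrance_function = name;
      std::cout << "entered " << name << std::endl;
      entrance_function.clear();
    }

    int main() {
      std::thread t1([] { Enter("train_step"); });
      std::thread t2([] { Enter("eval_step"); });
      t1.join();
      t2.join();
    }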