From 082573f22e3098d85da0e7da6134f081d7199e4d Mon Sep 17 00:00:00 2001 From: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> Date: Mon, 17 Oct 2022 02:22:55 +0000 Subject: [PATCH 01/10] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=96=87=E4=BB=B6=20Te?= =?UTF-8?q?nsorFlow/contrib/cv/SimpleHumanPose=5FID0956=5Ffor=5FTensorFlow?= =?UTF-8?q?/src/lib/tfflat/base.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/lib/tfflat/base.py | 456 ------------------ 1 file changed, 456 deletions(-) delete mode 100644 TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/base.py diff --git a/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/base.py b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/base.py deleted file mode 100644 index c7ff3a36a..000000000 --- a/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/base.py +++ /dev/null @@ -1,456 +0,0 @@ -# Copyright 2017 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================ -# Copyright 2021 Huawei Technologies Co., Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from npu_bridge.npu_init import * -import tensorflow as tf -import tensorflow.contrib.slim as slim -import numpy as np -from collections import OrderedDict as dict -import setproctitle -import os -import os.path as osp -import glob -import abc -import math - -from .net_utils import average_gradients, aggregate_batch, get_optimizer, get_tower_summary_dict -from .saver import load_model, Saver -from .timer import Timer -from .logger import colorlogger -from .utils import approx_equal - -class ModelDesc(object): - __metaclass__ = abc.ABCMeta - def __init__(self): - self._loss = None - self._inputs = [] - self._outputs = [] - self._tower_summary = [] - - def set_inputs(self, *vars): - self._inputs = vars - - def set_outputs(self, *vars): - self._outputs = vars - - def set_loss(self, var): - if not isinstance(var, tf.Tensor): - raise ValueError("Loss must be an single tensor.") - # assert var.get_shape() == [], 'Loss tensor must be a scalar shape but got {} shape'.format(var.get_shape()) - self._loss = var - - def get_loss(self, include_wd=False): - if self._loss is None: - raise ValueError("Network doesn't define the final loss") - - if include_wd: - weight_decay = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES) - weight_decay = tf.add_n(weight_decay) - return self._loss + weight_decay - else: - return self._loss - - def get_inputs(self): - if len(self._inputs) == 0: - raise ValueError("Network doesn't define the inputs") - return self._inputs - - def get_outputs(self): - if len(self._outputs) == 0: - raise ValueError("Network doesn't define the 
outputs") - return self._outputs - - def add_tower_summary(self, name, vars, reduced_method='mean'): - assert reduced_method == 'mean' or reduced_method == 'sum', \ - "Summary tensor only supports sum- or mean- reduced method" - if isinstance(vars, list): - for v in vars: - if vars.get_shape() == None: - print('Summary tensor {} got an unknown shape.'.format(name)) - else: - assert v.get_shape().as_list() == [], \ - "Summary tensor only supports scalar but got {}".format(v.get_shape().as_list()) - tf.add_to_collection(name, v) - else: - if vars.get_shape() == None: - print('Summary tensor {} got an unknown shape.'.format(name)) - else: - assert vars.get_shape().as_list() == [], \ - "Summary tensor only supports scalar but got {}".format(vars.get_shape().as_list()) - tf.add_to_collection(name, vars) - self._tower_summary.append([name, reduced_method]) - - @abc.abstractmethod - def make_network(self, is_train): - pass - - -class Base(object): - __metaclass__ = abc.ABCMeta - """ - build graph: - _make_graph - make_inputs - make_network - add_tower_summary - get_summary - - train/test - """ - - def __init__(self, net, cfg, data_iter=None, log_name='logs.txt'): - self._input_list = [] - self._output_list = [] - self._outputs = [] - self.graph_ops = None - - self.net = net - self.cfg = cfg - - self.cur_epoch = 0 - - self.summary_dict = {} - - # timer - self.tot_timer = Timer() - self.gpu_timer = Timer() - self.read_timer = Timer() - - # logger - self.logger = colorlogger(cfg.log_dir, log_name=log_name) - - # initialize tensorflow - tfconfig = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False) - tfconfig.gpu_options.allow_growth = True - self.sess = tf.Session(config=npu_config_proto(config_proto=tfconfig)) - - # build_graph - self.build_graph() - - # get data iter - self._data_iter = data_iter - - @abc.abstractmethod - def _make_data(self): - return - - @abc.abstractmethod - def _make_graph(self): - return - - def build_graph(self): - # all variables 
should be in the same graph and stored in cpu. - with tf.device('/cpu:0'): - tf.set_random_seed(2333) - self.graph_ops = self._make_graph() - if not isinstance(self.graph_ops, list) and not isinstance(self.graph_ops, tuple): - self.graph_ops = [self.graph_ops] - self.summary_dict.update( get_tower_summary_dict(self.net._tower_summary) ) - - def load_weights(self, model=None): - - if model == 'last_epoch': - sfiles = os.path.join(self.cfg.model_dump_dir, 'snapshot_*.ckpt.meta') - sfiles = glob.glob(sfiles) - if len(sfiles) > 0: - sfiles.sort(key=os.path.getmtime) - sfiles = [i[:-5] for i in sfiles if i.endswith('.meta')] - model = sfiles[-1] - else: - self.logger.critical('No snapshot model exists.') - return - - if isinstance(model, int): - model = os.path.join(self.cfg.model_dump_dir_test, 'snapshot_%d.ckpt' % model) # 修改 - - if isinstance(model, str) and (osp.exists(model + '.meta') or osp.exists(model)): - self.logger.info('Initialized model weights from {} ...'.format(model)) - load_model(self.sess, model) - if model.split('/')[-1].startswith('snapshot_'): - self.cur_epoch = int(model[model.find('snapshot_')+9:model.find('.ckpt')]) - self.logger.info('Current epoch is %d.' % self.cur_epoch) - else: - self.logger.critical('Load nothing. 
There is no model in path {}.'.format(model)) - - def next_feed(self): - if self._data_iter is None: - raise ValueError('No input data.') - feed_dict = dict() - for inputs in self._input_list: - blobs = next(self._data_iter) - for i, inp in enumerate(inputs): - inp_shape = inp.get_shape().as_list() - if None in inp_shape: - feed_dict[inp] = blobs[i] - else: - feed_dict[inp] = blobs[i].reshape(*inp_shape) - return feed_dict - -class Trainer(Base): - def __init__(self, net, cfg, data_iter=None): - self.lr_eval = cfg.lr - self.lr = tf.Variable(cfg.lr, trainable=False) - self._optimizer = get_optimizer(self.lr, cfg.optimizer) - - super(Trainer, self).__init__(net, cfg, data_iter, log_name='train_logs.txt') - - # make data - self._data_iter, self.itr_per_epoch = self._make_data() - - def _make_data(self): - from dataset import Dataset - from gen_batch import generate_batch - - d = Dataset() - train_data = d.load_train_data() - - from tfflat.data_provider import DataFromList, MultiProcessMapDataZMQ, BatchData, MapData - data_load_thread = DataFromList(train_data) - if self.cfg.multi_thread_enable: - data_load_thread = MultiProcessMapDataZMQ(data_load_thread, self.cfg.num_thread, generate_batch, strict=True) - else: - data_load_thread = MapData(data_load_thread, generate_batch) - data_load_thread = BatchData(data_load_thread, self.cfg.batch_size) - - data_load_thread.reset_state() - dataiter = data_load_thread.get_data() - - return dataiter, math.ceil(len(train_data)/self.cfg.batch_size/self.cfg.num_gpus) - - def _make_graph(self): - self.logger.info("Generating training graph on {} GPUs ...".format(self.cfg.num_gpus)) - - weights_initializer = slim.xavier_initializer() - biases_initializer = tf.constant_initializer(0.) 
- biases_regularizer = tf.no_regularizer - weights_regularizer = tf.contrib.layers.l2_regularizer(self.cfg.weight_decay) - - tower_grads = [] - with tf.variable_scope(tf.get_variable_scope()): - for i in range(self.cfg.num_gpus): - with tf.device('/cpu:0'): - with tf.name_scope('tower_%d' % i) as name_scope: - # Force all Variables to reside on the CPU. - with slim.arg_scope([slim.model_variable, slim.variable], device='/device:CPU:0'): - with slim.arg_scope([slim.conv2d, slim.conv2d_in_plane, \ - slim.conv2d_transpose, slim.separable_conv2d, - slim.fully_connected], - weights_regularizer=weights_regularizer, - biases_regularizer=biases_regularizer, - weights_initializer=weights_initializer, - biases_initializer=biases_initializer): - # loss over single GPU - self.net.make_network(is_train=True) - if i == self.cfg.num_gpus - 1: - loss = self.net.get_loss(include_wd=True) - else: - loss = self.net.get_loss() - self._input_list.append( self.net.get_inputs() ) - - tf.get_variable_scope().reuse_variables() - - if i == 0: - if self.cfg.num_gpus > 1 and self.cfg.bn_train is True: - self.logger.warning("BN is calculated only on single GPU.") - extra_update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS, name_scope) - with tf.control_dependencies(extra_update_ops): - grads = self._optimizer.compute_gradients(loss) - else: - grads = self._optimizer.compute_gradients(loss) - final_grads = [] - with tf.variable_scope('Gradient_Mult') as scope: - for grad, var in grads: - final_grads.append((grad, var)) - tower_grads.append(final_grads) - - if len(tower_grads) > 1: - grads = average_gradients(tower_grads) - else: - grads = tower_grads[0] - - apply_gradient_op = self._optimizer.apply_gradients(grads) - train_op = tf.group(apply_gradient_op, *extra_update_ops) - - return train_op - - def train(self): - - # saver - self.logger.info('Initialize saver ...') - train_saver = Saver(self.sess, tf.global_variables(), self.cfg.model_dump_dir) - - # initialize weights - 
self.logger.info('Initialize all variables ...') - self.sess.run(tf.variables_initializer(tf.global_variables(), name='init')) - self.load_weights('last_epoch' if self.cfg.continue_train else self.cfg.init_model) - - self.logger.info('Start training ...') - start_itr = self.cur_epoch * self.itr_per_epoch + 1 - end_itr = self.itr_per_epoch * self.cfg.end_epoch + 1 - for itr in range(start_itr, end_itr): - self.tot_timer.tic() - - self.cur_epoch = itr // self.itr_per_epoch - setproctitle.setproctitle('train epoch:' + str(self.cur_epoch)) - - # apply current learning policy - cur_lr = self.cfg.get_lr(self.cur_epoch) - if not approx_equal(cur_lr, self.lr_eval): - print(self.lr_eval, cur_lr) - self.sess.run(tf.assign(self.lr, cur_lr)) - - # input data - self.read_timer.tic() - feed_dict = self.next_feed() - self.read_timer.toc() - - # train one step - self.gpu_timer.tic() - _, self.lr_eval, *summary_res = self.sess.run( - [self.graph_ops[0], self.lr, *self.summary_dict.values()], feed_dict=feed_dict) - self.gpu_timer.toc() - - itr_summary = dict() - for i, k in enumerate(self.summary_dict.keys()): - itr_summary[k] = summary_res[i] - - screen = [ - 'Epoch %d itr %d/%d:' % (self.cur_epoch, itr, self.itr_per_epoch), - 'lr: %g' % (self.lr_eval), - 'speed: %.2f(%.2fs r%.2f)s/itr' % ( - self.tot_timer.average_time, self.gpu_timer.average_time, self.read_timer.average_time), - '%.2fh/epoch' % (self.tot_timer.average_time / 3600. * self.itr_per_epoch), - ' '.join(map(lambda x: '%s: %.4f' % (x[0], x[1]), itr_summary.items())), - ] - - - #TODO(display stall?) 
- if itr % self.cfg.display == 0: - self.logger.info(' '.join(screen)) - - if itr % self.itr_per_epoch == 0: - train_saver.save_model(self.cur_epoch) - - self.tot_timer.toc() - -class Tester(Base): - def __init__(self, net, cfg, data_iter=None): - super(Tester, self).__init__(net, cfg, data_iter, log_name='test_logs.txt') - - def next_feed(self, batch_data=None): - if self._data_iter is None and batch_data is None: - raise ValueError('No input data.') - feed_dict = dict() - if batch_data is None: - for inputs in self._input_list: - blobs = next(self._data_iter) - for i, inp in enumerate(inputs): - inp_shape = inp.get_shape().as_list() - if None in inp_shape: - feed_dict[inp] = blobs[i] - else: - feed_dict[inp] = blobs[i].reshape(*inp_shape) - else: - assert isinstance(batch_data, list) or isinstance(batch_data, tuple), "Input data should be list-type." - assert len(batch_data) == len(self._input_list[0]), "Input data is incomplete." - - batch_size = self.cfg.batch_size - if self._input_list[0][0].get_shape().as_list()[0] is None: - # fill batch - for i in range(len(batch_data)): - batch_size = (len(batch_data[i]) + self.cfg.num_gpus - 1) // self.cfg.num_gpus - total_batches = batch_size * self.cfg.num_gpus - left_batches = total_batches - len(batch_data[i]) - if left_batches > 0: - batch_data[i] = np.append(batch_data[i], np.zeros((left_batches, *batch_data[i].shape[1:])), axis=0) - self.logger.warning("Fill some blanks to fit batch_size which wastes %d%% computation" % ( - left_batches * 100. / total_batches)) - else: - assert self.cfg.batch_size * self.cfg.num_gpus == len(batch_data[0]), \ - "Input batch doesn't fit placeholder batch." 
- - for j, inputs in enumerate(self._input_list): - for i, inp in enumerate(inputs): - feed_dict[ inp ] = batch_data[i][j * batch_size: (j+1) * batch_size] - - #@TODO(delete) - assert (j+1) * batch_size == len(batch_data[0]), 'check batch' - return feed_dict, batch_size - - def _make_graph(self): - self.logger.info("Generating testing graph on {} GPUs ...".format(self.cfg.num_gpus)) - - with tf.variable_scope(tf.get_variable_scope()): - for i in range(self.cfg.num_gpus): - with tf.device('/cpu:0'): - with tf.name_scope('tower_%d' % i) as name_scope: - with slim.arg_scope([slim.model_variable, slim.variable], device='/device:CPU:0'): - self.net.make_network(is_train=False) - self._input_list.append(self.net.get_inputs()) - self._output_list.append(self.net.get_outputs()) - - tf.get_variable_scope().reuse_variables() - - self._outputs = aggregate_batch(self._output_list) - - # run_meta = tf.RunMetadata() - # opts = tf.profiler.ProfileOptionBuilder.float_operation() - # flops = tf.profiler.profile(self.sess.graph, run_meta=run_meta, cmd='op', options=opts) - # - # opts = tf.profiler.ProfileOptionBuilder.trainable_variables_parameter() - # params = tf.profiler.profile(self.sess.graph, run_meta=run_meta, cmd='op', options=opts) - - # print("{:,} --- {:,}".format(flops.total_float_ops, params.total_parameters)) - # from IPython import embed; embed() - - return self._outputs - - def predict_one(self, data=None): - # TODO(reduce data in limited batch) - assert len(self.summary_dict) == 0, "still not support scalar summary in testing stage" - setproctitle.setproctitle('test epoch:' + str(self.cur_epoch)) - - self.read_timer.tic() - feed_dict, batch_size = self.next_feed(data) - self.read_timer.toc() - - self.gpu_timer.tic() - res = self.sess.run([*self.graph_ops, *self.summary_dict.values()], feed_dict=feed_dict) - self.gpu_timer.toc() - - if data is not None and len(data[0]) < self.cfg.num_gpus * batch_size: - for i in range(len(res)): - res[i] = res[i][:len(data[0])] - - 
return res - - def test(self): - pass - - -- Gitee From e18a35d0151b9c1316320593e9c8e80c5d138592 Mon Sep 17 00:00:00 2001 From: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> Date: Mon, 17 Oct 2022 02:23:58 +0000 Subject: [PATCH 02/10] 8P Signed-off-by: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> --- .../src/lib/tfflat/base.py | 490 ++++++++++++++++++ 1 file changed, 490 insertions(+) create mode 100644 TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/base.py diff --git a/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/base.py b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/base.py new file mode 100644 index 000000000..eb9342db7 --- /dev/null +++ b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/base.py @@ -0,0 +1,490 @@ +# Copyright 2017 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from npu_bridge.npu_init import * +from npu_bridge.hccl import hccl_ops + +import tensorflow as tf +import tensorflow.contrib.slim as slim +import numpy as np +from collections import OrderedDict as dict +import setproctitle +import os +import os.path as osp +import glob +import abc +import math + +from .net_utils import average_gradients, aggregate_batch, get_optimizer, get_tower_summary_dict +from .saver import load_model, Saver +from .timer import Timer +from .logger import colorlogger +from .utils import approx_equal + + +class ModelDesc(object): + __metaclass__ = abc.ABCMeta + + def __init__(self): + self._loss = None + self._inputs = [] + self._outputs = [] + self._tower_summary = [] + + def set_inputs(self, *vars): + self._inputs = vars + + def set_outputs(self, *vars): + self._outputs = vars + + def set_loss(self, var): + if not isinstance(var, tf.Tensor): + raise ValueError("Loss must be an single tensor.") + # assert var.get_shape() == [], 'Loss tensor must be a scalar shape but got {} shape'.format(var.get_shape()) + self._loss = var + + def get_loss(self, include_wd=False): + if self._loss is None: + raise ValueError("Network doesn't define the final loss") + + if include_wd: + weight_decay = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES) + weight_decay = tf.add_n(weight_decay) + return self._loss + weight_decay + else: + return self._loss + + def get_inputs(self): + if len(self._inputs) == 0: + raise ValueError("Network doesn't define the inputs") + return self._inputs + + def get_outputs(self): + if len(self._outputs) == 0: + raise 
ValueError("Network doesn't define the outputs") + return self._outputs + + def add_tower_summary(self, name, vars, reduced_method='mean'): + assert reduced_method == 'mean' or reduced_method == 'sum', \ + "Summary tensor only supports sum- or mean- reduced method" + if isinstance(vars, list): + for v in vars: + if vars.get_shape() == None: + print('Summary tensor {} got an unknown shape.'.format(name)) + else: + assert v.get_shape().as_list() == [], \ + "Summary tensor only supports scalar but got {}".format(v.get_shape().as_list()) + tf.add_to_collection(name, v) + else: + if vars.get_shape() == None: + print('Summary tensor {} got an unknown shape.'.format(name)) + else: + assert vars.get_shape().as_list() == [], \ + "Summary tensor only supports scalar but got {}".format(vars.get_shape().as_list()) + tf.add_to_collection(name, vars) + self._tower_summary.append([name, reduced_method]) + + @abc.abstractmethod + def make_network(self, is_train): + pass + + +class Base(object): + __metaclass__ = abc.ABCMeta + """ + build graph: + _make_graph + make_inputs + make_network + add_tower_summary + get_summary + + train/test + """ + + def __init__(self, net, cfg, data_iter=None, log_name='logs.txt'): + self._input_list = [] + self._output_list = [] + self._outputs = [] + self.graph_ops = None + + self.net = net + self.cfg = cfg + + self.cur_epoch = 0 + + self.summary_dict = {} + + # timer + self.tot_timer = Timer() + self.gpu_timer = Timer() + self.read_timer = Timer() + + # logger + self.logger = colorlogger(cfg.log_dir, log_name=log_name) + + # initialize tensorflow + tfconfig = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False) + tfconfig.gpu_options.allow_growth = True + + #############npu modify start############### + custom_op = tfconfig.graph_options.rewrite_options.custom_optimizers.add() + custom_op.name = "NpuOptimizer" + custom_op.parameter_map["use_off_line"].b = True + custom_op.parameter_map["precision_mode"].s = 
tf.compat.as_bytes("allow_mix_precision") + custom_op.parameter_map["dynamic_input"].b = True + custom_op.parameter_map["dynamic_graph_execute_mode"].s = tf.compat.as_bytes("lazy_recompile") + # + custom_op.parameter_map["hcom_parallel"].b = True + # + tfconfig.graph_options.rewrite_options.remapping = RewriterConfig.OFF # off remap + tfconfig.graph_options.rewrite_options.memory_optimization = RewriterConfig.OFF + #############npu modify end############### + + self.sess = tf.Session(config=npu_config_proto(config_proto=tfconfig)) + self.saver = tf.train.Saver() + self.best_saver = tf.train.Saver() + + # build_graph + self.build_graph() + + # get data iter + self._data_iter = data_iter + + @abc.abstractmethod + def _make_data(self): + return + + @abc.abstractmethod + def _make_graph(self): + return + + def build_graph(self): + # all variables should be in the same graph and stored in cpu. + with tf.device('/cpu:0'): + tf.set_random_seed(2333) + self.graph_ops = self._make_graph() + if not isinstance(self.graph_ops, list) and not isinstance(self.graph_ops, tuple): + self.graph_ops = [self.graph_ops] + self.summary_dict.update(get_tower_summary_dict(self.net._tower_summary)) + + def load_weights(self, model=None): + + if model == 'last_epoch': + sfiles = os.path.join(self.cfg.model_dump_dir, 'snapshot_*.ckpt.meta') + sfiles = glob.glob(sfiles) + if len(sfiles) > 0: + sfiles.sort(key=os.path.getmtime) + sfiles = [i[:-5] for i in sfiles if i.endswith('.meta')] + model = sfiles[-1] + else: + self.logger.critical('No snapshot model exists.') + return + + if isinstance(model, int): + model = os.path.join(self.cfg.model_dump_dir_test, 'snapshot_%d.ckpt' % model) # 修改 + + if isinstance(model, str) and (osp.exists(model + '.meta') or osp.exists(model)): + self.logger.info('Initialized model weights from {} ...'.format(model)) + load_model(self.sess, model) + if model.split('/')[-1].startswith('snapshot_'): + self.cur_epoch = int(model[model.find('snapshot_') + 
9:model.find('.ckpt')]) + self.logger.info('Current epoch is %d.' % self.cur_epoch) + else: + self.logger.critical('Load nothing. There is no model in path {}.'.format(model)) + + def next_feed(self): + if self._data_iter is None: + raise ValueError('No input data.') + feed_dict = dict() + rank_size = int(os.getenv('RANK_SIZE')) + rank_id = int(os.getenv('RANK_ID')) + for inputs in self._input_list: # (image, target_coord, valid) + blobs = next(self._data_iter) + blobs = [blobs[i][int(rank_id * (self.cfg.batch_size / rank_size)):int( + (rank_id + 1) * self.cfg.batch_size / rank_size)] for i in range(3)] + for i, inp in enumerate(inputs): + inp_shape = inp.get_shape().as_list() + if None in inp_shape: + feed_dict[inp] = blobs[i] + else: + feed_dict[inp] = blobs[i].reshape(*inp_shape) + return feed_dict + + +class Trainer(Base): + def __init__(self, net, cfg, data_iter=None): + self.lr_eval = cfg.lr + self.lr = tf.Variable(cfg.lr, trainable=False) + self._optimizer = get_optimizer(self.lr, cfg.optimizer) + super(Trainer, self).__init__(net, cfg, data_iter, log_name='train_logs.txt') + + # make data + self._data_iter, self.itr_per_epoch = self._make_data() + + def _make_data(self): + from dataset import Dataset + from gen_batch import generate_batch + + d = Dataset() + train_data = d.load_train_data() # [dict()] + + from tfflat.data_provider import DataFromList, MultiProcessMapDataZMQ, BatchData, MapData + data_load_thread = DataFromList(train_data) + if self.cfg.multi_thread_enable: + data_load_thread = MultiProcessMapDataZMQ(data_load_thread, self.cfg.num_thread, generate_batch, + strict=True) + else: + data_load_thread = MapData(data_load_thread, generate_batch) + data_load_thread = BatchData(data_load_thread, self.cfg.batch_size) # 按batch获取数据shuffle + + data_load_thread.reset_state() + dataiter = data_load_thread.get_data() + + # return dataiter, math.ceil(len(train_data) / self.cfg.batch_size / self.cfg.num_gpus) + return dataiter, math.ceil(len(train_data) / 
self.cfg.batch_size) + + def _make_graph(self): + self.logger.info("Generating training graph on {} GPUs ...".format(self.cfg.num_gpus)) + + weights_initializer = slim.xavier_initializer() + biases_initializer = tf.constant_initializer(0.) + biases_regularizer = tf.no_regularizer + weights_regularizer = tf.contrib.layers.l2_regularizer(self.cfg.weight_decay) + + tower_grads = [] + with tf.variable_scope(tf.get_variable_scope()): + for i in range(self.cfg.num_gpus): + with tf.device('/cpu:0'): + with tf.name_scope('tower_%d' % i) as name_scope: + # Force all Variables to reside on the CPU. + with slim.arg_scope([slim.model_variable, slim.variable], device='/device:CPU:0'): + with slim.arg_scope([slim.conv2d, slim.conv2d_in_plane, + slim.conv2d_transpose, slim.separable_conv2d, + slim.fully_connected], + weights_regularizer=weights_regularizer, + biases_regularizer=biases_regularizer, + weights_initializer=weights_initializer, + biases_initializer=biases_initializer): + # loss over single GPU + self.net.make_network(is_train=True) + if i == self.cfg.num_gpus - 1: + loss = self.net.get_loss(include_wd=True) + else: + loss = self.net.get_loss() + self._input_list.append(self.net.get_inputs()) + + tf.get_variable_scope().reuse_variables() + + if i == 0: + if self.cfg.num_gpus > 1 and self.cfg.bn_train is True: + self.logger.warning("BN is calculated only on single GPU.") + extra_update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS, name_scope) + with tf.control_dependencies(extra_update_ops): + grads = self._optimizer.compute_gradients(loss) + else: + grads = self._optimizer.compute_gradients(loss) + final_grads = [] + with tf.variable_scope('Gradient_Mult') as scope: + for grad, var in grads: + final_grads.append((grad, var)) + tower_grads.append(final_grads) + + if len(tower_grads) > 1: + grads = average_gradients(tower_grads) + else: + grads = tower_grads[0] + + apply_gradient_op = self._optimizer.apply_gradients(grads) + train_op = tf.group(apply_gradient_op, 
*extra_update_ops) + + return train_op + + def train(self): + + # saver + self.logger.info('Initialize saver ...') + train_saver = Saver(self.sess, tf.global_variables(), self.cfg.model_dump_dir) + + # initialize weights + self.logger.info('Initialize all variables ...') + self.sess.run(tf.variables_initializer(tf.global_variables(), name='init')) + self.load_weights('last_epoch' if self.cfg.continue_train else self.cfg.init_model) + + rank_size = int(os.getenv('RANK_SIZE')) + if int(rank_size) > 1: + input = tf.trainable_variables() + bcast_global_variables_op = hccl_ops.broadcast(input, 0) + self.sess.run(bcast_global_variables_op) + + self.logger.info('Start training ...') + print("---------------"+str(self.cfg.end_epoch)) + print("---------------"+str(self.itr_per_epoch)) + for epoch in range(self.cfg.end_epoch): + for step in range(self.itr_per_epoch): + self.tot_timer.tic() + + # self.cur_epoch = itr // self.itr_per_epoch + self.cur_epoch = epoch + setproctitle.setproctitle('train epoch:' + str(self.cur_epoch)) + + # apply current learning policy + cur_lr = self.cfg.get_lr(self.cur_epoch) + if not approx_equal(cur_lr, self.lr_eval): + print(self.lr_eval, cur_lr) + self.sess.run(tf.assign(self.lr, cur_lr)) + + # input data + self.read_timer.tic() + feed_dict = self.next_feed() + self.read_timer.toc() + + # train one step + self.gpu_timer.tic() + _, self.lr_eval, *summary_res = self.sess.run( + [self.graph_ops[0], self.lr, *self.summary_dict.values()], feed_dict=feed_dict) + self.gpu_timer.toc() + + itr_summary = dict() + for i, k in enumerate(self.summary_dict.keys()): + itr_summary[k] = summary_res[i] + + screen = [ + 'Epoch %d itr %d/%d:' % (self.cur_epoch, epoch * step, self.itr_per_epoch * self.cfg.end_epoch), + 'lr: %g' % self.lr_eval, + 'speed: %.2f(%.2fs r%.2f)s/epoch * step' % ( + self.tot_timer.average_time, self.gpu_timer.average_time, self.read_timer.average_time), + '%.2fh/epoch' % (self.tot_timer.average_time / 3600. 
* self.itr_per_epoch), + ' '.join(map(lambda x: '%s: %.4f' % (x[0], x[1]), itr_summary.items())), + ] + + # TODO(display stall?) + if (epoch * step) % self.cfg.display == 0: + self.logger.info(' '.join(screen)) + + if (epoch * step) % self.itr_per_epoch == 0: + train_saver.save_model(self.cur_epoch) + + self.tot_timer.toc() + + +class Tester(Base): + def __init__(self, net, cfg, data_iter=None): + super(Tester, self).__init__(net, cfg, data_iter, log_name='test_logs.txt') + + def next_feed(self, batch_data=None): + if self._data_iter is None and batch_data is None: + raise ValueError('No input data.') + feed_dict = dict() + if batch_data is None: + for inputs in self._input_list: + blobs = next(self._data_iter) + for i, inp in enumerate(inputs): + inp_shape = inp.get_shape().as_list() + if None in inp_shape: + feed_dict[inp] = blobs[i] + else: + feed_dict[inp] = blobs[i].reshape(*inp_shape) + else: + assert isinstance(batch_data, list) or isinstance(batch_data, tuple), "Input data should be list-type." + assert len(batch_data) == len(self._input_list[0]), "Input data is incomplete." + + batch_size = self.cfg.batch_size + if self._input_list[0][0].get_shape().as_list()[0] is None: + # fill batch + for i in range(len(batch_data)): + batch_size = (len(batch_data[i]) + self.cfg.num_gpus - 1) // self.cfg.num_gpus + total_batches = batch_size * self.cfg.num_gpus + left_batches = total_batches - len(batch_data[i]) + if left_batches > 0: + batch_data[i] = np.append(batch_data[i], np.zeros((left_batches, *batch_data[i].shape[1:])), + axis=0) + self.logger.warning("Fill some blanks to fit batch_size which wastes %d%% computation" % ( + left_batches * 100. / total_batches)) + else: + assert self.cfg.batch_size * self.cfg.num_gpus == len(batch_data[0]), \ + "Input batch doesn't fit placeholder batch." 
+ + for j, inputs in enumerate(self._input_list): + for i, inp in enumerate(inputs): + feed_dict[inp] = batch_data[i][j * batch_size: (j + 1) * batch_size] + + # @TODO(delete) + assert (j + 1) * batch_size == len(batch_data[0]), 'check batch' + return feed_dict, batch_size + + def _make_graph(self): + self.logger.info("Generating testing graph on {} GPUs ...".format(self.cfg.num_gpus)) + + with tf.variable_scope(tf.get_variable_scope()): + for i in range(self.cfg.num_gpus): + with tf.device('/cpu:0'): + with tf.name_scope('tower_%d' % i) as name_scope: + with slim.arg_scope([slim.model_variable, slim.variable], device='/device:CPU:0'): + self.net.make_network(is_train=False) + self._input_list.append(self.net.get_inputs()) + self._output_list.append(self.net.get_outputs()) + + tf.get_variable_scope().reuse_variables() + + self._outputs = aggregate_batch(self._output_list) + + # run_meta = tf.RunMetadata() + # opts = tf.profiler.ProfileOptionBuilder.float_operation() + # flops = tf.profiler.profile(self.sess.graph, run_meta=run_meta, cmd='op', options=opts) + # + # opts = tf.profiler.ProfileOptionBuilder.trainable_variables_parameter() + # params = tf.profiler.profile(self.sess.graph, run_meta=run_meta, cmd='op', options=opts) + + # print("{:,} --- {:,}".format(flops.total_float_ops, params.total_parameters)) + # from IPython import embed; embed() + + return self._outputs + + def predict_one(self, data=None): + # TODO(reduce data in limited batch) + assert len(self.summary_dict) == 0, "still not support scalar summary in testing stage" + setproctitle.setproctitle('test epoch:' + str(self.cur_epoch)) + + self.read_timer.tic() + feed_dict, batch_size = self.next_feed(data) + self.read_timer.toc() + + self.gpu_timer.tic() + res = self.sess.run([*self.graph_ops, *self.summary_dict.values()], feed_dict=feed_dict) + self.gpu_timer.toc() + + if data is not None and len(data[0]) < self.cfg.num_gpus * batch_size: + for i in range(len(res)): + res[i] = res[i][:len(data[0])] + 
+ return res + + def test(self): + pass -- Gitee From 89c685af09b867da759ef543cc718bafa05ea264 Mon Sep 17 00:00:00 2001 From: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> Date: Mon, 17 Oct 2022 02:26:32 +0000 Subject: [PATCH 03/10] update TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/net_utils.py. Signed-off-by: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> --- .../src/lib/tfflat/net_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/net_utils.py b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/net_utils.py index ea0df01e1..918718cbb 100644 --- a/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/net_utils.py +++ b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/net_utils.py @@ -129,6 +129,7 @@ def get_optimizer(lr, optimizer='momentum'): optimizer = tf.train.AdamOptimizer(lr) else: raise ValueError('invalid optimizer') + optimizer = npu_distributed_optimizer_wrapper(optimizer) return optimizer def get_tower_summary_dict(summary): -- Gitee From cb4b24a0d8f679299e591bcde1d1341933116699 Mon Sep 17 00:00:00 2001 From: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> Date: Mon, 17 Oct 2022 02:27:47 +0000 Subject: [PATCH 04/10] update TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/main/config.py. 
Signed-off-by: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> --- .../cv/SimpleHumanPose_ID0956_for_TensorFlow/src/main/config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/main/config.py b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/main/config.py index 22ab25902..51a29b347 100644 --- a/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/main/config.py +++ b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/main/config.py @@ -75,6 +75,7 @@ class Config: pixel_means = np.array([[[123.68, 116.78, 103.94]]]) ## training config + rank_size = int(os.getenv('RANK_SIZE')) lr_dec_epoch = [90, 120] end_epoch = 140 lr = 5e-4 -- Gitee From 36aa59e584c10ecf71317dc244c2fe4d6ad5963d Mon Sep 17 00:00:00 2001 From: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> Date: Mon, 17 Oct 2022 02:31:09 +0000 Subject: [PATCH 05/10] 8P Signed-off-by: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> --- .../test/train_full_8p.sh | 128 ++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100644 TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/test/train_full_8p.sh diff --git a/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/test/train_full_8p.sh b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/test/train_full_8p.sh new file mode 100644 index 000000000..b3fcb2e67 --- /dev/null +++ b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/test/train_full_8p.sh @@ -0,0 +1,128 @@ +#!/bin/bash + + +export RANK_SIZE=1 +export JOB_ID=10087 +export RANK_ID_START=0 +export RANK_TABLE_FILE=/hdu/zhengleilei/SimpleHumanPose_ID0956_for_TensorFlow_1P/configs/rank_table_8p.json + + + +cur_path=`pwd` +data_path="" +ckpt_path="" +Network="SimpleHumanPose_ID0956_for_TensorFlow" +#batch_size=32 +batch_size=256 +epoch=140 +# train_performance_1p.sh perf +# train_full_1p.sh acc 
+CaseName="${Network}_bs${batch_size}_${RANK_SIZE}p_acc" + + +if [[ $1 == --help || $1 == -h ]];then + echo"usage:./train_full_1p.sh " + echo " " + echo "parameter explain: + --precision_mode precision mode(allow_fp32_to_fp16/force_fp16/must_keep_origin_dtype/allow_mix_precision) + --over_dump if or not over detection, default is False + --data_dump_flag data dump flag, default is False + --data_dump_step data dump step, default is 10 + --profiling if or not profiling for performance debug, default is False + --data_path source data of training + -h/--help show help message + " + exit 1 +fi +for para in $* +do + if [[ $para == --data_path* ]];then + data_path=`echo ${para#*=}` + echo "${data_path}" + elif [[ $para == --ckpt_path* ]];then + ckpt_path=`echo ${para#*=}` + echo "${ckpt_path}" + elif [[ $para == --batch_size* ]];then + batch_size=`echo ${para#*=}` + echo "${batch_size}" + elif [[ $para == --max_steps* ]];then + max_steps=`echo ${para#*=}` + echo "${max_steps}" + fi +done +if [[ $data_path == "" ]];then + echo "[Error] para \"data_path\" must be confing" + exit 1 +fi + + +cd $cur_path/../ +rm -rf ${data_path}/model_dump/COCO/* +# CHANGE PARM +# data_path +sed -i "72a\ \ \ \ data_path = \'$data_path\'" src/data/COCO/dataset.py +sed -i "53a\ \ \ \ data_path = \'$data_path\'" src/main/config.py +# end_epoch +# sed -i "s/end_epoch = 140/end_epoch = 1/g" src/main/config.py +# sed -i "s/test_epoch = 140/test_epoch = 1/g" src/main/test_my.py +# START +start_time=$(date +%s) +for((RANK_ID=$RANK_ID_START;RANK_ID<$((RANK_SIZE+RANK_ID_START));RANK_ID++)); +do + echo "Device ID: $RANK_ID" + export RANK_ID=$RANK_ID + export ASCEND_DEVICE_ID=$RANK_ID + ASCEND_DEVICE_ID=$RANK_ID + DEVICE_INDEX=$RANK_ID + export DEVICE_INDEX=${DEVICE_INDEX} + if [ -d ${cur_path}/output/${ASCEND_DEVICE_ID} ];then + rm -rf ${cur_path}/output/${ASCEND_DEVICE_ID} + mkdir -p ${cur_path}/output/${ASCEND_DEVICE_ID}/ckpt + else + mkdir -p ${cur_path}/output/${ASCEND_DEVICE_ID}/ckpt + fi + # 
train + nohup python3 -u src/main/train.py > ${cur_path}/output/${ASCEND_DEVICE_ID}/train_${ASCEND_DEVICE_ID}.log 2>&1 & + wait + # eval + mkdir -p ${data_path}/model_dump/COCO/ + mv cache/result/model_dump/COCO/snapshot_${epoch}.ckpt* ${data_path}/model_dump/COCO/ + echo "mv cache/result/model_dump/COCO/snapshot_${epoch}.ckpt* ${data_path}/model_dump/COCO/" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/train_${ASCEND_DEVICE_ID}.log + nohup python3 -u src/main/test_my.py >> ${cur_path}/output/${ASCEND_DEVICE_ID}/train_${ASCEND_DEVICE_ID}.log 2>&1 & +done +wait +end_time=$(date +%s) +e2e_time=$(( ${end_time} - ${start_time} )) +# sed -i "s/end_epoch = 1/end_epoch = 140/g" src/main/config.py +# sed -i "s/test_epoch = 1/test_epoch = 140/g" src/main/test_my.py +rm -rf ${data_path}/model_dump/COCO/* + + +echo "------------------ Final result ------------------" +BatchSize=${batch_size} +DeviceType=`uname -m` +# getFPS +Time=`grep loss ${cur_path}/output/${ASCEND_DEVICE_ID}/train_${ASCEND_DEVICE_ID}.log | grep 'Epoch 0' | awk -F's/itr ' 'END{print $2}' | awk -F'h' '{print 60*60*$1}'` +FPS=`awk 'BEGIN{printf "%.4f\n",'${batch_size}'*4682/'${Time}'}'` +ActualFPS=${FPS} +TrainingTime=`awk 'BEGIN{printf "%.2f\n",'${BatchSize}'*4682/'${FPS}'}'` +# getAcc +train_accuracy=`grep 'IoU' ${cur_path}/output/${ASCEND_DEVICE_ID}/train_${ASCEND_DEVICE_ID}.log | awk -F'] = ' '{print $2}' | head -n 1` +# getLoss +grep 'loss' ${cur_path}/output/${ASCEND_DEVICE_ID}/train_${ASCEND_DEVICE_ID}.log | grep Epoch | awk -F'loss: ' '{print $2}' > ${cur_path}/output/${ASCEND_DEVICE_ID}/train_${CaseName}_loss.txt +ActualLoss=`awk 'END {print}' ${cur_path}/output/${ASCEND_DEVICE_ID}/train_${CaseName}_loss.txt` +echo "Final Performance images/sec : ${FPS}" +echo "Final Train Accuracy : ${train_accuracy}" +echo "E2E Training Duration sec : ${e2e_time}" + + +echo "Network = ${Network}" > ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "RankSize = ${RANK_SIZE}" >> 
${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "BatchSize = ${BatchSize}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "DeviceType = ${DeviceType}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "CaseName = ${CaseName}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "ActualFPS = ${ActualFPS}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "TrainingTime = ${TrainingTime}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "TrainAccuracy = ${train_accuracy}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "ActualLoss = ${ActualLoss}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "E2ETrainingTime = ${e2e_time}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log -- Gitee From 3abf9056d553aa60b96e093d368b8011108a2f54 Mon Sep 17 00:00:00 2001 From: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> Date: Mon, 17 Oct 2022 02:32:25 +0000 Subject: [PATCH 06/10] 8P Signed-off-by: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> --- .../test/train_performance_8p.sh | 122 ++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/test/train_performance_8p.sh diff --git a/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/test/train_performance_8p.sh b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/test/train_performance_8p.sh new file mode 100644 index 000000000..be83794d8 --- /dev/null +++ b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/test/train_performance_8p.sh @@ -0,0 +1,122 @@ +#!/bin/bash + + +export RANK_SIZE=8 +export JOB_ID=10087 +export RANK_ID_START=0 +export RANK_TABLE_FILE=/hdu/zhengleilei/SimpleHumanPose_ID0956_for_TensorFlow_1P/configs/rank_table_8p.json + + + +cur_path=`pwd` +data_path="" +ckpt_path="" +Network="SimpleHumanPose_ID0956_for_TensorFlow" +#batch_size=32 +batch_size=256 +epoch=1 +# 
train_performance_1p.sh perf +# train_full_1p.sh acc +CaseName="${Network}_bs${batch_size}_${RANK_SIZE}p_perf" + + +if [[ $1 == --help || $1 == -h ]];then + echo"usage:./train_performance_1p.sh " + echo " " + echo "parameter explain: + --precision_mode precision mode(allow_fp32_to_fp16/force_fp16/must_keep_origin_dtype/allow_mix_precision) + --over_dump if or not over detection, default is False + --data_dump_flag data dump flag, default is False + --data_dump_step data dump step, default is 10 + --profiling if or not profiling for performance debug, default is False + --data_path source data of training + -h/--help show help message + " + exit 1 +fi +for para in $* +do + if [[ $para == --data_path* ]];then + data_path=`echo ${para#*=}` + echo "${data_path}" + elif [[ $para == --ckpt_path* ]];then + ckpt_path=`echo ${para#*=}` + echo "${ckpt_path}" + elif [[ $para == --batch_size* ]];then + batch_size=`echo ${para#*=}` + echo "${batch_size}" + elif [[ $para == --max_steps* ]];then + max_steps=`echo ${para#*=}` + echo "${max_steps}" + fi +done +if [[ $data_path == "" ]];then + echo "[Error] para \"data_path\" must be confing" + exit 1 +fi + + +cd $cur_path/../ +# CHANGE PARM +# data_path +sed -i "72a\ \ \ \ data_path = \'$data_path\'" src/data/COCO/dataset.py +sed -i "52a\ \ \ \ data_path = \'$data_path\'" src/main/config.py +# end_epoch +sed -i "s/end_epoch = 140/end_epoch = 1/g" src/main/config.py +# START +start_time=$(date +%s) +for((RANK_ID=$RANK_ID_START;RANK_ID<$((RANK_SIZE+RANK_ID_START));RANK_ID++)); +do + echo "Device ID: $RANK_ID" + export RANK_ID=$RANK_ID + export ASCEND_DEVICE_ID=$RANK_ID + ASCEND_DEVICE_ID=$RANK_ID + DEVICE_INDEX=$RANK_ID + export DEVICE_INDEX=${DEVICE_INDEX} + if [ -d ${cur_path}/output/${ASCEND_DEVICE_ID} ];then + rm -rf ${cur_path}/output/${ASCEND_DEVICE_ID} + mkdir -p ${cur_path}/output/${ASCEND_DEVICE_ID}/ckpt + else + mkdir -p ${cur_path}/output/${ASCEND_DEVICE_ID}/ckpt + fi + nohup python3 -u src/main/train.py > 
${cur_path}/output/${ASCEND_DEVICE_ID}/train_${ASCEND_DEVICE_ID}.log 2>&1 & + # wait + # eval + # cp -r cache/result/model_dump ${data_path}/ + # wait + # nohup python3 -u main/test_my.py >> > ${cur_path}/output/${ASCEND_DEVICE_ID}/train_${ASCEND_DEVICE_ID}.log 2>&1 & +done +wait +end_time=$(date +%s) +e2e_time=$(( ${end_time} - ${start_time} )) +sed -i "s/end_epoch = 1/end_epoch = 140/g" src/main/config.py + + +echo "------------------ Final result ------------------" +BatchSize=${batch_size} +DeviceType=`uname -m` +# getFPS +Time=`grep loss ${cur_path}/output/${ASCEND_DEVICE_ID}/train_${ASCEND_DEVICE_ID}.log | grep 'Epoch 0' | awk -F's/itr ' 'END{print $2}' | awk -F'h' '{print 60*60*$1}'` +FPS=`awk 'BEGIN{printf "%.4f\n",'${batch_size}'*4682/'${Time}'}'` +ActualFPS=${FPS} +TrainingTime=`awk 'BEGIN{printf "%.2f\n",'${BatchSize}'*4682/'${FPS}'}'` +# getAcc +train_accuracy="None" +# getLoss +grep 'loss' ${cur_path}/output/${ASCEND_DEVICE_ID}/train_${ASCEND_DEVICE_ID}.log | grep Epoch | awk -F'loss: ' '{print $2}' > ${cur_path}/output/${ASCEND_DEVICE_ID}/train_${CaseName}_loss.txt +ActualLoss=`awk 'END {print}' ${cur_path}/output/${ASCEND_DEVICE_ID}/train_${CaseName}_loss.txt` +echo "Final Performance images/sec : ${FPS}" +echo "Final Train Accuracy : ${train_accuracy}" +echo "E2E Training Duration sec : ${e2e_time}" + + +echo "Network = ${Network}" > ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "RankSize = ${RANK_SIZE}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "BatchSize = ${BatchSize}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "DeviceType = ${DeviceType}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "CaseName = ${CaseName}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "ActualFPS = ${ActualFPS}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "TrainingTime = ${TrainingTime}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "TrainAccuracy 
= ${train_accuracy}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "ActualLoss = ${ActualLoss}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log +echo "E2ETrainingTime = ${e2e_time}" >> ${cur_path}/output/${ASCEND_DEVICE_ID}/${CaseName}.log -- Gitee From 6be098c6b1d938e57903e3c4a6d5744877221bf5 Mon Sep 17 00:00:00 2001 From: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> Date: Tue, 18 Oct 2022 02:12:27 +0000 Subject: [PATCH 07/10] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=96=87=E4=BB=B6=20Te?= =?UTF-8?q?nsorFlow/contrib/cv/SimpleHumanPose=5FID0956=5Ffor=5FTensorFlow?= =?UTF-8?q?/src/lib/tfflat/base.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/lib/tfflat/base.py | 490 ------------------ 1 file changed, 490 deletions(-) delete mode 100644 TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/base.py diff --git a/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/base.py b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/base.py deleted file mode 100644 index eb9342db7..000000000 --- a/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/base.py +++ /dev/null @@ -1,490 +0,0 @@ -# Copyright 2017 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# ============================================================================ -# Copyright 2021 Huawei Technologies Co., Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from npu_bridge.npu_init import * -from npu_bridge.hccl import hccl_ops - -import tensorflow as tf -import tensorflow.contrib.slim as slim -import numpy as np -from collections import OrderedDict as dict -import setproctitle -import os -import os.path as osp -import glob -import abc -import math - -from .net_utils import average_gradients, aggregate_batch, get_optimizer, get_tower_summary_dict -from .saver import load_model, Saver -from .timer import Timer -from .logger import colorlogger -from .utils import approx_equal - - -class ModelDesc(object): - __metaclass__ = abc.ABCMeta - - def __init__(self): - self._loss = None - self._inputs = [] - self._outputs = [] - self._tower_summary = [] - - def set_inputs(self, *vars): - self._inputs = vars - - def set_outputs(self, *vars): - self._outputs = vars - - def set_loss(self, var): - if not isinstance(var, tf.Tensor): - raise ValueError("Loss must be an single tensor.") - # assert var.get_shape() == [], 'Loss tensor must be a scalar shape but got {} shape'.format(var.get_shape()) - self._loss = var - - def get_loss(self, include_wd=False): - if self._loss is None: - raise ValueError("Network doesn't define the final loss") - - if include_wd: - weight_decay = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES) - weight_decay = 
tf.add_n(weight_decay) - return self._loss + weight_decay - else: - return self._loss - - def get_inputs(self): - if len(self._inputs) == 0: - raise ValueError("Network doesn't define the inputs") - return self._inputs - - def get_outputs(self): - if len(self._outputs) == 0: - raise ValueError("Network doesn't define the outputs") - return self._outputs - - def add_tower_summary(self, name, vars, reduced_method='mean'): - assert reduced_method == 'mean' or reduced_method == 'sum', \ - "Summary tensor only supports sum- or mean- reduced method" - if isinstance(vars, list): - for v in vars: - if vars.get_shape() == None: - print('Summary tensor {} got an unknown shape.'.format(name)) - else: - assert v.get_shape().as_list() == [], \ - "Summary tensor only supports scalar but got {}".format(v.get_shape().as_list()) - tf.add_to_collection(name, v) - else: - if vars.get_shape() == None: - print('Summary tensor {} got an unknown shape.'.format(name)) - else: - assert vars.get_shape().as_list() == [], \ - "Summary tensor only supports scalar but got {}".format(vars.get_shape().as_list()) - tf.add_to_collection(name, vars) - self._tower_summary.append([name, reduced_method]) - - @abc.abstractmethod - def make_network(self, is_train): - pass - - -class Base(object): - __metaclass__ = abc.ABCMeta - """ - build graph: - _make_graph - make_inputs - make_network - add_tower_summary - get_summary - - train/test - """ - - def __init__(self, net, cfg, data_iter=None, log_name='logs.txt'): - self._input_list = [] - self._output_list = [] - self._outputs = [] - self.graph_ops = None - - self.net = net - self.cfg = cfg - - self.cur_epoch = 0 - - self.summary_dict = {} - - # timer - self.tot_timer = Timer() - self.gpu_timer = Timer() - self.read_timer = Timer() - - # logger - self.logger = colorlogger(cfg.log_dir, log_name=log_name) - - # initialize tensorflow - tfconfig = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False) - tfconfig.gpu_options.allow_growth = True 
- - #############npu modify start############### - custom_op = tfconfig.graph_options.rewrite_options.custom_optimizers.add() - custom_op.name = "NpuOptimizer" - custom_op.parameter_map["use_off_line"].b = True - custom_op.parameter_map["precision_mode"].s = tf.compat.as_bytes("allow_mix_precision") - custom_op.parameter_map["dynamic_input"].b = True - custom_op.parameter_map["dynamic_graph_execute_mode"].s = tf.compat.as_bytes("lazy_recompile") - # - custom_op.parameter_map["hcom_parallel"].b = True - # - tfconfig.graph_options.rewrite_options.remapping = RewriterConfig.OFF # off remap - tfconfig.graph_options.rewrite_options.memory_optimization = RewriterConfig.OFF - #############npu modify end############### - - self.sess = tf.Session(config=npu_config_proto(config_proto=tfconfig)) - self.saver = tf.train.Saver() - self.best_saver = tf.train.Saver() - - # build_graph - self.build_graph() - - # get data iter - self._data_iter = data_iter - - @abc.abstractmethod - def _make_data(self): - return - - @abc.abstractmethod - def _make_graph(self): - return - - def build_graph(self): - # all variables should be in the same graph and stored in cpu. 
- with tf.device('/cpu:0'): - tf.set_random_seed(2333) - self.graph_ops = self._make_graph() - if not isinstance(self.graph_ops, list) and not isinstance(self.graph_ops, tuple): - self.graph_ops = [self.graph_ops] - self.summary_dict.update(get_tower_summary_dict(self.net._tower_summary)) - - def load_weights(self, model=None): - - if model == 'last_epoch': - sfiles = os.path.join(self.cfg.model_dump_dir, 'snapshot_*.ckpt.meta') - sfiles = glob.glob(sfiles) - if len(sfiles) > 0: - sfiles.sort(key=os.path.getmtime) - sfiles = [i[:-5] for i in sfiles if i.endswith('.meta')] - model = sfiles[-1] - else: - self.logger.critical('No snapshot model exists.') - return - - if isinstance(model, int): - model = os.path.join(self.cfg.model_dump_dir_test, 'snapshot_%d.ckpt' % model) # 修改 - - if isinstance(model, str) and (osp.exists(model + '.meta') or osp.exists(model)): - self.logger.info('Initialized model weights from {} ...'.format(model)) - load_model(self.sess, model) - if model.split('/')[-1].startswith('snapshot_'): - self.cur_epoch = int(model[model.find('snapshot_') + 9:model.find('.ckpt')]) - self.logger.info('Current epoch is %d.' % self.cur_epoch) - else: - self.logger.critical('Load nothing. 
There is no model in path {}.'.format(model)) - - def next_feed(self): - if self._data_iter is None: - raise ValueError('No input data.') - feed_dict = dict() - rank_size = int(os.getenv('RANK_SIZE')) - rank_id = int(os.getenv('RANK_ID')) - for inputs in self._input_list: # (image, target_coord, valid) - blobs = next(self._data_iter) - blobs = [blobs[i][int(rank_id * (self.cfg.batch_size / rank_size)):int( - (rank_id + 1) * self.cfg.batch_size / rank_size)] for i in range(3)] - for i, inp in enumerate(inputs): - inp_shape = inp.get_shape().as_list() - if None in inp_shape: - feed_dict[inp] = blobs[i] - else: - feed_dict[inp] = blobs[i].reshape(*inp_shape) - return feed_dict - - -class Trainer(Base): - def __init__(self, net, cfg, data_iter=None): - self.lr_eval = cfg.lr - self.lr = tf.Variable(cfg.lr, trainable=False) - self._optimizer = get_optimizer(self.lr, cfg.optimizer) - super(Trainer, self).__init__(net, cfg, data_iter, log_name='train_logs.txt') - - # make data - self._data_iter, self.itr_per_epoch = self._make_data() - - def _make_data(self): - from dataset import Dataset - from gen_batch import generate_batch - - d = Dataset() - train_data = d.load_train_data() # [dict()] - - from tfflat.data_provider import DataFromList, MultiProcessMapDataZMQ, BatchData, MapData - data_load_thread = DataFromList(train_data) - if self.cfg.multi_thread_enable: - data_load_thread = MultiProcessMapDataZMQ(data_load_thread, self.cfg.num_thread, generate_batch, - strict=True) - else: - data_load_thread = MapData(data_load_thread, generate_batch) - data_load_thread = BatchData(data_load_thread, self.cfg.batch_size) # 按batch获取数据shuffle - - data_load_thread.reset_state() - dataiter = data_load_thread.get_data() - - # return dataiter, math.ceil(len(train_data) / self.cfg.batch_size / self.cfg.num_gpus) - return dataiter, math.ceil(len(train_data) / self.cfg.batch_size) - - def _make_graph(self): - self.logger.info("Generating training graph on {} GPUs 
...".format(self.cfg.num_gpus)) - - weights_initializer = slim.xavier_initializer() - biases_initializer = tf.constant_initializer(0.) - biases_regularizer = tf.no_regularizer - weights_regularizer = tf.contrib.layers.l2_regularizer(self.cfg.weight_decay) - - tower_grads = [] - with tf.variable_scope(tf.get_variable_scope()): - for i in range(self.cfg.num_gpus): - with tf.device('/cpu:0'): - with tf.name_scope('tower_%d' % i) as name_scope: - # Force all Variables to reside on the CPU. - with slim.arg_scope([slim.model_variable, slim.variable], device='/device:CPU:0'): - with slim.arg_scope([slim.conv2d, slim.conv2d_in_plane, - slim.conv2d_transpose, slim.separable_conv2d, - slim.fully_connected], - weights_regularizer=weights_regularizer, - biases_regularizer=biases_regularizer, - weights_initializer=weights_initializer, - biases_initializer=biases_initializer): - # loss over single GPU - self.net.make_network(is_train=True) - if i == self.cfg.num_gpus - 1: - loss = self.net.get_loss(include_wd=True) - else: - loss = self.net.get_loss() - self._input_list.append(self.net.get_inputs()) - - tf.get_variable_scope().reuse_variables() - - if i == 0: - if self.cfg.num_gpus > 1 and self.cfg.bn_train is True: - self.logger.warning("BN is calculated only on single GPU.") - extra_update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS, name_scope) - with tf.control_dependencies(extra_update_ops): - grads = self._optimizer.compute_gradients(loss) - else: - grads = self._optimizer.compute_gradients(loss) - final_grads = [] - with tf.variable_scope('Gradient_Mult') as scope: - for grad, var in grads: - final_grads.append((grad, var)) - tower_grads.append(final_grads) - - if len(tower_grads) > 1: - grads = average_gradients(tower_grads) - else: - grads = tower_grads[0] - - apply_gradient_op = self._optimizer.apply_gradients(grads) - train_op = tf.group(apply_gradient_op, *extra_update_ops) - - return train_op - - def train(self): - - # saver - self.logger.info('Initialize saver 
...') - train_saver = Saver(self.sess, tf.global_variables(), self.cfg.model_dump_dir) - - # initialize weights - self.logger.info('Initialize all variables ...') - self.sess.run(tf.variables_initializer(tf.global_variables(), name='init')) - self.load_weights('last_epoch' if self.cfg.continue_train else self.cfg.init_model) - - rank_size = int(os.getenv('RANK_SIZE')) - if int(rank_size) > 1: - input = tf.trainable_variables() - bcast_global_variables_op = hccl_ops.broadcast(input, 0) - self.sess.run(bcast_global_variables_op) - - self.logger.info('Start training ...') - print("---------------"+str(self.cfg.end_epoch)) - print("---------------"+str(self.itr_per_epoch)) - for epoch in range(self.cfg.end_epoch): - for step in range(self.itr_per_epoch): - self.tot_timer.tic() - - # self.cur_epoch = itr // self.itr_per_epoch - self.cur_epoch = epoch - setproctitle.setproctitle('train epoch:' + str(self.cur_epoch)) - - # apply current learning policy - cur_lr = self.cfg.get_lr(self.cur_epoch) - if not approx_equal(cur_lr, self.lr_eval): - print(self.lr_eval, cur_lr) - self.sess.run(tf.assign(self.lr, cur_lr)) - - # input data - self.read_timer.tic() - feed_dict = self.next_feed() - self.read_timer.toc() - - # train one step - self.gpu_timer.tic() - _, self.lr_eval, *summary_res = self.sess.run( - [self.graph_ops[0], self.lr, *self.summary_dict.values()], feed_dict=feed_dict) - self.gpu_timer.toc() - - itr_summary = dict() - for i, k in enumerate(self.summary_dict.keys()): - itr_summary[k] = summary_res[i] - - screen = [ - 'Epoch %d itr %d/%d:' % (self.cur_epoch, epoch * step, self.itr_per_epoch * self.cfg.end_epoch), - 'lr: %g' % self.lr_eval, - 'speed: %.2f(%.2fs r%.2f)s/epoch * step' % ( - self.tot_timer.average_time, self.gpu_timer.average_time, self.read_timer.average_time), - '%.2fh/epoch' % (self.tot_timer.average_time / 3600. * self.itr_per_epoch), - ' '.join(map(lambda x: '%s: %.4f' % (x[0], x[1]), itr_summary.items())), - ] - - # TODO(display stall?) 
- if (epoch * step) % self.cfg.display == 0: - self.logger.info(' '.join(screen)) - - if (epoch * step) % self.itr_per_epoch == 0: - train_saver.save_model(self.cur_epoch) - - self.tot_timer.toc() - - -class Tester(Base): - def __init__(self, net, cfg, data_iter=None): - super(Tester, self).__init__(net, cfg, data_iter, log_name='test_logs.txt') - - def next_feed(self, batch_data=None): - if self._data_iter is None and batch_data is None: - raise ValueError('No input data.') - feed_dict = dict() - if batch_data is None: - for inputs in self._input_list: - blobs = next(self._data_iter) - for i, inp in enumerate(inputs): - inp_shape = inp.get_shape().as_list() - if None in inp_shape: - feed_dict[inp] = blobs[i] - else: - feed_dict[inp] = blobs[i].reshape(*inp_shape) - else: - assert isinstance(batch_data, list) or isinstance(batch_data, tuple), "Input data should be list-type." - assert len(batch_data) == len(self._input_list[0]), "Input data is incomplete." - - batch_size = self.cfg.batch_size - if self._input_list[0][0].get_shape().as_list()[0] is None: - # fill batch - for i in range(len(batch_data)): - batch_size = (len(batch_data[i]) + self.cfg.num_gpus - 1) // self.cfg.num_gpus - total_batches = batch_size * self.cfg.num_gpus - left_batches = total_batches - len(batch_data[i]) - if left_batches > 0: - batch_data[i] = np.append(batch_data[i], np.zeros((left_batches, *batch_data[i].shape[1:])), - axis=0) - self.logger.warning("Fill some blanks to fit batch_size which wastes %d%% computation" % ( - left_batches * 100. / total_batches)) - else: - assert self.cfg.batch_size * self.cfg.num_gpus == len(batch_data[0]), \ - "Input batch doesn't fit placeholder batch." 
- - for j, inputs in enumerate(self._input_list): - for i, inp in enumerate(inputs): - feed_dict[inp] = batch_data[i][j * batch_size: (j + 1) * batch_size] - - # @TODO(delete) - assert (j + 1) * batch_size == len(batch_data[0]), 'check batch' - return feed_dict, batch_size - - def _make_graph(self): - self.logger.info("Generating testing graph on {} GPUs ...".format(self.cfg.num_gpus)) - - with tf.variable_scope(tf.get_variable_scope()): - for i in range(self.cfg.num_gpus): - with tf.device('/cpu:0'): - with tf.name_scope('tower_%d' % i) as name_scope: - with slim.arg_scope([slim.model_variable, slim.variable], device='/device:CPU:0'): - self.net.make_network(is_train=False) - self._input_list.append(self.net.get_inputs()) - self._output_list.append(self.net.get_outputs()) - - tf.get_variable_scope().reuse_variables() - - self._outputs = aggregate_batch(self._output_list) - - # run_meta = tf.RunMetadata() - # opts = tf.profiler.ProfileOptionBuilder.float_operation() - # flops = tf.profiler.profile(self.sess.graph, run_meta=run_meta, cmd='op', options=opts) - # - # opts = tf.profiler.ProfileOptionBuilder.trainable_variables_parameter() - # params = tf.profiler.profile(self.sess.graph, run_meta=run_meta, cmd='op', options=opts) - - # print("{:,} --- {:,}".format(flops.total_float_ops, params.total_parameters)) - # from IPython import embed; embed() - - return self._outputs - - def predict_one(self, data=None): - # TODO(reduce data in limited batch) - assert len(self.summary_dict) == 0, "still not support scalar summary in testing stage" - setproctitle.setproctitle('test epoch:' + str(self.cur_epoch)) - - self.read_timer.tic() - feed_dict, batch_size = self.next_feed(data) - self.read_timer.toc() - - self.gpu_timer.tic() - res = self.sess.run([*self.graph_ops, *self.summary_dict.values()], feed_dict=feed_dict) - self.gpu_timer.toc() - - if data is not None and len(data[0]) < self.cfg.num_gpus * batch_size: - for i in range(len(res)): - res[i] = res[i][:len(data[0])] - 
- return res - - def test(self): - pass -- Gitee From 16f54348e60e046498813a3674328333645519bb Mon Sep 17 00:00:00 2001 From: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> Date: Tue, 18 Oct 2022 02:13:17 +0000 Subject: [PATCH 08/10] update Signed-off-by: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> --- .../src/lib/tfflat/base.py | 483 ++++++++++++++++++ 1 file changed, 483 insertions(+) create mode 100644 TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/base.py diff --git a/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/base.py b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/base.py new file mode 100644 index 000000000..87bc7b8d3 --- /dev/null +++ b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/base.py @@ -0,0 +1,483 @@ +# Copyright 2017 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from npu_bridge.npu_init import * +from npu_bridge.hccl import hccl_ops + +import tensorflow as tf +import tensorflow.contrib.slim as slim +import numpy as np +from collections import OrderedDict as dict +import setproctitle +import os +import os.path as osp +import glob +import abc +import math + +from .net_utils import average_gradients, aggregate_batch, get_optimizer, get_tower_summary_dict +from .saver import load_model, Saver +from .timer import Timer +from .logger import colorlogger +from .utils import approx_equal + +rank_size = int(os.getenv('RANK_SIZE')) + +class ModelDesc(object): + __metaclass__ = abc.ABCMeta + def __init__(self): + self._loss = None + self._inputs = [] + self._outputs = [] + self._tower_summary = [] + + def set_inputs(self, *vars): + self._inputs = vars + + def set_outputs(self, *vars): + self._outputs = vars + + def set_loss(self, var): + if not isinstance(var, tf.Tensor): + raise ValueError("Loss must be an single tensor.") + # assert var.get_shape() == [], 'Loss tensor must be a scalar shape but got {} shape'.format(var.get_shape()) + self._loss = var + + def get_loss(self, include_wd=False): + if self._loss is None: + raise ValueError("Network doesn't define the final loss") + + if include_wd: + weight_decay = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES) + weight_decay = tf.add_n(weight_decay) + return self._loss + weight_decay + else: + return self._loss + + def get_inputs(self): + if len(self._inputs) == 0: + raise ValueError("Network doesn't define the inputs") + return self._inputs + + def get_outputs(self): 
+ if len(self._outputs) == 0: + raise ValueError("Network doesn't define the outputs") + return self._outputs + + def add_tower_summary(self, name, vars, reduced_method='mean'): + assert reduced_method == 'mean' or reduced_method == 'sum', \ + "Summary tensor only supports sum- or mean- reduced method" + if isinstance(vars, list): + for v in vars: + if vars.get_shape() == None: + print('Summary tensor {} got an unknown shape.'.format(name)) + else: + assert v.get_shape().as_list() == [], \ + "Summary tensor only supports scalar but got {}".format(v.get_shape().as_list()) + tf.add_to_collection(name, v) + else: + if vars.get_shape() == None: + print('Summary tensor {} got an unknown shape.'.format(name)) + else: + assert vars.get_shape().as_list() == [], \ + "Summary tensor only supports scalar but got {}".format(vars.get_shape().as_list()) + tf.add_to_collection(name, vars) + self._tower_summary.append([name, reduced_method]) + + @abc.abstractmethod + def make_network(self, is_train): + pass + + +class Base(object): + __metaclass__ = abc.ABCMeta + """ + build graph: + _make_graph + make_inputs + make_network + add_tower_summary + get_summary + + train/test + """ + + def __init__(self, net, cfg, data_iter=None, log_name='logs.txt'): + self._input_list = [] + self._output_list = [] + self._outputs = [] + self.graph_ops = None + + self.net = net + self.cfg = cfg + + self.cur_epoch = 0 + + self.summary_dict = {} + + # timer + self.tot_timer = Timer() + self.gpu_timer = Timer() + self.read_timer = Timer() + + # logger + self.logger = colorlogger(cfg.log_dir, log_name=log_name) + + # initialize tensorflow + tfconfig = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False) + tfconfig.gpu_options.allow_growth = True + + #############npu modify start############### + custom_op = tfconfig.graph_options.rewrite_options.custom_optimizers.add() + custom_op.name = "NpuOptimizer" + custom_op.parameter_map["use_off_line"].b = True + # + if int(rank_size) > 1: + 
custom_op.parameter_map["hcom_parallel"].b = True + # + tfconfig.graph_options.rewrite_options.remapping = RewriterConfig.OFF # off remap + tfconfig.graph_options.rewrite_options.memory_optimization = RewriterConfig.OFF + #############npu modify end############### + + self.sess = tf.Session(config=npu_config_proto(config_proto=tfconfig)) + + # build_graph + self.build_graph() + + # get data iter + self._data_iter = data_iter + + @abc.abstractmethod + def _make_data(self): + return + + @abc.abstractmethod + def _make_graph(self): + return + + def build_graph(self): + # all variables should be in the same graph and stored in cpu. + with tf.device('/cpu:0'): + tf.set_random_seed(2333) + self.graph_ops = self._make_graph() + if not isinstance(self.graph_ops, list) and not isinstance(self.graph_ops, tuple): + self.graph_ops = [self.graph_ops] + self.summary_dict.update( get_tower_summary_dict(self.net._tower_summary) ) + + def load_weights(self, model=None): + + if model == 'last_epoch': + sfiles = os.path.join(self.cfg.model_dump_dir, 'snapshot_*.ckpt.meta') + sfiles = glob.glob(sfiles) + if len(sfiles) > 0: + sfiles.sort(key=os.path.getmtime) + sfiles = [i[:-5] for i in sfiles if i.endswith('.meta')] + model = sfiles[-1] + else: + self.logger.critical('No snapshot model exists.') + return + + if isinstance(model, int): + model = os.path.join(self.cfg.model_dump_dir_test, 'snapshot_%d.ckpt' % model) # 修改 + + if isinstance(model, str) and (osp.exists(model + '.meta') or osp.exists(model)): + self.logger.info('Initialized model weights from {} ...'.format(model)) + load_model(self.sess, model) + if model.split('/')[-1].startswith('snapshot_'): + self.cur_epoch = int(model[model.find('snapshot_')+9:model.find('.ckpt')]) + self.logger.info('Current epoch is %d.' % self.cur_epoch) + else: + self.logger.critical('Load nothing. 
There is no model in path {}.'.format(model)) + + def next_feed(self): + if self._data_iter is None: + raise ValueError('No input data.') + feed_dict = dict() + for inputs in self._input_list: + blobs = next(self._data_iter) + for i, inp in enumerate(inputs): + inp_shape = inp.get_shape().as_list() + if None in inp_shape: + feed_dict[inp] = blobs[i] + else: + feed_dict[inp] = blobs[i].reshape(*inp_shape) + return feed_dict + +class Trainer(Base): + def __init__(self, net, cfg, data_iter=None): + self.lr_eval = cfg.lr + self.lr = tf.Variable(cfg.lr, trainable=False) + self._optimizer = get_optimizer(self.lr, cfg.optimizer) + + super(Trainer, self).__init__(net, cfg, data_iter, log_name='train_logs.txt') + + # make data + self._data_iter, self.itr_per_epoch = self._make_data() + + def _make_data(self): + from dataset import Dataset + from gen_batch import generate_batch + + d = Dataset() + train_data = d.load_train_data() + + from tfflat.data_provider import DataFromList, MultiProcessMapDataZMQ, BatchData, MapData + data_load_thread = DataFromList(train_data) + if self.cfg.multi_thread_enable: + data_load_thread = MultiProcessMapDataZMQ(data_load_thread, self.cfg.num_thread, generate_batch, strict=True) + else: + data_load_thread = MapData(data_load_thread, generate_batch) + data_load_thread = BatchData(data_load_thread, self.cfg.batch_size) + + data_load_thread.reset_state() + dataiter = data_load_thread.get_data() + + if int(rank_size) > 1: + itr_per_epoch = math.ceil(len(train_data)/self.cfg.batch_size/self.cfg.num_gpus/rank_size) + else: + itr_per_epoch = math.ceil(len(train_data)/self.cfg.batch_size/self.cfg.num_gpus) + + return dataiter, itr_per_epoch + + def _make_graph(self): + self.logger.info("Generating training graph on {} GPUs ...".format(self.cfg.num_gpus)) + + weights_initializer = slim.xavier_initializer() + biases_initializer = tf.constant_initializer(0.) 
+ biases_regularizer = tf.no_regularizer + weights_regularizer = tf.contrib.layers.l2_regularizer(self.cfg.weight_decay) + + tower_grads = [] + with tf.variable_scope(tf.get_variable_scope()): + for i in range(self.cfg.num_gpus): + with tf.device('/cpu:0'): + with tf.name_scope('tower_%d' % i) as name_scope: + # Force all Variables to reside on the CPU. + with slim.arg_scope([slim.model_variable, slim.variable], device='/device:CPU:0'): + with slim.arg_scope([slim.conv2d, slim.conv2d_in_plane, \ + slim.conv2d_transpose, slim.separable_conv2d, + slim.fully_connected], + weights_regularizer=weights_regularizer, + biases_regularizer=biases_regularizer, + weights_initializer=weights_initializer, + biases_initializer=biases_initializer): + # loss over single GPU + self.net.make_network(is_train=True) + if i == self.cfg.num_gpus - 1: + loss = self.net.get_loss(include_wd=True) + else: + loss = self.net.get_loss() + self._input_list.append( self.net.get_inputs() ) + + tf.get_variable_scope().reuse_variables() + + if i == 0: + if self.cfg.num_gpus > 1 and self.cfg.bn_train is True: + self.logger.warning("BN is calculated only on single GPU.") + extra_update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS, name_scope) + with tf.control_dependencies(extra_update_ops): + grads = self._optimizer.compute_gradients(loss) + else: + grads = self._optimizer.compute_gradients(loss) + final_grads = [] + with tf.variable_scope('Gradient_Mult') as scope: + for grad, var in grads: + final_grads.append((grad, var)) + tower_grads.append(final_grads) + + if len(tower_grads) > 1: + grads = average_gradients(tower_grads) + else: + grads = tower_grads[0] + + apply_gradient_op = self._optimizer.apply_gradients(grads) + train_op = tf.group(apply_gradient_op, *extra_update_ops) + + return train_op + + def train(self): + + # saver + self.logger.info('Initialize saver ...') + train_saver = Saver(self.sess, tf.global_variables(), self.cfg.model_dump_dir) + + # initialize weights + 
self.logger.info('Initialize all variables ...') + self.sess.run(tf.variables_initializer(tf.global_variables(), name='init')) + self.load_weights('last_epoch' if self.cfg.continue_train else self.cfg.init_model) + + rank_size = int(os.getenv('RANK_SIZE')) + if int(rank_size) > 1: + input = tf.trainable_variables() + bcast_global_variables_op = hccl_ops.broadcast(input, 0) + self.sess.run(bcast_global_variables_op) + + self.logger.info('Start training ...') + start_itr = self.cur_epoch * self.itr_per_epoch + 1 + end_itr = self.itr_per_epoch * self.cfg.end_epoch + 1 + for itr in range(start_itr, end_itr): + self.tot_timer.tic() + + self.cur_epoch = itr // self.itr_per_epoch + setproctitle.setproctitle('train epoch:' + str(self.cur_epoch)) + + # apply current learning policy + cur_lr = self.cfg.get_lr(self.cur_epoch) + if not approx_equal(cur_lr, self.lr_eval): + print(self.lr_eval, cur_lr) + self.sess.run(tf.assign(self.lr, cur_lr)) + + # input data + self.read_timer.tic() + feed_dict = self.next_feed() + self.read_timer.toc() + + # train one step + self.gpu_timer.tic() + _, self.lr_eval, *summary_res = self.sess.run( + [self.graph_ops[0], self.lr, *self.summary_dict.values()], feed_dict=feed_dict) + self.gpu_timer.toc() + + itr_summary = dict() + for i, k in enumerate(self.summary_dict.keys()): + itr_summary[k] = summary_res[i] + + screen = [ + 'Epoch %d itr %d/%d:' % (self.cur_epoch, itr, self.itr_per_epoch), + 'lr: %g' % (self.lr_eval), + 'speed: %.2f(%.2fs r%.2f)s/itr' % ( + self.tot_timer.average_time, self.gpu_timer.average_time, self.read_timer.average_time), + '%.2fh/epoch' % (self.tot_timer.average_time / 3600. * self.itr_per_epoch), + ' '.join(map(lambda x: '%s: %.4f' % (x[0], x[1]), itr_summary.items())), + ] + + + #TODO(display stall?) 
+ if itr % self.cfg.display == 0: + self.logger.info(' '.join(screen)) + + if itr % self.itr_per_epoch == 0: + train_saver.save_model(self.cur_epoch) + + self.tot_timer.toc() + +class Tester(Base): + def __init__(self, net, cfg, data_iter=None): + super(Tester, self).__init__(net, cfg, data_iter, log_name='test_logs.txt') + + def next_feed(self, batch_data=None): + if self._data_iter is None and batch_data is None: + raise ValueError('No input data.') + feed_dict = dict() + if batch_data is None: + for inputs in self._input_list: + blobs = next(self._data_iter) + for i, inp in enumerate(inputs): + inp_shape = inp.get_shape().as_list() + if None in inp_shape: + feed_dict[inp] = blobs[i] + else: + feed_dict[inp] = blobs[i].reshape(*inp_shape) + else: + assert isinstance(batch_data, list) or isinstance(batch_data, tuple), "Input data should be list-type." + assert len(batch_data) == len(self._input_list[0]), "Input data is incomplete." + + batch_size = self.cfg.batch_size + if self._input_list[0][0].get_shape().as_list()[0] is None: + # fill batch + for i in range(len(batch_data)): + batch_size = (len(batch_data[i]) + self.cfg.num_gpus - 1) // self.cfg.num_gpus + total_batches = batch_size * self.cfg.num_gpus + left_batches = total_batches - len(batch_data[i]) + if left_batches > 0: + batch_data[i] = np.append(batch_data[i], np.zeros((left_batches, *batch_data[i].shape[1:])), axis=0) + self.logger.warning("Fill some blanks to fit batch_size which wastes %d%% computation" % ( + left_batches * 100. / total_batches)) + else: + assert self.cfg.batch_size * self.cfg.num_gpus == len(batch_data[0]), \ + "Input batch doesn't fit placeholder batch." 
+ + for j, inputs in enumerate(self._input_list): + for i, inp in enumerate(inputs): + feed_dict[ inp ] = batch_data[i][j * batch_size: (j+1) * batch_size] + + #@TODO(delete) + assert (j+1) * batch_size == len(batch_data[0]), 'check batch' + return feed_dict, batch_size + + def _make_graph(self): + self.logger.info("Generating testing graph on {} GPUs ...".format(self.cfg.num_gpus)) + + with tf.variable_scope(tf.get_variable_scope()): + for i in range(self.cfg.num_gpus): + with tf.device('/cpu:0'): + with tf.name_scope('tower_%d' % i) as name_scope: + with slim.arg_scope([slim.model_variable, slim.variable], device='/device:CPU:0'): + self.net.make_network(is_train=False) + self._input_list.append(self.net.get_inputs()) + self._output_list.append(self.net.get_outputs()) + + tf.get_variable_scope().reuse_variables() + + self._outputs = aggregate_batch(self._output_list) + + # run_meta = tf.RunMetadata() + # opts = tf.profiler.ProfileOptionBuilder.float_operation() + # flops = tf.profiler.profile(self.sess.graph, run_meta=run_meta, cmd='op', options=opts) + # + # opts = tf.profiler.ProfileOptionBuilder.trainable_variables_parameter() + # params = tf.profiler.profile(self.sess.graph, run_meta=run_meta, cmd='op', options=opts) + + # print("{:,} --- {:,}".format(flops.total_float_ops, params.total_parameters)) + # from IPython import embed; embed() + + return self._outputs + + def predict_one(self, data=None): + # TODO(reduce data in limited batch) + assert len(self.summary_dict) == 0, "still not support scalar summary in testing stage" + setproctitle.setproctitle('test epoch:' + str(self.cur_epoch)) + + self.read_timer.tic() + feed_dict, batch_size = self.next_feed(data) + self.read_timer.toc() + + self.gpu_timer.tic() + res = self.sess.run([*self.graph_ops, *self.summary_dict.values()], feed_dict=feed_dict) + self.gpu_timer.toc() + + if data is not None and len(data[0]) < self.cfg.num_gpus * batch_size: + for i in range(len(res)): + res[i] = res[i][:len(data[0])] + + 
return res + + def test(self): + pass + + -- Gitee From 174b0a79ba329f661766780dfb391be3bb3ec630 Mon Sep 17 00:00:00 2001 From: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> Date: Tue, 18 Oct 2022 02:20:05 +0000 Subject: [PATCH 09/10] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=96=87=E4=BB=B6=20Te?= =?UTF-8?q?nsorFlow/contrib/cv/SimpleHumanPose=5FID0956=5Ffor=5FTensorFlow?= =?UTF-8?q?/src/lib/tfflat/data=5Fprovider.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/lib/tfflat/data_provider.py | 402 ------------------ 1 file changed, 402 deletions(-) delete mode 100644 TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/data_provider.py diff --git a/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/data_provider.py b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/data_provider.py deleted file mode 100644 index db62e5fd5..000000000 --- a/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/data_provider.py +++ /dev/null @@ -1,402 +0,0 @@ -# Copyright 2017 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================ -# Copyright 2021 Huawei Technologies Co., Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# copy from tensorpack: https://github.com/ppwwyyxx/tensorpack -from npu_bridge.npu_init import * -import numpy as np -import threading -import multiprocessing as mp -import weakref -from contextlib import contextmanager -from .serialize import loads, dumps -import errno -import uuid -import os -import zmq -import atexit -from itertools import cycle -from copy import copy -from .utils import get_rng -from setproctitle import setproctitle - -def del_weakref(x): - o = x() - if o is not None: - o.__del__() - -@contextmanager -def _zmq_catch_error(name): - try: - yield - except zmq.ContextTerminated: - print("[{}] Context terminated.".format(name)) - raise Exception - except zmq.ZMQError as e: - if e.errno == errno.ENOTSOCK: # socket closed - print("[{}] Socket closed.".format(name)) - raise Exception - else: - raise - except Exception: - raise - -class DataFlowReentrantGuard(object): - """ - A tool to enforce non-reentrancy. - Mostly used on DataFlow whose :meth:`get_data` is stateful, - so that multiple instances of the iterator cannot co-exist. 
- """ - def __init__(self): - self._lock = threading.Lock() - - def __enter__(self): - self._succ = self._lock.acquire(False) - if not self._succ: - raise threading.ThreadError("This DataFlow is not reentrant!") - - def __exit__(self, exc_type, exc_val, exc_tb): - self._lock.release() - return False - -class DataFromList(object): - def __init__(self, datalist, is_train=True, shuffle=True): - self.rng = get_rng() - self._datalist = datalist - self._shuffle = shuffle - self._is_train = is_train - - def get_data(self): - if self._is_train: - while True: - idxs = np.arange(len(self._datalist)) - if self._shuffle: - self.rng.shuffle(idxs) - for i in idxs: - yield self._datalist[i] - else: - idxs = np.arange(len(self._datalist)) - if self._shuffle: - self.rng.shuffle(idxs) - for i in idxs: - yield self._datalist[i] - - def reset_state(self): - self.rng = get_rng() - -class _ParallelMapData(object): - def __init__(self, ds, buffer_size): - assert buffer_size > 0, buffer_size - self._buffer_size = buffer_size - self._buffer_occupancy = 0 # actual #elements in buffer - - self.ds = ds - - def _recv(self): - pass - - def _send(self, dp): - pass - - def _recv_filter_none(self): - ret = self._recv() - assert ret is not None, \ - "[{}] Map function cannot return None when strict mode is used.".format(type(self).__name__) - return ret - - def _fill_buffer(self, cnt=None): - if cnt is None: - cnt = self._buffer_size - self._buffer_occupancy - try: - for _ in range(cnt): - dp = next(self._iter) - self._send(dp) - except StopIteration: - print( - "[{}] buffer_size cannot be larger than the size of the DataFlow!".format(type(self).__name__)) - raise - self._buffer_occupancy += cnt - - def get_data_non_strict(self): - for dp in self._iter: - self._send(dp) - ret = self._recv() - if ret is not None: - yield ret - - self._iter = self.ds.get_data() # refresh - for _ in range(self._buffer_size): - self._send(next(self._iter)) - ret = self._recv() - if ret is not None: - yield ret - - def 
get_data_strict(self): - self._fill_buffer() - for dp in self._iter: - self._send(dp) - yield self._recv_filter_none() - self._iter = self.ds.get_data() # refresh - - # first clear the buffer, then fill - for k in range(self._buffer_size): - dp = self._recv_filter_none() - self._buffer_occupancy -= 1 - if k == self._buffer_size - 1: - self._fill_buffer() - yield dp - -class MapData(object): - """ - Apply a mapper/filter on the DataFlow. - - Note: - 1. Please make sure func doesn't modify the components - unless you're certain it's safe. - 2. If you discard some datapoints, ``ds.size()`` will be incorrect. - """ - - def __init__(self, ds, func): - """ - Args: - ds (DataFlow): input DataFlow - func (datapoint -> datapoint | None): takes a datapoint and returns a new - datapoint. Return None to discard this datapoint. - """ - self.ds = ds - self.func = func - - def get_data(self): - for dp in self.ds.get_data(): - ret = self.func(copy(dp)) # shallow copy the list - if ret is not None: - yield ret - - def reset_state(self): - pass - -class MultiProcessMapDataZMQ(_ParallelMapData): - """ - Same as :class:`MapData`, but start processes to run the mapping function, - and communicate with ZeroMQ pipe. - - Note: - 1. Processes run in parallel and can take different time to run the - mapping function. Therefore the order of datapoints won't be - preserved, and datapoints from one pass of `df.get_data()` might get - mixed with datapoints from the next pass. - - You can use **strict mode**, where `MultiProcessMapData.get_data()` - is guranteed to produce the exact set which `df.get_data()` - produces. Although the order of data still isn't preserved. 
- """ - class _Worker(mp.Process): - def __init__(self, identity, map_func, pipename, hwm): - super(MultiProcessMapDataZMQ._Worker, self).__init__() - self.identity = identity - self.map_func = map_func - self.pipename = pipename - self.hwm = hwm - - def run(self): - print('Start data provider {}-{}'.format(self.pipename, self.identity)) - setproctitle('data provider {}-{}'.format(self.pipename, self.identity)) - ctx = zmq.Context() - socket = ctx.socket(zmq.DEALER) - socket.setsockopt(zmq.IDENTITY, self.identity) - socket.set_hwm(self.hwm) - socket.connect(self.pipename) - - while True: - dp = loads(socket.recv(copy=False).bytes) - dp = self.map_func(dp) - socket.send(dumps(dp), copy=False) - - def __init__(self, ds, nr_proc, map_func, buffer_size=200, strict=False): - """ - Args: - ds (DataFlow): the dataflow to map - nr_proc(int): number of threads to use - map_func (callable): datapoint -> datapoint | None - buffer_size (int): number of datapoints in the buffer - strict (bool): use "strict mode", see notes above. - """ - _ParallelMapData.__init__(self, ds, buffer_size) - self.nr_proc = nr_proc - self.map_func = map_func - self._strict = strict - self._procs = [] - self._guard = DataFlowReentrantGuard() - - self._reset_done = False - self._procs = [] - - def _reset_once(self): - self.context = zmq.Context() - self.socket = self.context.socket(zmq.ROUTER) - self.socket.set_hwm(self._buffer_size * 2) - pipename = "ipc://@{}-pipe-{}".format('dataflow-map', str(uuid.uuid1())[:8]) - - try: - self.socket.bind(pipename) - except zmq.ZMQError: - print( - "ZMQError in socket.bind(). Perhaps you're \ - using pipes on a non-local file system. 
See documentation of PrefetchDataZMQ for more information.") - raise - - self._proc_ids = [u'{}'.format(k).encode('utf-8') for k in range(self.nr_proc)] - worker_hwm = int(self._buffer_size * 2 // self.nr_proc) - self._procs = [MultiProcessMapDataZMQ._Worker( - self._proc_ids[k], self.map_func, pipename, worker_hwm) - for k in range(self.nr_proc)] - - self.ds.reset_state() - self._iter = self.ds.get_data() - self._iter_worker = cycle(iter(self._proc_ids)) - - for p in self._procs: - p.deamon = True - p.start() - self._fill_buffer() # pre-fill the bufer - - def reset_state(self): - if self._reset_done: - return - self._reset_done = True - - # __del__ not guranteed to get called at exit - atexit.register(del_weakref, weakref.ref(self)) - - self._reset_once() # build processes - - def _send(self, dp): - # round-robin assignment - worker = next(self._iter_worker) - msg = [worker, dumps(dp)] - self.socket.send_multipart(msg, copy=False) - - def _recv(self): - msg = self.socket.recv_multipart(copy=False) - dp = loads(msg[1].bytes) - return dp - - def get_data(self): - with self._guard, _zmq_catch_error('MultiProcessMapData'): - if self._strict: - for dp in self.get_data_strict(): - yield dp - else: - for dp in self.get_data_non_strict(): - yield dp - - def __del__(self): - try: - if not self._reset_done: - return - if not self.context.closed: - self.socket.close(0) - self.context.destroy(0) - for x in self._procs: - x.terminate() - x.join(5) - print("{} successfully cleaned-up.".format(type(self).__name__)) - except Exception: - pass - -class BatchData(object): - """ - Stack datapoints into batches. - It produces datapoints of the same number of components as ``ds``, but - each component has one new extra dimension of size ``batch_size``. - The batch can be either a list of original components, or (by default) - a numpy array of original components. 
- """ - - def __init__(self, ds, batch_size, use_list=False): - """ - Args: - ds (DataFlow): When ``use_list=False``, the components of ``ds`` - must be either scalars or :class:`np.ndarray`, and have to be consistent in shapes. - batch_size(int): batch size - use_list (bool): if True, each component will contain a list - of datapoints instead of an numpy array of an extra dimension. - """ - self.ds = ds - self.batch_size = int(batch_size) - self.use_list = use_list - - def get_data(self): - """ - Yields: - Batched data by stacking each component on an extra 0th dimension. - """ - holder = [] - for data in self.ds.get_data(): - holder.append(data) - if len(holder) == self.batch_size: - yield BatchData._aggregate_batch(holder, self.use_list) - del holder[:] - - @staticmethod - def _aggregate_batch(data_holder, use_list=False): - size = len(data_holder[0]) - result = [] - for k in range(size): - if use_list: - result.append( - [x[k] for x in data_holder]) - else: - dt = data_holder[0][k] - if type(dt) in [int, bool]: - tp = 'int32' - elif type(dt) == float: - tp = 'float32' - else: - try: - tp = dt.dtype - except AttributeError: - raise TypeError("Unsupported type to batch: {}".format(type(dt))) - try: - result.append( - np.asarray([x[k] for x in data_holder], dtype=tp)) - except Exception as e: # noqa - logger.exception("Cannot batch data. 
Perhaps they are of inconsistent shape?") - if isinstance(dt, np.ndarray): - s = pprint.pformat([x[k].shape for x in data_holder]) - logger.error("Shape of all arrays to be batched: " + s) - try: - # open an ipython shell if possible - import IPython as IP; IP.embed() # noqa - except ImportError: - pass - return result - - def reset_state(self): - self.ds.reset_state() - - -- Gitee From a41a392d92827490ff682be8820c731a4a0fb396 Mon Sep 17 00:00:00 2001 From: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> Date: Tue, 18 Oct 2022 02:20:22 +0000 Subject: [PATCH 10/10] update Signed-off-by: LeileiZheng <11268670+leileizheng@user.noreply.gitee.com> --- .../src/lib/tfflat/data_provider.py | 414 ++++++++++++++++++ 1 file changed, 414 insertions(+) create mode 100644 TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/data_provider.py diff --git a/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/data_provider.py b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/data_provider.py new file mode 100644 index 000000000..bf21411e9 --- /dev/null +++ b/TensorFlow/contrib/cv/SimpleHumanPose_ID0956_for_TensorFlow/src/lib/tfflat/data_provider.py @@ -0,0 +1,414 @@ +# Copyright 2017 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================ +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# copy from tensorpack: https://github.com/ppwwyyxx/tensorpack +from npu_bridge.npu_init import * +import numpy as np +import threading +import multiprocessing as mp +import weakref +from contextlib import contextmanager +from .serialize import loads, dumps +import errno +import uuid +import os +import zmq +import atexit +from itertools import cycle +from copy import copy +from .utils import get_rng +from setproctitle import setproctitle + +def del_weakref(x): + o = x() + if o is not None: + o.__del__() + +@contextmanager +def _zmq_catch_error(name): + try: + yield + except zmq.ContextTerminated: + print("[{}] Context terminated.".format(name)) + raise Exception + except zmq.ZMQError as e: + if e.errno == errno.ENOTSOCK: # socket closed + print("[{}] Socket closed.".format(name)) + raise Exception + else: + raise + except Exception: + raise + +class DataFlowReentrantGuard(object): + """ + A tool to enforce non-reentrancy. + Mostly used on DataFlow whose :meth:`get_data` is stateful, + so that multiple instances of the iterator cannot co-exist. 
+ """ + def __init__(self): + self._lock = threading.Lock() + + def __enter__(self): + self._succ = self._lock.acquire(False) + if not self._succ: + raise threading.ThreadError("This DataFlow is not reentrant!") + + def __exit__(self, exc_type, exc_val, exc_tb): + self._lock.release() + return False + +class DataFromList(object): + def __init__(self, datalist, is_train=True, shuffle=True): + self.rng = get_rng() + self._datalist = datalist + self._shuffle = shuffle + self._is_train = is_train + + def get_data(self): + if self._is_train: + while True: + idxs = np.arange(len(self._datalist)) + if self._shuffle: + self.rng.shuffle(idxs) + for i in idxs: + yield self._datalist[i] + else: + idxs = np.arange(len(self._datalist)) + if self._shuffle: + self.rng.shuffle(idxs) + for i in idxs: + yield self._datalist[i] + + def reset_state(self): + self.rng = get_rng() + +class _ParallelMapData(object): + def __init__(self, ds, buffer_size): + assert buffer_size > 0, buffer_size + self._buffer_size = buffer_size + self._buffer_occupancy = 0 # actual #elements in buffer + + self.ds = ds + + def _recv(self): + pass + + def _send(self, dp): + pass + + def _recv_filter_none(self): + ret = self._recv() + assert ret is not None, \ + "[{}] Map function cannot return None when strict mode is used.".format(type(self).__name__) + return ret + + def _fill_buffer(self, cnt=None): + if cnt is None: + cnt = self._buffer_size - self._buffer_occupancy + try: + for _ in range(cnt): + dp = next(self._iter) + self._send(dp) + except StopIteration: + print( + "[{}] buffer_size cannot be larger than the size of the DataFlow!".format(type(self).__name__)) + raise + self._buffer_occupancy += cnt + + def get_data_non_strict(self): + for dp in self._iter: + self._send(dp) + ret = self._recv() + if ret is not None: + yield ret + + self._iter = self.ds.get_data() # refresh + for _ in range(self._buffer_size): + self._send(next(self._iter)) + ret = self._recv() + if ret is not None: + yield ret + + def 
class _ParallelMapData(object):
    """Buffering logic shared by dataflows that map datapoints in parallel.

    Subclasses provide :meth:`_send` (hand one datapoint over for mapping) and
    :meth:`_recv` (fetch one mapped result).  ``self._iter`` is not created
    here; it must be assigned (an iterator over ``ds.get_data()``) before
    :meth:`_fill_buffer`, :meth:`get_data_strict` or
    :meth:`get_data_non_strict` is used.
    """

    def __init__(self, ds, buffer_size):
        """
        Args:
            ds: the dataflow to read from; must offer ``get_data()``.
            buffer_size (int): number of datapoints kept in flight; must be
                positive.
        """
        assert buffer_size > 0, buffer_size
        self._buffer_size = buffer_size
        self._buffer_occupancy = 0  # actual #elements in buffer

        self.ds = ds

    def _recv(self):
        """Fetch one mapped datapoint.  Placeholder; subclasses override it."""
        pass

    def _send(self, dp):
        """Submit ``dp`` for mapping.  Placeholder; subclasses override it."""
        pass

    def _recv_filter_none(self):
        """Return the next mapped datapoint, asserting that it is not None."""
        ret = self._recv()
        assert ret is not None, \
            "[{}] Map function cannot return None when strict mode is used.".format(type(self).__name__)
        return ret

    def _next_datapoint(self):
        """Return the next datapoint of ``self._iter``.

        Raises:
            RuntimeError: if the underlying dataflow is exhausted.  A bare
                StopIteration must not escape here: inside the generators
                below it would either be rewritten into an uninformative
                "generator raised StopIteration" error (PEP 479) or, on older
                interpreters, silently end the stream.
        """
        try:
            return next(self._iter)
        except StopIteration:
            raise RuntimeError(
                "[{}] buffer_size cannot be larger than the size of the DataFlow!".format(type(self).__name__))

    def _fill_buffer(self, cnt=None):
        """Send ``cnt`` datapoints; by default as many as the buffer lacks.

        Raises:
            RuntimeError: if the dataflow runs out before ``cnt`` datapoints
                were sent.
        """
        if cnt is None:
            cnt = self._buffer_size - self._buffer_occupancy
        for _ in range(cnt):
            self._send(self._next_datapoint())
        self._buffer_occupancy += cnt

    def get_data_non_strict(self):
        """Yield mapped datapoints, skipping results that are None.

        After the current iterator is exhausted it is replaced by a fresh
        ``ds.get_data()`` and ``buffer_size`` further datapoints are sent, so
        results of two consecutive passes may be interleaved.
        """
        for dp in self._iter:
            self._send(dp)
            ret = self._recv()
            if ret is not None:
                yield ret

        self._iter = self.ds.get_data()  # refresh
        for _ in range(self._buffer_size):
            self._send(self._next_datapoint())
            ret = self._recv()
            if ret is not None:
                yield ret

    def get_data_strict(self):
        """Yield mapped datapoints; a None result is an assertion error.

        The buffer is topped up first, then one result is received for every
        datapoint sent.  Once the iterator is exhausted the results still in
        flight are drained before the buffer is refilled from a fresh
        ``ds.get_data()``.
        """
        self._fill_buffer()
        for dp in self._iter:
            self._send(dp)
            yield self._recv_filter_none()
        self._iter = self.ds.get_data()  # refresh

        # first clear the buffer, then fill
        last = self._buffer_size - 1
        for k in range(self._buffer_size):
            dp = self._recv_filter_none()
            self._buffer_occupancy -= 1
            if k == last:
                self._fill_buffer()
            yield dp


class MapData(object):
    """
    Apply a mapper/filter on the DataFlow.

    Note:
        1. Please make sure func doesn't modify the components
           unless you're certain it's safe.
        2. If you discard some datapoints, ``ds.size()`` will be incorrect.
    """

    def __init__(self, ds, func):
        """
        Args:
            ds (DataFlow): input DataFlow
            func (datapoint -> datapoint | None): takes a datapoint and returns a new
                datapoint. Return None to discard this datapoint.
        """
        self.ds = ds
        self.func = func

    def get_data(self):
        """
        Yields:
            ``func(dp)`` for every datapoint ``dp`` of ``ds`` for which the
            result is not None.
        """
        for dp in self.ds.get_data():
            ret = self.func(copy(dp))  # shallow copy the list
            if ret is not None:
                yield ret

    def reset_state(self):
        """Forward the reset to the wrapped dataflow."""
        self.ds.reset_state()
class MultiProcessMapDataZMQ(_ParallelMapData):
    """
    Same as :class:`MapData`, but start processes to run the mapping function,
    and communicate with ZeroMQ pipe.

    Note:
        1. Processes run in parallel and can take different time to run the
           mapping function. Therefore the order of datapoints won't be
           preserved, and datapoints from one pass of `df.get_data()` might get
           mixed with datapoints from the next pass.

           You can use **strict mode**, where `MultiProcessMapData.get_data()`
           is guaranteed to produce the exact set which `df.get_data()`
           produces. Although the order of data still isn't preserved.
    """

    class _Worker(mp.Process):
        """Worker process: receive a datapoint, map it, send the result back."""

        def __init__(self, identity, map_func, pipename, hwm):
            """
            Args:
                identity (bytes): value set as the socket's ``zmq.IDENTITY``.
                map_func (callable): applied to every received datapoint.
                pipename (str): endpoint the worker connects to.
                hwm (int): high-water mark set on the worker's socket.
            """
            super(MultiProcessMapDataZMQ._Worker, self).__init__()
            self.identity = identity
            self.map_func = map_func
            self.pipename = pipename
            self.hwm = hwm

        def run(self):
            """Connect a DEALER socket to the pipe and serve requests forever."""
            print('Start data provider {}-{}'.format(self.pipename, self.identity))
            setproctitle('data provider {}-{}'.format(self.pipename, self.identity))
            ctx = zmq.Context()
            socket = ctx.socket(zmq.DEALER)
            socket.setsockopt(zmq.IDENTITY, self.identity)
            socket.set_hwm(self.hwm)
            socket.connect(self.pipename)

            while True:
                dp = loads(socket.recv(copy=False).bytes)
                dp = self.map_func(dp)
                socket.send(dumps(dp), copy=False)

    def __init__(self, ds, nr_proc, map_func, buffer_size=200, strict=False):
        """
        Args:
            ds (DataFlow): the dataflow to map
            nr_proc(int): number of worker processes to use
            map_func (callable): datapoint -> datapoint | None
            buffer_size (int): number of datapoints in the buffer
            strict (bool): use "strict mode", see notes above.
        """
        _ParallelMapData.__init__(self, ds, buffer_size)
        self.nr_proc = nr_proc
        self.map_func = map_func
        self._strict = strict
        self._procs = []
        self._guard = DataFlowReentrantGuard()

        # Sockets and worker processes are only created by the first call to
        # reset_state(); this flag makes later calls no-ops.
        self._reset_done = False

    def _reset_once(self):
        """Bind the ROUTER socket, start the workers and pre-fill the buffer."""
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.ROUTER)
        self.socket.set_hwm(self._buffer_size * 2)
        pipename = "ipc://@{}-pipe-{}".format('dataflow-map', str(uuid.uuid1())[:8])

        try:
            self.socket.bind(pipename)
        except zmq.ZMQError:
            print(
                "ZMQError in socket.bind(). Perhaps you're "
                "using pipes on a non-local file system. "
                "See documentation of PrefetchDataZMQ for more information.")
            raise

        self._proc_ids = [u'{}'.format(k).encode('utf-8') for k in range(self.nr_proc)]
        worker_hwm = int(self._buffer_size * 2 // self.nr_proc)
        self._procs = [
            MultiProcessMapDataZMQ._Worker(
                self._proc_ids[k], self.map_func, pipename, worker_hwm)
            for k in range(self.nr_proc)]

        self.ds.reset_state()
        self._iter = self.ds.get_data()
        self._iter_worker = cycle(iter(self._proc_ids))

        for p in self._procs:
            # The attribute is spelled ``daemon``; the former ``deamon`` only
            # created an unused attribute and left the workers non-daemonic.
            p.daemon = True
            p.start()
        self._fill_buffer()  # pre-fill the buffer

    def reset_state(self):
        """Start the workers on the first call; later calls do nothing."""
        if self._reset_done:
            return
        self._reset_done = True

        # __del__ not guaranteed to get called at exit
        atexit.register(del_weakref, weakref.ref(self))

        self._reset_once()  # build processes

    def _send(self, dp):
        """Serialize ``dp`` and address it to the next worker in turn."""
        # round-robin assignment
        worker = next(self._iter_worker)
        msg = [worker, dumps(dp)]
        self.socket.send_multipart(msg, copy=False)

    def _recv(self):
        """Receive one multipart reply and deserialize its second frame."""
        msg = self.socket.recv_multipart(copy=False)
        dp = loads(msg[1].bytes)
        return dp

    def get_data(self):
        """
        Yields:
            Mapped datapoints, from :meth:`get_data_strict` when ``strict``
            was requested and from :meth:`get_data_non_strict` otherwise.
        """
        with self._guard, _zmq_catch_error('MultiProcessMapData'):
            if self._strict:
                for dp in self.get_data_strict():
                    yield dp
            else:
                for dp in self.get_data_non_strict():
                    yield dp

    def __del__(self):
        """Best-effort cleanup: close the socket/context, stop the workers."""
        try:
            if not self._reset_done:
                return
            if not self.context.closed:
                self.socket.close(0)
                self.context.destroy(0)
            for x in self._procs:
                x.terminate()
                x.join(5)
            print("{} successfully cleaned-up.".format(type(self).__name__))
        except Exception:
            # Deliberately silent: errors raised while the object is being
            # destroyed cannot be handled by anyone.
            pass
class BatchData(object):
    """
    Stack datapoints into batches.
    It produces datapoints of the same number of components as ``ds``, but
    each component has one new extra dimension of size ``batch_size``.
    The batch can be either a list of original components, or (by default)
    a numpy array of original components.
    """

    def __init__(self, ds, batch_size, use_list=False):
        """
        Args:
            ds (DataFlow): When ``use_list=False``, the components of ``ds``
                must be either scalars or :class:`np.ndarray`, and have to be consistent in shapes.
            batch_size(int): batch size
            use_list (bool): if True, each component will contain a list
                of datapoints instead of an numpy array of an extra dimension.
        """
        self.ds = ds
        self.batch_size = int(batch_size)
        self.use_list = use_list

    def get_data(self):
        """
        Reads the environment variables ``RANK_SIZE`` and, when that is larger
        than 1, ``RANK_ID``.  A missing or empty ``RANK_SIZE`` is treated as 1
        and a missing or empty ``RANK_ID`` as 0.

        Datapoints left over when ``ds`` ends before a batch is complete are
        not yielded.

        Yields:
            Batched data by stacking each component on an extra 0th dimension.
        """
        # shard by rankid
        rank_size = int(os.getenv('RANK_SIZE') or 1)
        rank_id = int(os.getenv('RANK_ID') or 0) if rank_size > 1 else 0

        # ``seen`` counts datapoints since the last yielded batch.  A
        # datapoint is kept only while ``seen // batch_size == rank_id``, so
        # rank r skips r * batch_size datapoints before collecting its batch.
        # NOTE(review): the counter restarts after every yielded batch, so the
        # cycle length is (rank_id + 1) * batch_size rather than
        # rank_size * batch_size -- confirm that this is the intended sharding.
        seen = 0
        holder = []
        for data in self.ds.get_data():
            if (seen // self.batch_size) == rank_id:
                holder.append(data)
            seen += 1
            if len(holder) == self.batch_size:
                seen = 0
                yield BatchData._aggregate_batch(holder, self.use_list)
                del holder[:]

    @staticmethod
    def _aggregate_batch(data_holder, use_list=False):
        """Combine a non-empty list of datapoints into one batched datapoint.

        Args:
            data_holder (list): datapoints, each a sequence of components.
            use_list (bool): collect every component into a plain list
                instead of a numpy array.

        Returns:
            list: one entry per component of the first datapoint.

        Raises:
            TypeError: if ``use_list`` is False and a component of the first
                datapoint is neither an int, bool or float nor an object with
                a ``dtype`` attribute.
        """
        size = len(data_holder[0])
        result = []
        for k in range(size):
            if use_list:
                result.append(
                    [x[k] for x in data_holder])
                continue

            dt = data_holder[0][k]
            # Exact type checks on purpose: isinstance() would also match
            # numpy scalar types that subclass the builtins, and those must
            # keep their own dtype.
            if type(dt) in (int, bool):
                tp = 'int32'
            elif type(dt) is float:
                tp = 'float32'
            else:
                try:
                    tp = dt.dtype
                except AttributeError:
                    raise TypeError("Unsupported type to batch: {}".format(type(dt)))
            try:
                result.append(
                    np.asarray([x[k] for x in data_holder], dtype=tp))
            except Exception:
                # Debugging aid: the failure is logged and the loop goes on,
                # so the returned list has no entry for this component.
                logger.exception("Cannot batch data. Perhaps they are of inconsistent shape?")
                if isinstance(dt, np.ndarray):
                    s = pprint.pformat([x[k].shape for x in data_holder])
                    logger.error("Shape of all arrays to be batched: " + s)
                try:
                    # open an ipython shell if possible
                    import IPython as IP; IP.embed()  # noqa
                except ImportError:
                    pass
        return result

    def reset_state(self):
        """Forward the reset to the wrapped dataflow."""
        self.ds.reset_state()