From 168eec22260495833785716e1f5255b9bfdd0346 Mon Sep 17 00:00:00 2001
From: yongchao1 <297389370@qq.com>
Date: Tue, 23 May 2023 13:21:49 +0000
Subject: [PATCH] update
 TensorFlow/built-in/recommendation/DIN_ID0190_for_TensorFlow/examples/din_demo.py.
 Add timing instrumentation; self-verified, performance and accuracy OK
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: yongchao1 <297389370@qq.com>
---
 .../examples/din_demo.py | 107 +++++++++++++++++-
 1 file changed, 105 insertions(+), 2 deletions(-)

diff --git a/TensorFlow/built-in/recommendation/DIN_ID0190_for_TensorFlow/examples/din_demo.py b/TensorFlow/built-in/recommendation/DIN_ID0190_for_TensorFlow/examples/din_demo.py
index 4af71c2bb..f85958820 100644
--- a/TensorFlow/built-in/recommendation/DIN_ID0190_for_TensorFlow/examples/din_demo.py
+++ b/TensorFlow/built-in/recommendation/DIN_ID0190_for_TensorFlow/examples/din_demo.py
@@ -3,6 +3,8 @@ import tensorflow as tf
 from deepctr.feature_column import SparseFeat, VarLenSparseFeat
 from deepctr.models import DIN
 import numpy as np
+import time
+batch_size_global = 1024  # must match the batch size used by input_fn
 import tensorflow.python.keras as keras
 rank_size = int(os.getenv('RANK_SIZE',"1"))
 rank_id = int(os.getenv('RANK_ID'))
@@ -49,6 +51,90 @@ def input_fn(filenames, is_train, batch_size=1024):
     dataset = dataset.prefetch(buffer_size=tf.contrib.data.AUTOTUNE)
     return dataset
 
+class TimeHistory(keras.callbacks.Callback):  # records per-batch/per-epoch time and throughput (FPS)
+    def on_train_begin(self, logs={}):
+        self.init_time = time.time()
+        self.batch_train_time = []
+        self.batch_valid_time = []
+        self.batch_train_fps = []
+        self.batch_e2e_fps = []
+        self.epoch_train_time_accum = 0
+        self.epoch_valid_time_accum = 0
+        self.epoch_train_samples_accum = 0
+        self.hist_tr_samples = 0
+
+        self.nn_e2e = 0
+
+        self.times = {
+            "epoch_tr_time": [],
+            "epoch_va_time": [],
+            "epoch_total_time": [],
+            "hist_tr_time": [],
+            "hist_va_time": [],
+            "hist_total_time": [],
+            "epoch_tr_fps": [],
+            "hist_tr_fps": [],
+            "epoch_total_fps": [],
+            "hist_total_fps": [],
+            "epoch_max_fps": [],
+            "epoch_timestamp": []
+        }
+
+    def on_train_end(self, logs={}):
+        self.nn_e2e = time.time() - self.init_time
+        self.batch_train_time = self.batch_train_time[1:]  # drop the first (warm-up) batch
+        self.batch_valid_time = self.batch_valid_time[1:]
+
+    def on_train_batch_begin(self, batch, logs={}):
+        self.train_batch_start = time.time()
+
+    def on_train_batch_end(self, batch, logs={}):
+        batch_time = time.time() - self.train_batch_start
+        self.batch_train_time.append(batch_time)
+        self.batch_train_fps.append(batch_size_global / batch_time)
+        self.epoch_train_samples_accum += batch_size_global
+        self.hist_tr_samples += batch_size_global
+
+    def on_test_batch_begin(self, batch, logs={}):
+        self.eval_batch_start = time.time()
+
+    def on_test_batch_end(self, batch, logs={}):
+        batch_time = time.time() - self.eval_batch_start
+        self.batch_valid_time.append(batch_time)
+
+    def on_epoch_begin(self, epoch, logs={}):
+        self.epoch_time_start = time.time()
+        self.batch_train_time = []
+        self.batch_valid_time = []
+        self.epoch_train_samples_accum = 0
+        self.batch_train_fps = []
+
+    def on_epoch_end(self, epoch, logs={}):
+        end_timestamp = time.time()
+        self.times["epoch_timestamp"].append(end_timestamp - self.init_time)
+        epoch_time = end_timestamp - self.epoch_time_start
+
+        self.times["epoch_total_time"].append(epoch_time)
+        try:
self.times["hist_total_time"].append(epoch_time + self.times["hist_total_time"][-1]) + except: + self.times["hist_total_time"].append(epoch_time) + self.times["epoch_tr_time"].append(np.sum(self.batch_train_time)) + try: + self.times["hist_tr_time"].append(np.sum(self.batch_train_time) + self.times["hist_tr_time"][-1]) + except: + self.times["hist_tr_time"].append(np.sum(self.batch_train_time)) + self.times["epoch_va_time"].append(np.sum(self.batch_valid_time)) + try: + self.times["hist_va_time"].append(np.sum(self.batch_valid_time) + self.times["hist_va_time"][-1]) + except: + self.times["hist_va_time"].append(np.sum(self.batch_valid_time)) + + self.times["hist_tr_fps"].append(self.hist_tr_samples / self.times["hist_tr_time"][-1]) + self.tiems["epoch_tr_fps"].append(self.epoch_train_samples_accum / self.times["epoch_tr_time"][-1]) + self.times["epoch_total_fps"].append(self.epoch_train_samples_accum / epoch_time) + self.times["hist_total_fps"].append(self.hist_tr_samples / self.times["hist_total_time"][-1]) + self.times["epoch_max_fps"].append(np.max(self.batch_train_fps)) if __name__ == "__main__": config_proto = tf.ConfigProto() @@ -59,6 +145,7 @@ if __name__ == "__main__": custom_op.parameter_map["hcom_parallel"].b = True #custom_op.parameter_map["enable_data_pre_proc"].b = True npu_keras_sess = set_keras_session_npu_config(config=config_proto) + process_init_time = time.time() # shard for 8p filename = split_tfrecord(r"./data/train.tfrecords.gz") @@ -76,9 +163,25 @@ if __name__ == "__main__": opt = tf.keras.optimizers.Adam(learning_rate=1e-3 * rank_size) opt = npu_distributed_optimizer_wrapper(opt) model.compile(opt, 'binary_crossentropy', metrics=['binary_crossentropy', "AUC"]) - - callbacks = [NPUBroadcastGlobalVariablesCallback(0)] + time_callback = TimeHistory() + callbacks = [NPUBroadcastGlobalVariablesCallback(0), time_callback] model.fit(x=input_fn(filename, True), epochs=5, verbose=1, validation_data=input_fn(r"./data/test.tfrecords.gz", False), validation_steps=5406, callbacks=callbacks) + proc_total_time = time.time() - process_init_time + timing_items = sorted(time_callback.time.keys()) + epochs = len(time_callback.times[timing_items[0]]) + + print("Epoch, ", end="") + for k in timing_items: + print(f"{k} ", end="") + print("E2E total time") + + for i in range(epochs): + print(f"{i}, ", end="") + + for k in timing_items: + val = time_callback.times[k][i] + print(f"{val:.4f}, ", end="") + print(f"{proc_total_time:4f}") \ No newline at end of file -- Gitee