训练迭代循环下沉
概述
iterations_per_loop是针对一次session.run调用,在Device侧执行训练迭代的次数。Device侧会运行iterations_per_loop指定的迭代次数,然后再返回到Host侧,该参数可以减少Host与Device间的交互次数,缩短训练时长。
本节内容介绍通过迁移工具迁移后,如何使能训练迭代下沉。
使用约束
iterations_per_loop默认为1,配置该参数大于1即可使能此特性,使用该特性时需要注意:
- 要求训练脚本必须使用TF Dataset方式读数据,并且不使用one shot iterator进行预处理初始化。例如使用tf.data.make_initializable_iterator()迭代器。
- 要求enable_data_pre_proc开关配置为1,此时才会生成在Device侧执行的getnext算子,训练迭代循环下沉才会生效。
- 要求训练迭代总次数必须为iterations_per_loop的整数倍。
- 训练迭代循环下沉场景下保存checkpoint数据时,要求save_checkpoints_steps必须大于或等于iterations_per_loop,且是iterations_per_loop的整数倍,否则不会按照save_checkpoints_steps配置的值保存数据。且iterations_per_loop>1时,可能无法按照save_summary_steps和log_step_count_steps配置的值保存信息,请参考Log/Summary实现信息保存。
- 混合计算模式(mix_compile_mode为True)下,不能开启训练迭代循环下沉,即要求iterations_per_loop必须为1。
- 在网络调测阶段建议设置iterations_per_loop为1,方便打印每轮迭代的日志信息。网络调通后可以设置iterations_per_loop参数用于缩短训练时长。
Estimator模式修改
- 检查迁移后的脚本是否存在“init_resource”。
- 如果存在,则需要参考下面示例进行修改;修改完后,执行下一步。
- 如果不存在,则直接执行下一步。
if __name__ == '__main__': session_config = tf.ConfigProto() custom_op = session_config.graph_options.rewrite_options.custom_optimizers.add() custom_op.name = "NpuOptimizer" custom_op.parameter_map["enable_data_pre_proc"].b = True # getnext算子下沉是迭代循环下沉的必要条件 custom_op.parameter_map["iterations_per_loop"].i = 10 (npu_sess, npu_shutdown) = init_resource(config=session_config) tf.app.run() shutdown_resource(npu_sess, npu_shutdown) close_session(npu_sess)
- 在脚本中找到“npu_run_config_init”:
session_config = tf.ConfigProto(allow_soft_placement=True) run_config = tf.estimator.RunConfig( train_distribute=distribution_strategy, session_config=session_config, save_checkpoints_secs=60*60*24) classifier = tf.estimator.Estimator( model_fn=model_function, model_dir=flags_obj.model_dir, config=npu_run_config_init(run_config=run_config))
- 配置“iterations_per_loop”:
session_config = tf.ConfigProto(allow_soft_placement=True) custom_op = session_config.graph_options.rewrite_options.custom_optimizers.add() custom_op.name = 'NpuOptimizer' custom_op.parameter_map["enable_data_pre_proc"].b = True # getnext算子下沉是迭代循环下沉的必要条件 custom_op.parameter_map["iterations_per_loop"].i = 10 run_config = tf.estimator.RunConfig( train_distribute=distribution_strategy, session_config=session_config, save_checkpoints_secs=60*60*24) classifier = tf.estimator.Estimator( model_fn=model_function, model_dir=flags_obj.model_dir, config=npu_run_config_init(run_config=run_config))
- 如果脚本中的运行配置函数,例如RunConfig中没有传入session_config参数,需要自行传入session_config:
session_config = tf.ConfigProto() custom_op = session_config.graph_options.rewrite_options.custom_optimizers.add() custom_op.name = 'NpuOptimizer' custom_op.parameter_map["enable_data_pre_proc"].b = True # getnext算子下沉是迭代循环下沉的必要条件 custom_op.parameter_map["iterations_per_loop"].i = 10 run_config = tf.estimator.RunConfig( train_distribute=distribution_strategy, session_config=session_config, save_checkpoints_secs=60*60*24) classifier = tf.estimator.Estimator( model_fn=model_function, model_dir=flags_obj.model_dir, config=npu_run_config_init(run_config=run_config))
- 增加“SetIterationsVarHook”:
train_hooks = hooks_helper.get_train_hooks( flags_obj.hooks, model_dir=flags_obj.model_dir, batch_size=flags_obj.batch_size) train_hooks.append(SetIterationsVarHook(10))
- 在train_op中增加“IterationOp”:
train_op = opt.apply_gradients( grad_var_list, global_step = global_step ) train_op = tf.group(train_op, name="IterationOp") #该name设置到梯度更新返回的op
session.run模式修改
session.run模式下,通过set_iteration_per_loop设置iterations_per_loop参数,并修改session.run调用次数为原调用次数除以iterations_per_loop。
- 检查迁移后的脚本是否存在“init_resource”。
- 如果存在,则需要参考下面示例进行修改;修改完后,执行下一步。
- 如果不存在,则直接执行下一步。
if __name__ == '__main__': session_config = tf.ConfigProto() custom_op = session_config.graph_options.rewrite_options.custom_optimizers.add() custom_op.name = "NpuOptimizer" custom_op.parameter_map["enable_data_pre_proc"].b = True # getnext算子下沉是迭代循环下沉的必要条件 custom_op.parameter_map["iterations_per_loop"].i = 10 (npu_sess, npu_shutdown) = init_resource(config=session_config) tf.app.run() shutdown_resource(npu_sess, npu_shutdown) close_session(npu_sess)
- 在脚本中找到“npu_config_proto”,配置iterations_per_loop。
from __future__ import print_function import input_data from npu_bridge.npu_init import * mnist = input_data.read_data_sets("/test/", one_hot=True) import tensorflow as tf # 设置模型 # 学习率 learning_rate = 0.01 # 训练迭代次数 training_epochs = 10 # batch大小 batch_size = 100 # 每多少次迭代显示一次损失 display_step = 1 x = tf.placeholder(tf.float32, [None, 784]) y = tf.placeholder(tf.float32, [None, 10]) # 模型参数 W = tf.Variable(tf.zeros([784, 10])) b = tf.Variable(tf.zeros([10])) # 建立模型 pred = tf.nn.softmax(tf.matmul(x, W) + b) # 定义损失函数:交叉熵 cost = tf.reduce_mean(-tf.reduce_sum(y*tf.log(pred), reduction_indices=1)) # 梯度下降 optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost) # 初始化所有变量 init = tf.global_variables_initializer() config = tf.ConfigProto() custom_op = config.graph_options.rewrite_options.custom_optimizers.add() custom_op.name = "NpuOptimizer" custom_op.parameter_map["mix_compile_mode"].b = False #关闭混合计算,根据实际情况配置,默认关闭 custom_op.parameter_map["enable_data_pre_proc"].b = True # getnext算子下沉是迭代循环下沉的必要条件 custom_op.parameter_map["iterations_per_loop"].i = 10 #此处设置的值和set_iteration_per_loop设置的iterations_per_loop值保持一致,用于判断是否进行训练迭代下沉 config = npu_config_proto(config_proto=config) # 训练模型 with tf.Session(config=config) as sess: sess.run(init) # sess.run模式下设置小循环次数为10 train_op = util.set_iteration_per_loop(sess, optimizer, 10) for epoch in range(training_epochs): avg_cost = 0 total_batch = int(mnist.train.num_examples / batch_size) for i in range(total_batch): batch_xs, batch_ys = mnist.train.next_batch(batch_size) _, c = sess.run([train_op, cost], feed_dict={x: batch_xs, y: batch_ys}) avg_cost += c / total_batch
由于上述接口中有改图的操作,如果图无法修改(例如冻结了图或者使用tf.train.Supervisor创建session等),则无法使用set_iteration_per_loop接口设置大小循环。此种情况下请使用create_iteration_per_loop_var和load_iteration_per_loop_var接口设置小循环次数,调用示例:
from __future__ import print_function import input_data from npu_bridge.npu_init import * mnist = input_data.read_data_sets("/test/", one_hot=True) import tensorflow as tf # 设置模型 # 学习率 learning_rate = 0.01 # 训练迭代次数 training_epochs = 10 # batch大小 batch_size = 100 # 每多少次迭代显示一次损失 display_step = 1 x = tf.placeholder(tf.float32, [None, 784]) y = tf.placeholder(tf.float32, [None, 10]) # 模型参数 W = tf.Variable(tf.zeros([784, 10])) b = tf.Variable(tf.zeros([10])) # 建立模型 pred = tf.nn.softmax(tf.matmul(x, W) + b) # 定义损失函数:交叉熵 cost = tf.reduce_mean(-tf.reduce_sum(y*tf.log(pred), reduction_indices=1)) # 梯度下降 optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost) # 初始化所有变量 init = tf.global_variables_initializer() config = tf.ConfigProto() custom_op = config.graph_options.rewrite_options.custom_optimizers.add() custom_op.name = "NpuOptimizer" custom_op.parameter_map["mix_compile_mode"].b = False #关闭混合计算,根据实际情况配置,默认关闭 custom_op.parameter_map["enable_data_pre_proc"].b = True # getnext算子下沉是迭代循环下沉的必要条件 custom_op.parameter_map["iterations_per_loop"].i = 10 #此处设置的值和set_iteration_per_loop设置的iterations_per_loop值保持一致,用于功能校验 config = npu_config_proto(config_proto=config) # 训练模型 with tf.Session(config=config) as sess: sess.run(init) # sess.run模式下设置小循环次数为10 iteration = util.IterationPerLoop() train_op = iteration.create_iteration_per_loop_var(optimizer) #修改图 tf.train.Supervisor(logdir="/home/xxxx",init_op=init) #冻结图 iteration.load_iteration_per_loop_var(sess, 10) #设置小循环次数 for epoch in range(training_epochs): avg_cost = 0 total_batch = int(mnist.train.num_examples / batch_size) for i in range(total_batch): batch_xs, batch_ys = mnist.train.next_batch(batch_size) _, c = sess.run([train_op, cost], feed_dict={x: batch_xs, y: batch_ys}) avg_cost += c / total_batch
父主题: 更多特性