Horovod Script Migration
Horovod is a distributed training framework for TensorFlow, Keras, PyTorch, and MXNet, designed to improve the performance of distributed training. Unlike traditional TensorFlow distributed training, which uses the PS-Worker architecture, Horovod aggregates gradients with Allreduce, which makes better use of network bandwidth and avoids the parameter-server bottleneck. This section describes how to migrate a distributed training script developed with Horovod so that it runs distributed training on the Ascend AI Processor.
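To make the Allreduce-based aggregation concrete, the following is a minimal sketch (not part of the migration example; the per-rank tensor values are hypothetical) showing how Horovod averages a tensor across all workers without a parameter server:

import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()

# Hypothetical per-rank gradient: each worker holds a different local value.
local_grad = tf.constant([1.0, 2.0]) * float(hvd.rank() + 1)

# hvd.allreduce averages the tensor across all ranks, so every worker
# receives the same aggregated gradient.
avg_grad = hvd.allreduce(local_grad)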
Original Horovod code:
import tensorflow as tf
import horovod.tensorflow as hvd

# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())

# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)

# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Make training operation
train_op = opt.minimize(loss)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       config=config,
                                       hooks=hooks) as mon_sess:
    while not mon_sess.should_stop():
        # Perform synchronous training.
        mon_sess.run(train_op)
Migrated code:
# Import the NPU library
import tensorflow as tf
from npu_bridge.npu_init import *

# This example calls HCCL group management interfaces, so a separate session is
# needed for HCCL initialization. For more information, see "Collective
# Communication Initialization".
npu_init = npu_ops.initialize_system()
npu_shutdown = npu_ops.shutdown_system()

config = tf.ConfigProto(allow_soft_placement=True)
# Add the NPU optimizer named "NpuOptimizer". During graph compilation, the NPU
# only traverses the session configuration under "NpuOptimizer".
custom_op = config.graph_options.rewrite_options.custom_optimizers.add()
custom_op.name = "NpuOptimizer"
# TensorFlow's remapping and memory_optimization passes must be disabled
# explicitly to avoid conflicts with the corresponding NPU functions.
config.graph_options.rewrite_options.remapping = RewriterConfig.OFF
config.graph_options.rewrite_options.memory_optimization = RewriterConfig.OFF

init_sess = tf.Session(config=config)
init_sess.run(npu_init)

# Pin GPU to be used to process local rank (one GPU per process)
config.gpu_options.visible_device_list = str(get_local_rank_id())  # Replace "hvd.local_rank" with "get_local_rank_id"

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * get_rank_size())  # Replace "hvd.size" with "get_rank_size"

# NPU allreduce
# Replace "hvd.DistributedOptimizer" with "npu_distributed_optimizer_wrapper"
opt = npu_distributed_optimizer_wrapper(opt)

# Add hook to broadcast variables from rank 0 to all other processes during initialization.
hooks = [NPUBroadcastGlobalVariablesHook(0)]
# In session run mode, call the collective communication interface broadcast to
# broadcast variables:
input = tf.trainable_variables()
bcast_global_variables_op = hccl_ops.broadcast(input, 0)

# Make training operation
train_op = opt.minimize(loss)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if get_rank_id() == 0 else None  # Replace "hvd.rank" with "get_rank_id"

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       config=config,
                                       hooks=hooks) as mon_sess:
    # Broadcast variables
    mon_sess.run(bcast_global_variables_op)
    while not mon_sess.should_stop():
        # Perform synchronous training.
        mon_sess.run(train_op)

# After training, shut down the NPU system and close the initialization session.
init_sess.run(npu_shutdown)
init_sess.close()
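The migrated code above shows two ways to broadcast the initial variables from rank 0: the NPUBroadcastGlobalVariablesHook passed to the session, and an explicit hccl_ops.broadcast operation run inside the session. Typically only one of the two is needed. The following is a minimal sketch of the hook-only variant, assuming the same npu_bridge imports, config, checkpoint_dir, and train_op as above:

# Hook-only variant: the hook broadcasts variables from rank 0 after
# initialization, so no explicit hccl_ops.broadcast call is required.
hooks = [NPUBroadcastGlobalVariablesHook(0)]
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       config=config,
                                       hooks=hooks) as mon_sess:
    while not mon_sess.should_stop():
        mon_sess.run(train_op)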
The NPUDistributedOptimizer distributed optimizer remains compatible in the current version.
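For reference, a minimal sketch of the optimizer-wrapping step using the older NPUDistributedOptimizer class instead of npu_distributed_optimizer_wrapper, assuming the same npu_bridge imports and loss definition as above:

# NPUDistributedOptimizer also performs the gradient allreduce across devices,
# like npu_distributed_optimizer_wrapper in the migrated code above.
opt = tf.train.AdagradOptimizer(0.01 * get_rank_size())
opt = NPUDistributedOptimizer(opt)
train_op = opt.minimize(loss)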