Porting with Horovod
Horovod is a distributed training framework for TensorFlow, Keras, PyTorch, and MXNet that aims to improve distributed training performance. Unlike traditional TensorFlow distributed training, which uses the parameter server (PS)-worker architecture, Horovod aggregates gradients with AllReduce, which uses network bandwidth more efficiently and avoids the communication bottleneck at the parameter servers. This section describes how to port a distributed training script developed with Horovod so that it runs distributed training on the Ascend AI Processor.
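To make the AllReduce idea concrete, the following minimal NumPy sketch (illustrative only; it uses no Horovod or NPU API, and the gradient values and worker count are made up) shows what a gradient allreduce computes: every worker ends up holding the same averaged gradient, without routing traffic through a central parameter server.

import numpy as np

# Hypothetical per-worker gradients for the same variable, one array per worker.
worker_grads = [np.array([1.0, 2.0]),
                np.array([3.0, 4.0]),
                np.array([5.0, 6.0])]

# An allreduce sums the gradients across all workers; dividing by the worker
# count yields the averaged gradient that every worker then applies locally.
avg_grad = np.sum(worker_grads, axis=0) / len(worker_grads)
print(avg_grad)  # [3. 4.] -- identical on every worker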
Original Horovod code:
import tensorflow as tf
import horovod.tensorflow as hvd

# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
config = tf.compat.v1.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Build model...
loss = ...
opt = tf.compat.v1.train.AdagradOptimizer(0.01 * hvd.size())

# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)

# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Make training operation
train_op = opt.minimize(loss)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.compat.v1.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                                 config=config,
                                                 hooks=hooks) as mon_sess:
    while not mon_sess.should_stop():
        # Perform synchronous training.
        mon_sess.run(train_op)
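As the ported example below shows, the migration is a mechanical substitution of each Horovod API with its NPU counterpart:

- hvd.init() -> init_resource() followed by npu.compat.enable_v1()
- hvd.local_rank() -> get_npu_local_rank_id()
- hvd.size() -> get_npu_rank_size()
- hvd.rank() -> get_npu_rank_id()
- hvd.DistributedOptimizer -> npu_distributed_optimizer_wrapper
- hvd.BroadcastGlobalVariablesHook(0) -> NPUBroadcastGlobalVariablesHook(0, int(os.getenv('RANK_ID', '0')))

In addition, shutdown_resource and close_session must be called after training to release the resources created by init_resource.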
Code after porting:
import os  # Required: the broadcast hook below reads the RANK_ID environment variable.
import tensorflow as tf
# Import the NPU library.
import npu_device as npu
from npu_device.compat.v1.npu_init import *

# In this example, the init_resource API is called to start another session for initialization.
(npu_sess, npu_shutdown) = init_resource()
npu.compat.enable_v1()

# Pin the device to be used to the process's local rank (one device per process)
config = tf.compat.v1.ConfigProto()
config.gpu_options.visible_device_list = str(get_npu_local_rank_id())  # Change "hvd.local_rank" to "get_npu_local_rank_id".

# Build model...
loss = ...
opt = tf.compat.v1.train.AdagradOptimizer(0.01 * get_npu_rank_size())  # Change "hvd.size" to "get_npu_rank_size".

# NPU allreduce
opt = npu_distributed_optimizer_wrapper(opt)  # Change "hvd.DistributedOptimizer" to "npu_distributed_optimizer_wrapper".

# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
# Change "hvd.BroadcastGlobalVariablesHook(0)" to "NPUBroadcastGlobalVariablesHook(0, int(os.getenv('RANK_ID', '0')))".
hooks = [NPUBroadcastGlobalVariablesHook(0, int(os.getenv('RANK_ID', '0')))]

# Make training operation
train_op = opt.minimize(loss)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if get_npu_rank_id() == 0 else None  # Change "hvd.rank" to "get_npu_rank_id".

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.compat.v1.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                                 config=config,
                                                 hooks=hooks) as mon_sess:
    while not mon_sess.should_stop():
        # Perform synchronous training.
        mon_sess.run(train_op)

# After the training is complete, run shutdown_resource and close the session.
shutdown_resource(npu_sess, npu_shutdown)
close_session(npu_sess)
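Two points are worth noting about the ported script. First, NPUBroadcastGlobalVariablesHook reads the device's rank ID from the RANK_ID environment variable, so each training process must be launched with RANK_ID set appropriately; the code above falls back to '0' when the variable is absent, which suits single-device debugging. Second, perform the teardown in the order shown: call shutdown_resource first, then close the session with close_session.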