Porting with Horovod

Horovod is a distributed training framework for TensorFlow, Keras, PyTorch, and MXNet that aims to improve distributed training performance. Unlike traditional TensorFlow distributed training based on the PS-Worker architecture, Horovod aggregates gradients with AllReduce, which makes better use of network bandwidth and removes the parameter-server bottleneck. This section describes how to port a distributed training script developed with Horovod so that it runs distributed training on the Ascend AI Processor.

For details about Horovod, visit the Horovod official website.

Mapping Between Horovod APIs and NPU APIs

Table 1 shows the mapping between common Horovod TensorFlow APIs and NPU APIs.

Table 1 Mapping between common Horovod APIs and NPU APIs

Horovod API                              | Description                            | NPU API
-----------------------------------------|----------------------------------------|-----------------------------------
horovod.tensorflow.init                  | Horovod initialization                 | npu_ops.initialize_system
horovod.tensorflow.shutdown              | Horovod shutdown                       | npu_ops.shutdown_system
horovod.tensorflow.DistributedOptimizer  | Distributed optimizer                  | npu_distributed_optimizer_wrapper
horovod.tensorflow.size                  | Number of global ranks                 | get_rank_size
horovod.tensorflow.local_size            | Number of ranks on the current server  | get_local_rank_size
horovod.tensorflow.rank                  | Global rank ID                         | get_rank_id
horovod.tensorflow.local_rank            | Rank ID on the local server            | get_local_rank_id
horovod.tensorflow.allgather             | AllGather operation                    | hccl_ops.allgather
horovod.tensorflow.broadcast             | Broadcast operation                    | hccl_ops.broadcast
horovod.tensorflow.alltoall              | AlltoAll operation                     | hccl_ops.all_to_all_v
horovod.tensorflow.allreduce             | AllReduce operation                    | hccl_ops.allreduce
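
As an illustration of the mapping, the following sketch rewrites a Horovod gradient AllReduce with the HCCL collective API. It assumes hccl_ops.allreduce accepts a reduction string such as "sum"; treat the exact signature as an assumption and verify it against your CANN version:

import tensorflow as tf
from npu_bridge.npu_init import *   # provides hccl_ops and get_rank_size

grad = tf.constant([1.0, 2.0, 3.0])

# Horovod (before porting):
#   reduced = hvd.allreduce(grad)   # averages across ranks by default

# NPU (after porting). The "sum" reduction argument is an assumption
# about the hccl_ops.allreduce signature.
reduced = hccl_ops.allreduce(grad, "sum")

# Horovod averages by default; with HCCL, divide by the rank count explicitly.
reduced = reduced / get_rank_size()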

Porting Example

Original Horovod code:

import tensorflow as tf
import horovod.tensorflow as hvd

# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())

# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)

# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Make training operation
train_op = opt.minimize(loss)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       config=config,
                                       hooks=hooks) as mon_sess:
  while not mon_sess.should_stop():
    # Perform synchronous training.
    mon_sess.run(train_op)

Code after porting:

# Import the NPU libraries.
import tensorflow as tf
from npu_bridge.npu_init import *

# In this example, a separate session is created to initialize HCCL before the HCCL group management APIs are called.
npu_init = npu_ops.initialize_system()
npu_shutdown = npu_ops.shutdown_system()
config = tf.ConfigProto(allow_soft_placement=True)
# Add an NPU optimizer named NpuOptimizer. During network compilation, the NPU traverses only the session configurations under NpuOptimizer.
custom_op = config.graph_options.rewrite_options.custom_optimizers.add()
custom_op.name = "NpuOptimizer"
# Explicitly disable the remapping and memory_optimization functions of TensorFlow to avoid conflicts with the functions of the NPU.
config.graph_options.rewrite_options.remapping = RewriterConfig.OFF
config.graph_options.rewrite_options.memory_optimization = RewriterConfig.OFF  
init_sess = tf.Session(config=config)
init_sess.run(npu_init)

# Pin GPU to be used to process local rank (one GPU per process)
config.gpu_options.visible_device_list = str(get_local_rank_id())  # Change "hvd.local_rank" to "get_local_rank_id".

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * get_rank_size())   # Change "hvd.size" to "get_rank_size".

# NPU allreduce
# Change hvd.DistributedOptimizer to npu_distributed_optimizer_wrapper.
opt = npu_distributed_optimizer_wrapper(opt)   
# Add hook to broadcast variables from rank 0 to all other processes during initialization.
hooks = [NPUBroadcastGlobalVariablesHook(0)]
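# Note: the hook above and the explicit hccl_ops.broadcast call below are
# two ways to broadcast the initial variables from rank 0. The hook applies
# when training with MonitoredTrainingSession or Estimator; the explicit
# broadcast op is for plain sess.run mode. This example demonstrates both.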

# In sess.run mode, call the broadcast collective communication API to broadcast variables.
input = tf.trainable_variables()
bcast_global_variables_op = hccl_ops.broadcast(input, 0)

# Make training operation
train_op = opt.minimize(loss)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if get_rank_id() == 0 else None  # Change "hvd.rank" to "get_rank_id".

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       config=config,
                                       hooks=hooks) as mon_sess:
  # Broadcast the variables.
  mon_sess.run(bcast_global_variables_op)  
  while not mon_sess.should_stop():
    # Perform synchronous training.
    mon_sess.run(train_op) 
  
# After training, shut down the NPU system and close the initialization session.
init_sess.run(npu_shutdown)
init_sess.close()

The legacy NPUDistributedOptimizer API remains compatible in the current version.
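
For scripts written against that earlier wrapper, a minimal sketch of the legacy usage, assuming loss is defined as in the example above:

# Legacy API; equivalent in purpose to npu_distributed_optimizer_wrapper.
opt = tf.train.AdagradOptimizer(0.01 * get_rank_size())
opt = NPUDistributedOptimizer(opt)
train_op = opt.minimize(loss)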