Training in Data Parallel Mode (AllReduce)
AllReduce is a mainstream data parallel architecture. Nodes work together based on algorithms. AllReduce applies to scenarios that require high training computing power and a large number of devices. This section describes how to use the TensorFlow-based training script to perform distributed training on the Ascend AI Processor through the AllReduce architecture.
Allreduce Implementation Principle
Classified by gradient aggregation, the common implementation of data parallelism includes the Parameter Server-Workers (PS-Worker) architecture and AllReduce architecture. In the AllReduce architecture, all devices used for training form a ring. There is no central node to aggregate the calculated gradients. The AllReduce algorithm places the devices used for training in a logical ring. Each device receives data from the upstream device and transmits data to the downstream device to fully utilize the bandwidth.
The AllReduce architecture is proposed to solve the problem that the PS-Worker architecture cannot be linearly expanded. In this architecture, the nodes cooperate with each other according to the algorithm, aiming to reduce data transfers and make full use of the hardware bandwidth. This architecture works well for use cases with high training compute power requirement and a large number of devices. The following figure shows the implementation of the AllReduce architecture.
Ring-AllReduce adopts a ring-based worker topology without a central node as shown in Figure 2. In an iteration, each worker completes its own mini-batch training, calculates the gradient, and passes the gradient to the next worker, while it also receives the gradient from the previous worker. The Ring-AllReduce algorithm consists of two parts: scatter-reduce and allgather. In the ring, each worker sends the gradient data to the next worker in multiple steps and receives the gradient data from the previous worker in multiple steps. For a ring that contains N workers, each worker needs to receive 2 x (N – 1) gradient chunks from other workers (1/N chunks received each time) and send 2 x (N – 1) gradient chunks to other nodes (1/N chunks sent each time).
APIs Involved
In TensorFlow, tf.distribute.Strategy is generally used for distributed training. For details, click here. Currently, the preceding distribution policy is not supported by the Ascend AI Processor. TF Adapter provides the distribution API npu_distributed_optimizer_wrapper to add the NPU AllReduce operation to the input gradient function of the optimizer and return the optimizer. In this way, gradient aggregation is performed after gradients are calculated between devices if the single-server multi-device or multi-server multi-device mode is used. After the function is called, AllReduce operators are inserted between the computed gradients and update operators in the generated training graph.
Therefore, the original TensorFlow training script needs to be updated to support distributed training on the Ascend AI Processor.
Dataset Splitting
During distributed training, you can use TensorFlow APIs to split datasets. If processor resource information is required during dataset splitting, you can use the collective communication API get_rank_size to obtain the number of Ascend AI Processors and get_rank_id to obtain the processor ID. For example:
1
|
dataset = dataset.shard(get_rank_size(),get_rank_id()) |
Script Porting in Estimator Mode
- With TensorFlow, you can pass the strategy object to Estimator's Runconfig, which is not allowed by TF Adapter currently. You need to delete the related code. For example:
Before porting:
1 2 3 4 5 6
mirrored_strategy = tf.distribute.MirroredStrategy() config = tf.estimator.RunConfig( train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy, session_config=session_config, save_checkpoints_secs=60*60*24)
After porting:
1 2 3
config = tf.estimator.NPURunConfig( session_config=session_config, save_checkpoints_secs=60*60*24)
- Then, call npu_distributed_optimizer_wrapper to add the AllReduce operation of NPU to the input gradient function of the optimizer and return the input optimizer so that distributed computing can be implemented on the Ascend AI Processor. For details about the function, see "npu_distributed_optimizer_wrapper" in TF Adapter APIs (1.x). The specific method is described as follows:
1 2 3 4 5 6 7 8 9 10 11 12
def cnn_model_fn(features,labels,mode): # Construct the network. xxx # Calculate the loss. xxx #Configure the TrainingOp(for TRAIN mode) if mode == tf.estimator.ModeKeys.TRAIN: optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001) # Use the SGD optimizer. optimizer = npu_distributed_optimizer_wrapper(optimizer) # Use NPU-based distributed computing to update gradients. train_op=optimizer.minimize(loss=loss,global_step=tf.train.get_global_step()) # Minimize the loss. return tf.estimator.EstimatorSpec(mode=mode,loss=loss,train_op=train_op)
- NPUDistributedOptimizer is still compatible in the current version.
- In Estimator mode, when npu_distributed_optimizer_wrapper is used to implement the AllReduce function, NPUBroadcastGlobalVariablesHook is automatically added to NPUEstimator. Therefore, you do not need to manually implement broadcast.
If the original script uses the TensorFlow API to compute the gradient, for example, grads = tf.gradients(loss, tvars), the npu_allreduce API needs to be called to perform AllReduce on the gradient after the gradient computation is complete.
Before porting:
1grads = tf.gradients(a + b, [a, b], stop_gradients=[a, b])
After porting:
1grads = npu_allreduce(tf.gradients(a + b, [a, b], stop_gradients=[a, b]))
Script Porting in sess.run Mode
In Estimator mode, when npu_distributed_optimizer_wrapper is used to implement the AllReduce function, NPUBroadcastGlobalVariablesHook is automatically added to NPUEstimator. Therefore, you do not need to manually implement broadcast. But in sess.run mode, you need manual implementation. The specific method is described as follows:
- After the variable is initialized and before training, the variable is broadcast by using the collective communication API broadcast. For details about the broadcast API, see Huawei Collective Communication Library (HCCL) APIs.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
from npu_bridge.npu_init import * def broadcast_global_variables(root_rank, index): """Broadcasts all global variables from root rank to all other processes. Arguments: root_rank: rank of the process from which global variables will be broadcasted to all other processes. index: rank_id """ op_list = [] for var in tf.trainable_variables(): # the input and out tensor of HCOMBroadcast interface are list if "float" in var.dtype.name: inputs = [var] outputs=hccl_ops.broadcast(tensor=inputs,root_rank=root_rank) if outputs is not None: op_list.append(outputs[0].op) op_list.append(tf.assign(var, outputs[0])) return tf.group(op_list) ... bcast_op = broadcast_global_variables(root_rank, index) sess = tf.Session() ... sess.run(bcast_op)
In addition, the broadcast API involves graph modification. If a graph cannot be modified (for example, the graph is frozen or a session is created using tf.train.Supervisor), you must unfreeze the graph first.
1 2 3
with sv.managed_session() as sess: sess.graph._unsafe_unfinalize () # Unfreeze the graph. sess.run(bcast_op)
- During training, call npu_distributed_optimizer_wrapper to aggregate the gradients after computing data of each device by using the gradient optimizer.
1 2 3
from npu_bridge.npu_init import * optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001) # Use the SGD optimizer. distributedOptimizer=npu_distributed_optimizer_wrapper(optimizer) # Use NPU-based distributed computing to update gradients.
NPUDistributedOptimizer is still compatible in the current version.
If the original script uses the TensorFlow API to compute the gradient, for example, grads = tf.gradients(loss, tvars), the npu_allreduce API needs to be called to perform AllReduce on the gradient after the gradient computation is complete.1grads = npu_allreduce(tf.gradients(a + b, [a, b], stop_gradients=[a, b]))
