多卡分布式训练拉起方式
有3种方式可拉起分布式训练,分别为mp.spawn方式、Python方式、shell脚本方式(推荐)。以下脚本中除备注内容为不同场景配置时需注意的要点外,其余代码均相同,着重说明不同方式启动时代码修改的重点。
- 导入依赖。
import torch.nn.parallel import torch.distributed as dist import torch.multiprocessing as mp #使用mp.spawn方式启动时配置
- torch.nn.parallel用于调用模型并行接口。
- torch.distributed用于调用初始化进程组接口。
- torch.multiprocessing用于调用多个进程接口。
- 参数设置增加以下参数,包括指定参与训练的昇腾910 AI处理器需要的参数。
parser.add_argument("--local_rank", default=-1, type=int) #使用mp.spawn方式与shell方式启动时需删除此项 parser.add_argument('--addr', default='127.0.0.1', type=str, help='master addr') parser.add_argument('--port', default='**', type=str, help='master port') # **为端口号,请根据实际选择一个闲置端口填写 parser.add_argument('--world-size', default=1, type=int, help='number of nodes for distributed training') parser.add_argument('--rank', default=0, type=int, help='node rank for distributed training') parser.add_argument('--dist-url', default='env://', type=str, help='url used to set up distributed training') parser.add_argument('--dist-backend', default='hccl', type=str, help='distributed backend') parser.add_argument('--multiprocessing-distributed', action='store_true', help='Use multi-processing distributed training to launch ' 'N processes per node, which has N NPUs. This is the ' 'fastest way to use PyTorch for either single node or ' 'multi node data parallel training')
- --local_rank用于自动获取device号。
- --addr和--port用于多进程之间通信。
- --multiprocessing-distributed用于判断是否使用分布式训练。
- --world-size、--rank、--dist-url、--dist-backend为下面初始化进程组接口所需参数。
- 获取训练服务器可用device数、设置地址和端口号、拉起多进程(mp.spawn方式)。
代码位置:main.py文件中的主函数main()(文件名以及函数名根据具体模型而定,下同)。
由于昇腾AI处理器初始化进程组时initmethod只支持env:// (即环境变量初始化方式),所以在初始化前需要配置MASTER_ADDR、MASTER_PORT等参数。
原代码如下:
def main(): args = parser.parse_args() ngpus_per_node = torch.cuda.device_count() main_worker(args.gpu, ngpus_per_node, args)
- Python方式、shell脚本方式启动时,代码修改如下:
def main(): args = parser.parse_args() os.environ['MASTER_ADDR'] = args.addr os.environ['MASTER_PORT'] = args.port ngpus_per_node = torch.npu.device_count() main_worker(args.gpu, ngpus_per_node, args)
- mp.spawn方式启动时,代码修改如下:
def main(): args = parser.parse_args() os.environ['MASTER_ADDR'] = args.addr os.environ['MASTER_PORT'] = args.port ngpus_per_node = torch.npu.device_count() if args.multiprocessing_distributed: mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args)) else: # Simply call main_worker function main_worker(args.gpu, ngpus_per_node, args)
其中mp.spawn函数中第一个参数为模型主函数名称,根据具体模型具体修改。
- Python方式、shell脚本方式启动时,代码修改如下:
- 添加分布式逻辑。
- 初始化进程组。
代码位置:main.py文件中的函数main_worker()。
原代码如下:
def main_worker(gpu, ngpus_per_node, args): global best_acc1 args.gpu = gpu if args.gpu is not None: print("Use GPU: {} for training".format(args.gpu))
不同的拉起训练方式下,device号的获取方式不同:
- Python方式:任务拉起后,local_rank自动获得device号。
- mp.spawn方式:mp.spawn多进程拉起main_worker后,第一个参数GPU自动获得device号(0 ~ ngpusper_node - 1)。
- shell脚本方式:在shell脚本中循环传入local_rank变量作为指定的device。
用户需根据自己选择的方式对代码中args.gpu参数做不同的修改,然后添加具体代码以初始化进程组。修改如下:
- Python方式
args.gpu = args.local_rank
- mp.spawn方式
args.gpu = gpu
- shell脚本方式
args.gpu = int(os.environ['LOCAL_RANK'])
以shell脚本方式为例,修改、添加后完整代码如下:
def main_worker(gpu, ngpus_per_node, args): global best_acc1 args.gpu = int(os.environ['LOCAL_RANK']) #shell脚本方式 if args.gpu is not None: print("Use NPU: {} for training".format(args.gpu)) if args.multiprocessing_distributed: # For multiprocessing distributed training, rank needs to be the # global rank among all the processes args.rank = args.rank * ngpus_per_node + args.gpu args.world_size = ngpus_per_node * args.world_size args.batch_size = int(args.batch_size / ngpus_per_node) dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url, world_size=args.world_size, rank=args.rank)
在8P分布式情况下传入的batchsize一般为单P的8倍,所以需要对batchsize进行处理,以保证8P分布式每张卡的batchsize和单P保持一致;同样地,为了保证精度,8P分布式情况下传入的学习率也应该为单P时的8倍,但模型中不需要对学习率再做处理。
- 数据集切分和模型并行。
数据加载器结合了数据集和取样器,并以提供多个线程处理数据集。由于当前仅支持固定shape下的训练,数据流中剩余的样本数可能小于batch大小,因此需要将drop_last设置为True;train_sampler存在时train_loader的shuffle参数不可为True,因此shuffle须设置为train_sampler is None。
- 找到代码位置:main.py文件中的main_worker()。
train_loader = torch.utils.data.DataLoader( train_dataset, batch_size=args.batch_size, num_workers=args.workers, pin_memory=True)
修改后代码如下:
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) if args.multiprocessing_distributed else None train_loader = torch.utils.data.DataLoader( train_dataset, batch_size=args.batch_size, num_workers=args.workers, pin_memory=True, shuffle=(train_sampler is None), sampler=train_sampler, drop_last=True)
- 找到模型定义处。
print("=> creating model '{}'".format(args.arch)) model = models.__dict__[args.arch]()
修改后:
print("=> creating model '{}'".format(args.arch)) model = models.__dict__[args.arch]() model = model.to('npu:{}'.format(args.gpu)) if args.multiprocessing_distributed: model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
- 找到代码位置:main.py文件中的main_worker()。
- 设置当前的epoch,为了让不同的结点之间保持同步。
代码位置:main.py文件中的main_worker()。
原代码如下:
for epoch in range(args.start_epoch, args.epochs): adjust_learning_rate(optimizer, epoch, args)
修改后代码如下:
for epoch in range(args.start_epoch, args.epochs): if args.multiprocessing_distributed: train_sampler.set_epoch(epoch) adjust_learning_rate(optimizer, epoch, args)
- 初始化进程组。
- 拉起训练。
- Python方式启动,其余所需参数未列举。
python3 -m torch.distributed.launch --nproc_per_node 8 main.py
- mp.spawn方式启动。
python3 main.py --rank 0 --world-size 1 --dist-url 'env://' --dist-backend 'hccl' --multiprocessing-distributed
- shell脚本方式启动新建shell脚本。
RANK_ID_START=0 WORLD_SIZE=8 for((RANK_ID=$RANK_ID_START;RANK_ID<$((WORLD_SIZE+RANK_ID_START));RANK_ID++)); do echo "Device ID: $RANK_ID" export LOCAL_RANK=$RANK_ID python3 main.py --rank 0 --world-size 1 --dist-url 'env://' --dist-backend 'hccl' --multiprocessing-distributed done wait
- Python方式启动,其余所需参数未列举。
父主题: 训练迁移样例参考