Manual Migration for Single-Node Multi-Card Training
Obtain the main.py script by referring to the sample code description and comment out the mps module-related code, then perform the following migration steps.
- Add the following imports to main.py to support training PyTorch models on Ascend AI Processors and to enable mixed-precision training.
```python
import torch
import torch_npu
from torch_npu.npu import amp
```
- Add the following command-line arguments, including the ones that select the Ascend AI Processors used for training and the ones required for mixed-precision training.
```python
parser.add_argument('--device', default='npu', type=str, help='npu or gpu')
parser.add_argument('--addr', default='127.0.0.1', type=str, help='master addr')
parser.add_argument('--device_list', default='0,1,2,3,4,5,6,7', type=str, help='device id list')
parser.add_argument('--amp', default=False, action='store_true', help='use amp to train the model')
parser.add_argument('--loss_scale', default=1024., type=float, help='loss scale using in amp, default -1 means dynamic')
parser.add_argument('--dist_backend', default='hccl', type=str, help='distributed backend')
```
- Create a function that maps device_id to process_id and specifies the devices to train on (specify adjacent devices, such as cards 1 and 2 or cards 2 and 3). Add the following function to main.py:
```python
def device_id_to_process_device_map(device_list):
    devices = device_list.split(",")
    devices = [int(x) for x in devices]
    devices.sort()
    process_device_map = dict()
    for process_id, device_id in enumerate(devices):
        process_device_map[process_id] = device_id
    return process_device_map
```
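For example, the mapping behaves as follows (the device_list value '2,3' is only an illustration):

```python
# With adjacent cards 2 and 3, process 0 maps to device 2 and
# process 1 maps to device 3.
print(device_id_to_process_device_map('2,3'))   # {0: 2, 1: 3}
```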
- Specify the IP address and port of the training server.
Code location: the main() function in main.py.
Add the following code:
```python
def main():
    args = parser.parse_args()
    os.environ['MASTER_ADDR'] = args.addr
    os.environ['MASTER_PORT'] = '**'   # Replace ** with an idle port of your choice
```
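If you are unsure which ports are idle, a quick check with Python's standard library can help (a minimal sketch; the port 29688 is only an example):

```python
import socket

def port_is_free(port, host='127.0.0.1'):
    # Binding succeeds only if no other process currently holds the port.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
            return True
        except OSError:
            return False

print(port_is_free(29688))   # True means the port is currently idle
```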
- Create the device_id-to-process_id mapping and obtain the number of Ascend AI Processors on the node.
Code location: the main() function in main.py.
Original code:
```python
args.distributed = args.world_size > 1 or args.multiprocessing_distributed

if torch.cuda.is_available():
    ngpus_per_node = torch.cuda.device_count()
else:
    ngpus_per_node = 1
```
Modified code:
```python
args.distributed = args.world_size > 1 or args.multiprocessing_distributed

args.process_device_map = device_id_to_process_device_map(args.device_list)
if args.device == 'npu':
    ngpus_per_node = len(args.process_device_map)
else:
    ngpus_per_node = torch.cuda.device_count()
```
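ngpus_per_node then determines how many worker processes are launched. In the standard PyTorch ImageNet sample that main.py is based on, main() spawns one process per device roughly as follows (a sketch of the existing sample logic, shown for orientation; it is not part of the migration changes):

```python
if args.multiprocessing_distributed:
    # One worker process per Ascend AI Processor; mp.spawn passes the
    # process index (0 .. ngpus_per_node - 1) as main_worker's first argument.
    args.world_size = ngpus_per_node * args.world_size
    mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))
else:
    main_worker(args.gpu, ngpus_per_node, args)
```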
- Obtain the Ascend AI Processor ID that corresponds to each process_id, and train on that processor.
Original code:
```python
def main_worker(gpu, ngpus_per_node, args):
    global best_acc1
    args.gpu = gpu
```
Modified code:
```python
def main_worker(gpu, ngpus_per_node, args):
    global best_acc1
    args.gpu = args.process_device_map[gpu]
```
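The gpu argument received by main_worker is the process index (0 to ngpus_per_node - 1) passed in by mp.spawn, not a physical card number; the mapping converts it into the actual device ID. For example, with --device_list '2,3', process 0 trains on card 2 and process 1 trains on card 3.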
- Initialize the process group, commenting out the init_method argument. Original code:
```python
dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                        world_size=args.world_size, rank=args.rank)
```
Modified code:
```python
if args.device == 'npu':
    dist.init_process_group(backend=args.dist_backend,  # init_method=args.dist_url,
                            world_size=args.world_size, rank=args.rank)
else:
    dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                            world_size=args.world_size, rank=args.rank)
```
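Commenting out init_method works because main() exported MASTER_ADDR and MASTER_PORT earlier: when init_method is omitted, PyTorch falls back to the default 'env://' rendezvous, which reads exactly those environment variables. The NPU branch above is therefore equivalent to this explicit form:

```python
# Equivalent explicit form of the NPU branch: the 'env://' rendezvous reads
# MASTER_ADDR and MASTER_PORT from the environment variables set in main().
dist.init_process_group(backend=args.dist_backend, init_method='env://',
                        world_size=args.world_size, rank=args.rank)
```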
- Because training is distributed, the mixed-precision module is introduced, and the model must be migrated to the Ascend AI Processor, comment out the original code that checks whether training is distributed and whether the model trains on a GPU.
Code location: main_worker() in main.py.
Original code:
```python
# create model
if args.pretrained:
    print("=> using pre-trained model '{}'".format(args.arch))
    model = models.__dict__[args.arch](pretrained=True)
else:
    print("=> creating model '{}'".format(args.arch))
    model = models.__dict__[args.arch]()

if not torch.cuda.is_available():
    print('using CPU, this will be slow')
......
else:
    model = torch.nn.DataParallel(model).cuda()
```
Modified code:
```python
# create model
if args.pretrained:
    print("=> using pre-trained model '{}'".format(args.arch))
    model = models.__dict__[args.arch](pretrained=True)
else:
    print("=> creating model '{}'".format(args.arch))
    model = models.__dict__[args.arch]()

# Specify the Ascend AI Processor as the training device
loc = 'npu:{}'.format(args.gpu)
torch_npu.npu.set_device(loc)

# Compute the per-device batch_size and workers
args.batch_size = int(args.batch_size / ngpus_per_node)
args.workers = int((args.workers + ngpus_per_node - 1) / ngpus_per_node)
```
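For example, with the launch parameters shown at the end of this section (--batch-size 2048, --workers 160) and 8 devices, each worker process trains with a batch_size of 256 and 20 data-loading workers.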
- Comment out the loss function, optimizer, and checkpoint-resume code; this functionality is recombined with mixed-precision training later.
Code location: main_worker() in main.py.
The original code to comment out is shown below, already commented:
```python
# define loss function (criterion), optimizer, and learning rate scheduler
# .....
# optionally resume from a checkpoint
# .....
# else:
#     print("=> no checkpoint found at '{}'".format(args.resume))
```
Then comment out the scheduler call in the original code:
```python
...
# train for one epoch
train(train_loader, model, criterion, optimizer, epoch, device, args)

# evaluate on validation set
acc1 = validate(val_loader, model, criterion, args)

# scheduler.step()
...
```
- The data loader combines a dataset and a sampler and can load data with multiple workers. When training on Ascend AI Processors, set pin_memory to False; because only fixed-shape training is currently supported and the number of samples left in the data stream may be smaller than the batch size, set drop_last to True; in addition, set shuffle to True for the validation dataset.
Code location: main_worker() in main.py.
Original code:
```python
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
    num_workers=args.workers, pin_memory=True, sampler=train_sampler)

val_loader = torch.utils.data.DataLoader(
    val_dataset, batch_size=args.batch_size, shuffle=False,
    num_workers=args.workers, pin_memory=True, sampler=val_sampler)
```
Modified code:
```python
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
    num_workers=args.workers, pin_memory=False, sampler=train_sampler, drop_last=True)

val_loader = torch.utils.data.DataLoader(
    datasets.ImageFolder(valdir, transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        normalize,
    ])),
    batch_size=args.batch_size, shuffle=True,
    num_workers=args.workers, pin_memory=False, drop_last=True)
```
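For reference, train_sampler comes from the sample script's existing distributed setup; in the standard ImageNet example it is created roughly like this (unchanged by this migration):

```python
if args.distributed:
    # Each process sees a distinct shard of the training set.
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
else:
    train_sampler = None

# Called once per epoch in the training loop so that shuffling differs per epoch:
# train_sampler.set_epoch(epoch)
```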
- Build the loss function and the optimizer, and migrate the model and the loss function to the Ascend AI Processor. Add the following code:
```python
val_loader = torch.utils.data.DataLoader(
    datasets.ImageFolder(valdir, transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        normalize,
    ])),
    batch_size=args.batch_size, shuffle=True,
    num_workers=args.workers, pin_memory=False, drop_last=True)

model = model.to(loc)
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().to(loc)
optimizer = torch.optim.SGD(model.parameters(), args.lr,
                            momentum=args.momentum,
                            weight_decay=args.weight_decay)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
scaler = amp.GradScaler()

# optionally resume from a checkpoint
if args.resume:
    if os.path.isfile(args.resume):
        print("=> loading checkpoint '{}'".format(args.resume))
        checkpoint = torch.load(args.resume, map_location=loc)
        args.start_epoch = checkpoint['epoch']
        best_acc1 = checkpoint['best_acc1']
        model.load_state_dict(checkpoint['state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer'])
        if args.amp:
            # Restore the GradScaler state saved under the 'amp' key
            scaler.load_state_dict(checkpoint['amp'])
        print("=> loaded checkpoint '{}' (epoch {})"
              .format(args.resume, checkpoint['epoch']))
    else:
        print("=> no checkpoint found at '{}'".format(args.resume))

cudnn.benchmark = True
```
- Add a check for whether AMP is used. Before the modification:
```python
def train(train_loader, model, criterion, optimizer, epoch, device, args):
    ......
        # compute output
        output = model(images)
        loss = criterion(output, target)

        # measure accuracy and record loss
        acc1, acc5 = accuracy(output, target, topk=(1, 5))
        losses.update(loss.item(), images.size(0))
        top1.update(acc1[0], images.size(0))
        top5.update(acc5[0], images.size(0))

        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
```
After the modification:
```python
scaler = amp.GradScaler()
end = time.time()
if args.amp:
    for i, (images, target) in enumerate(train_loader):
        ......
        # compute output
        with amp.autocast():
            output = model(images)
            loss = criterion(output, target)
        ......
        # Scale the loss, backpropagate, and update the parameters
        scaler.scale(loss).backward()   # scale the loss and backpropagate
        scaler.step(optimizer)          # update the parameters (with automatic unscaling)
        scaler.update()                 # update the loss-scaling factor (dynamic loss scale)
        ......
        if i % args.print_freq == 0:
            progress.display(i + 1)
else:
    for i, (images, target) in enumerate(train_loader):
        # measure data loading time
        data_time.update(time.time() - end)

        # move data to the same device as model
        images = images.to(device, non_blocking=True)
        target = target.to(device, non_blocking=True)

        # compute output
        output = model(images)
        loss = criterion(output, target)

        # measure accuracy and record loss
        acc1, acc5 = accuracy(output, target, topk=(1, 5))
        losses.update(loss.item(), images.size(0))
        top1.update(acc1[0], images.size(0))
        top5.update(acc5[0], images.size(0))

        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()

        if i % args.print_freq == 0:
            progress.display(i + 1)
```
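Note the fixed order of the scaler calls: scaler.scale(loss).backward() backpropagates the scaled loss, scaler.step(optimizer) unscales the gradients and skips the parameter update if they contain inf or NaN values, and scaler.update() then adjusts the dynamic loss-scaling factor. This matches the torch.cuda.amp.GradScaler semantics that torch_npu's GradScaler mirrors.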
- Combine checkpoint saving with mixed-precision training.
Original code:
```python
# remember best acc@1 and save checkpoint
is_best = acc1 > best_acc1
best_acc1 = max(acc1, best_acc1)

if not args.multiprocessing_distributed or (args.multiprocessing_distributed
        and args.rank % ngpus_per_node == 0):
    save_checkpoint({
        'epoch': epoch + 1,
        'arch': args.arch,
        'state_dict': model.state_dict(),
        'best_acc1': best_acc1,
        'optimizer' : optimizer.state_dict(),
        'scheduler' : scheduler.state_dict()
    }, is_best)
```
Modified code:
```python
# remember best acc@1 and save checkpoint
is_best = acc1 > best_acc1
best_acc1 = max(acc1, best_acc1)

if not args.multiprocessing_distributed or (args.multiprocessing_distributed
        and args.rank % ngpus_per_node == 0):
    if args.amp:
        save_checkpoint({
            'epoch': epoch + 1,
            'arch': args.arch,
            'state_dict': model.state_dict(),
            'best_acc1': best_acc1,
            'optimizer' : optimizer.state_dict(),
            'amp': scaler.state_dict(),
        }, is_best)
    else:
        save_checkpoint({
            'epoch': epoch + 1,
            'arch': args.arch,
            'state_dict': model.state_dict(),
            'best_acc1': best_acc1,
            'optimizer' : optimizer.state_dict(),
        }, is_best)
```
- During training, migrate the data to the Ascend AI Processor. Original code:
```python
for i, (images, target) in enumerate(train_loader):
    # measure data loading time
    data_time.update(time.time() - end)

    # move data to the same device as model
    images = images.to(device, non_blocking=True)
    target = target.to(device, non_blocking=True)
```
Modified code:
```python
for i, (images, target) in enumerate(train_loader):
    # measure data loading time
    data_time.update(time.time() - end)

    loc = 'npu:{}'.format(args.gpu)
    target = target.to(torch.int32)
    images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False)
```
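The extra target = target.to(torch.int32) cast avoids int64 (Long) label tensors, which the Ascend AI Processor generally handles less efficiently than int32.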
- Migrate the validation data to the Ascend AI Processor. Original code:
```python
with torch.no_grad():
    end = time.time()
    for i, (images, target) in enumerate(val_loader):
        i = base_progress + i
        if args.gpu is not None and torch.cuda.is_available():
            images = images.cuda(args.gpu, non_blocking=True)
        # if torch.backends.mps.is_available():
        #     images = images.to('mps')
        #     target = target.to('mps')
        if torch.cuda.is_available():
            target = target.cuda(args.gpu, non_blocking=True)
```
Modified code:
```python
with torch.no_grad():
    end = time.time()
    for i, (images, target) in enumerate(val_loader):
        loc = 'npu:{}'.format(args.gpu)
        target = target.to(torch.int32)
        images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False)
```
- Run the training script to launch the training processes, for example (the parameter values below are examples; adjust them as needed):
```shell
# The following values are examples; adjust them to your environment.
#   --workers        number of data-loading processes
#   --lr             learning rate
#   --arch           model architecture
#   --dist-url       replace ** with an idle port of your choice
#   --dist-backend   communication backend
#   --multiprocessing-distributed   use multi-card training
#   --device_list    device ID list for multi-card training
#   --batch-size     training batch size; prefer a multiple of the number of
#                    processor cores for best performance
#   --epochs         number of training epochs
#   --amp            enable amp mixed-precision training
python3 main.py /home/data/resnet50/imagenet \
    --workers 160 \
    --lr 0.8 \
    --arch resnet50 \
    --dist-url 'tcp://127.0.0.1:**' \
    --dist-backend 'hccl' \
    --multiprocessing-distributed \
    --device_list '0,1,2,3,4,5,6,7' \
    --world-size 1 \
    --batch-size 2048 \
    --epochs 90 \
    --rank 0 \
    --amp
```
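Before launching, you can optionally confirm that PyTorch can see the NPUs (a minimal sketch, assuming torch and torch_npu are installed correctly):

```python
import torch
import torch_npu

# On a correctly configured Ascend environment both calls should succeed.
print(torch_npu.npu.is_available())   # expected: True
print(torch_npu.npu.device_count())   # expected: number of NPUs, e.g. 8
```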
After training, check whether weight files have been generated; if checkpoint files are present, the migrated training has succeeded.
Parent topic: Sample Reference