
Manual Migration for Single-Node Multi-Card Training

Obtain the main.py script as described in the sample code instructions and comment out the mps-module-related code, then perform the following migration steps.

  1. Add the following imports to main.py to support training PyTorch models on the Ascend AI Processor and to enable mixed-precision training.
    import torch
    import torch_npu
    from torch_npu.npu import amp
  2. Add the following parameters to the argument setup, including the Ascend AI Processors participating in training and the parameters required for mixed-precision training.
    parser.add_argument('--device', default='npu', type=str, help='npu or gpu')                        
    parser.add_argument('--addr', default='127.0.0.1', type=str, help='master addr')                       
    parser.add_argument('--device_list', default='0,1,2,3,4,5,6,7', type=str, help='device id list')
    parser.add_argument('--amp', default=False, action='store_true', help='use amp to train the model')                    
    parser.add_argument('--loss_scale', default=1024., type=float,
                        help='loss scale used in amp')
    parser.add_argument('--dist_backend', default='hccl', type=str,
                        help='distributed backend')
  3. Create a function that maps device_id to process_id, specifying the devices used for training (specify adjacent devices, e.g. cards 1 and 2, or cards 2 and 3). Add the following function to main.py:
    def device_id_to_process_device_map(device_list):
        devices = device_list.split(",")
        devices = [int(x) for x in devices]
        devices.sort()
    
        process_device_map = dict()
        for process_id, device_id in enumerate(devices):
            process_device_map[process_id] = device_id
    
        return process_device_map
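    For example, launching on adjacent cards 2 and 3 maps process ranks 0 and 1 to those physical devices (a quick sanity check of the function above):

        >>> device_id_to_process_device_map('2,3')
        {0: 2, 1: 3}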
  4. Specify the IP address and port of the training server.

    Code location: the main() function in main.py.

    Add the following code:
    def main():
        args = parser.parse_args()
        os.environ['MASTER_ADDR'] = args.addr
        os.environ['MASTER_PORT'] = '**'        # ** is the port number; fill in an idle port based on your environment
  5. Create the device_id-to-process_id mapping parameter and obtain the number of Ascend AI Processors on the node.

    Code location: the main() function in main.py.

    Original code:
    args.distributed = args.world_size > 1 or args.multiprocessing_distributed
    if torch.cuda.is_available():
        ngpus_per_node = torch.cuda.device_count()
    else:
        ngpus_per_node = 1

    Modified code:

    args.distributed = args.world_size > 1 or args.multiprocessing_distributed
    args.process_device_map = device_id_to_process_device_map(args.device_list)
    if args.device == 'npu':
        ngpus_per_node = len(args.process_device_map)
    else:
        ngpus_per_node = torch.cuda.device_count()
  6. Get the Ascend AI Processor id corresponding to the process's process_id, and train on that Ascend AI Processor.

    Original code:

    def main_worker(gpu, ngpus_per_node, args):   
        global best_acc1
        args.gpu = gpu
    Modified code:
    def main_worker(gpu, ngpus_per_node, args):   
        global best_acc1
        args.gpu = args.process_device_map[gpu]
  7. Initialize the process group, disabling the init_method.
    Original code:
        dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                                world_size=args.world_size, rank=args.rank)
    Modified code:
        if args.device == 'npu':
            dist.init_process_group(backend=args.dist_backend, #init_method=args.dist_url,
                                    world_size=args.world_size, rank=args.rank)
        else:
            dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                                    world_size=args.world_size, rank=args.rank)
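    For context, in the upstream PyTorch ImageNet sample this guide is based on, main() launches one main_worker process per device via torch.multiprocessing; a minimal sketch of that launch logic, shown for orientation only (not a change you need to make):

        import torch.multiprocessing as mp

        if args.multiprocessing_distributed:
            # world_size counts processes across all nodes
            args.world_size = ngpus_per_node * args.world_size
            # spawn one process per device; each receives its index as `gpu`
            mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))
        else:
            main_worker(args.gpu, ngpus_per_node, args)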
  8. Distributed training with the mixed-precision module requires migrating the model to the Ascend AI Processor, so mask the original code that checks whether training is distributed and whether the model trains on a GPU.

    Code location: main_worker() in main.py.

    Original code:

        # create model
        if args.pretrained:
            print("=> using pre-trained model '{}'".format(args.arch))
            model = models.__dict__[args.arch](pretrained=True)
        else:
            print("=> creating model '{}'".format(args.arch))
            model = models.__dict__[args.arch]()
        if not torch.cuda.is_available():
            print('using CPU, this will be slow')
            ......
            else:
                model = torch.nn.DataParallel(model).cuda()

    Modified code:

        # create model
        if args.pretrained:
            print("=> using pre-trained model '{}'".format(args.arch))
            model = models.__dict__[args.arch](pretrained=True)
        else:
            print("=> creating model '{}'".format(args.arch))
            model = models.__dict__[args.arch]()
        # set the training device to the Ascend AI Processor
        loc = 'npu:{}'.format(args.gpu)
        torch_npu.npu.set_device(loc)
        # compute the per-device batch_size and workers for training
        args.batch_size = int(args.batch_size / ngpus_per_node)
        args.workers = int((args.workers + ngpus_per_node - 1) / ngpus_per_node)
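    As a worked example: with --batch-size 2048 and --workers 160 on 8 devices (the sample command in step 16), each process trains with a per-device batch size of 2048 / 8 = 256 and (160 + 7) // 8 = 20 data-loading workers.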
  9. Mask the loss function, optimizer, and checkpoint-resume code; this functionality is recombined with mixed-precision training later.

    Code location: main_worker() in main.py.

    The original code to mask, shown commented out:

        # define loss function (criterion), optimizer, and learning rate scheduler
        #     .....
        # optionally resume from a checkpoint
        #     .....
        #     else:
        #         print("=> no checkpoint found at '{}'".format(args.resume))

    Then mask the scheduler in the original code:

    ...
    # train for one epoch
    train(train_loader, model, criterion, optimizer, epoch, device, args)
    
    # evaluate on validation set
    acc1 = validate(val_loader, model, criterion, args)
    
    # scheduler.step()
    ...
  10. The data loader combines a dataset with a sampler and can use multiple workers to process the dataset. When training on the Ascend AI Processor, pin_memory must be set to False; because only fixed-shape training is currently supported and the number of samples remaining in the data stream may be smaller than the batch size, drop_last must be set to True; in addition, shuffle must be set to True for the validation dataset.

    Code location: main_worker() in main.py.

    Original code:

        train_loader = torch.utils.data.DataLoader(
            train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
            num_workers=args.workers, pin_memory=True, sampler=train_sampler)
    
        val_loader = torch.utils.data.DataLoader(
            val_dataset, batch_size=args.batch_size, shuffle=False,
            num_workers=args.workers, pin_memory=True, sampler=val_sampler)

    Modified code:

        train_loader = torch.utils.data.DataLoader(
            train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
            num_workers=args.workers, pin_memory=False, sampler=train_sampler, drop_last=True)
    
        val_loader = torch.utils.data.DataLoader(
            datasets.ImageFolder(valdir, transforms.Compose([
                transforms.Resize(256),
                transforms.CenterCrop(224),
                transforms.ToTensor(),
                normalize,
            ])),
            batch_size=args.batch_size, shuffle=True,
            num_workers=args.workers, pin_memory=False, drop_last=True)
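    The train_sampler referenced above is unchanged from the upstream sample; it partitions the training set across processes. A minimal sketch of its construction, for orientation:

        if args.distributed:
            train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
        else:
            train_sampler = None

    The upstream sample also calls train_sampler.set_epoch(epoch) at the start of each epoch so every process sees a different shuffle.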
  11. Build the loss function and optimizer, and migrate the model and loss function to the Ascend AI Processor.
    Add the following code after the val_loader construction shown in the previous step:
        model = model.to(loc)
        # define loss function (criterion) and optimizer
        criterion = nn.CrossEntropyLoss().to(loc)
        optimizer = torch.optim.SGD(model.parameters(), args.lr,
                                    momentum=args.momentum,
                                    weight_decay=args.weight_decay)    
        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
        scaler = amp.GradScaler()
           
    
        # optionally resume from a checkpoint
        if args.resume:
            if os.path.isfile(args.resume):
                print("=> loading checkpoint '{}'".format(args.resume))
                checkpoint = torch.load(args.resume, map_location=loc)
                args.start_epoch = checkpoint['epoch']
                best_acc1 = checkpoint['best_acc1']
                model.load_state_dict(checkpoint['state_dict'])
                optimizer.load_state_dict(checkpoint['optimizer'])
                if args.amp:
                    scaler.load_state_dict(checkpoint['amp'])
                print("=> loaded checkpoint '{}' (epoch {})"
                      .format(args.resume, checkpoint['epoch']))
            else:
                print("=> no checkpoint found at '{}'".format(args.resume))
    
        cudnn.benchmark = True
  12. In train(), add a check for whether AMP is used.
    Before modification:
    def train(train_loader, model, criterion, optimizer, epoch, device, args):
    ......
            # compute output
            output = model(images)
            loss = criterion(output, target)
    
            # measure accuracy and record loss
            acc1, acc5 = accuracy(output, target, topk=(1, 5))
            losses.update(loss.item(), images.size(0))
            top1.update(acc1[0], images.size(0))
            top5.update(acc5[0], images.size(0))
    
            # compute gradient and do SGD step
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    After modification:

        scaler = amp.GradScaler()
        end = time.time()
        if args.amp:
            for i, (images, target) in enumerate(train_loader):
                ......

                # compute output
                with amp.autocast():
                    output = model(images)
                    loss = criterion(output, target)
                ......
                # scale the loss before the backward pass, then update the parameters
                scaler.scale(loss).backward()    # scale the loss and backpropagate
                scaler.step(optimizer)    # update parameters (unscales gradients automatically)
                scaler.update()    # update the loss-scaling factor (dynamic loss scale)
                ......
                if i % args.print_freq == 0:
                    progress.display(i + 1)
        else:
            for i, (images, target) in enumerate(train_loader):
                # measure data loading time
                data_time.update(time.time() - end)

                # move data to the same device as model
                images = images.to(device, non_blocking=True)
                target = target.to(device, non_blocking=True)

                # compute output
                output = model(images)
                loss = criterion(output, target)

                # measure accuracy and record loss
                acc1, acc5 = accuracy(output, target, topk=(1, 5))
                losses.update(loss.item(), images.size(0))
                top1.update(acc1[0], images.size(0))
                top5.update(acc5[0], images.size(0))

                # compute gradient and do SGD step
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                # measure elapsed time
                batch_time.update(time.time() - end)
                end = time.time()

                if i % args.print_freq == 0:
                    progress.display(i + 1)
  13. Combine checkpoint saving with mixed-precision training.

    Original code:

    # remember best acc@1 and save checkpoint
    is_best = acc1 > best_acc1
    best_acc1 = max(acc1, best_acc1)
    
    if not args.multiprocessing_distributed or (args.multiprocessing_distributed
            and args.rank % ngpus_per_node == 0):
        save_checkpoint({
            'epoch': epoch + 1,
            'arch': args.arch,
            'state_dict': model.state_dict(),
            'best_acc1': best_acc1,
            'optimizer' : optimizer.state_dict(),
            'scheduler' : scheduler.state_dict()
        }, is_best)
    Modified code:
            # remember best acc@1 and save checkpoint
            is_best = acc1 > best_acc1
            best_acc1 = max(acc1, best_acc1)
    
            if not args.multiprocessing_distributed or (args.multiprocessing_distributed
                    and args.rank % ngpus_per_node == 0):
                if args.amp:
                    save_checkpoint({
                        'epoch': epoch + 1,
                        'arch': args.arch,
                        'state_dict': model.state_dict(),
                        'best_acc1': best_acc1,
                        'optimizer' : optimizer.state_dict(),
                        'amp': scaler.state_dict(),
                    }, is_best)
                else:
                    save_checkpoint({
                        'epoch': epoch + 1,
                        'arch': args.arch,
                        'state_dict': model.state_dict(),
                        'best_acc1': best_acc1,
                        'optimizer' : optimizer.state_dict(),
                    }, is_best)
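    The save_checkpoint helper itself is unchanged; for reference, in the upstream sample it writes the state to disk and keeps a copy of the best model (requires import shutil at the top of main.py):

        def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'):
            torch.save(state, filename)
            if is_best:
                # keep a separate copy of the best-performing checkpoint
                shutil.copyfile(filename, 'model_best.pth.tar')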
  14. During training, migrate the dataset to the Ascend AI Processor. Note that the labels are also cast to int32, which the Ascend AI Processor handles more efficiently than the default int64.
    Original code:
        for i, (images, target) in enumerate(train_loader):
            # measure data loading time
            data_time.update(time.time() - end)
            
            # move data to the same device as model
            images = images.to(device, non_blocking=True)
            target = target.to(device, non_blocking=True)
    Modified code:
        for i, (images, target) in enumerate(train_loader):
            # measure data loading time
            data_time.update(time.time() - end)
            loc = 'npu:{}'.format(args.gpu)
            target = target.to(torch.int32)
            images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False)
  15. Migrate the validation dataset to the Ascend AI Processor.
    Original code:
        with torch.no_grad():
            end = time.time()
            for i, (images, target) in enumerate(val_loader):
                i = base_progress + i
                if args.gpu is not None and torch.cuda.is_available():
                    images = images.cuda(args.gpu, non_blocking=True)
                #if torch.backends.mps.is_available():
                        #images = images.to('mps')
                        #target = target.to('mps')
                if torch.cuda.is_available():
                    target = target.cuda(args.gpu, non_blocking=True)
    Modified code:
        with torch.no_grad():
            end = time.time()
            for i, (images, target) in enumerate(val_loader):
                loc = 'npu:{}'.format(args.gpu)
                target = target.to(torch.int32)
                images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False)
  16. Run the training script to launch the training processes, for example:
    (The following parameter values are examples; adjust them to your environment.)
    python3 main.py /home/data/resnet50/imagenet   --workers 160 \                   # number of data-loading processes
                                                   --lr 0.8 \                        # learning rate
                                                   --arch resnet50 \                 # model architecture
                                                   --dist-url 'tcp://127.0.0.1:**' \ # ** is the port number; fill in an idle port based on your environment
                                                   --dist-backend 'hccl' \           # communication backend
                                                   --multiprocessing-distributed \   # use multi-card training
                                                   --device_list '0,1,2,3,4,5,6,7' \ # device list for multi-card training
                                                   --world-size 1 \
                                                   --batch-size 2048 \               # training batch size; prefer a multiple of the processor core count for best performance
                                                   --epochs 90 \                     # number of training epochs
                                                   --rank 0 \
                                                   --amp                             # enable amp mixed precision

    After training, check whether a weight (checkpoint) file has been generated; if it has, the migrated training succeeded.
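    To verify, you can load the generated checkpoint and inspect the keys saved in step 13 (a quick check, assuming the default filename checkpoint.pth.tar):

        import torch

        ckpt = torch.load('checkpoint.pth.tar', map_location='cpu')
        print(list(ckpt.keys()))   # expect: epoch, arch, state_dict, best_acc1, optimizer (plus amp when --amp was used)
        print(ckpt['epoch'], ckpt['best_acc1'])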