```python
# Import dependencies and libraries
import torch
from torch import nn
import torch_npu
import torch.distributed as dist
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
import time
import torch.multiprocessing as mp
import os

torch.manual_seed(0)

# Download the training data
training_data = datasets.FashionMNIST(
    root="./data",
    train=True,
    download=True,
    transform=ToTensor(),
)

# Download the test data
test_data = datasets.FashionMNIST(
    root="./data",
    train=False,
    download=True,
    transform=ToTensor(),
)

# Build the model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

def test(dataloader, model, loss_fn, device):   # the target device is passed in explicitly
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
```
In the model script, build the main function, which obtains the hyperparameters required for distributed training. Its shape depends on how the training job is launched:
```python
# Launching with a shell script
def main(world_size: int, batch_size=64, total_epochs=5):   # values can be set by the user
    ngpus_per_node = world_size
    main_worker(args.gpu, ngpus_per_node, args)   # args is parsed at module level in the __main__ block
```
```python
# Launching with mp.spawn
def main(world_size: int, batch_size=64, total_epochs=5):   # values can be set by the user
    ngpus_per_node = world_size
    mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))   # start the workers with mp.spawn
```
```python
# Launching with torch.distributed.launch (Python launch command)
def main(world_size: int, batch_size, args):   # hyperparameters come from the Python launch command
    ngpus_per_node = world_size
    args.gpu = args.local_rank   # after the job is launched, local_rank automatically holds the device ID
    main_worker(args.gpu, ngpus_per_node, args)
```
Set the address and port number in the model script; they are used to bring up distributed training. Because the Ascend AI Processor only supports init_method = env:// (initialization via environment variables) when the process group is initialized, MASTER_ADDR, MASTER_PORT and related parameters must be configured for the actual environment before initialization. (A small sanity-check sketch follows the setup examples below.)
```python
# ddp_setup used when the launcher does not set MASTER_ADDR/MASTER_PORT itself (shell-script and mp.spawn methods)
def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    os.environ["MASTER_ADDR"] = "localhost"   # set to the communication IP of the rank 0 node; localhost works for single-node training
    os.environ["MASTER_PORT"] = "***"         # set to a free port on the rank 0 node
    dist.init_process_group(backend="hccl", rank=rank, world_size=world_size)
```
When launching with torch_npu_run (the --rdzv_backend=parallel launch commands shown below), init_method must be switched to the parallel mode. Before modification, init_method uses the TCP mode:

```python
dist.init_process_group(..., init_method="tcp://xx:**", ......, )   # xx is the communication IP of the rank 0 node, ** is a free port of your choice
```
After modification, init_method uses the parallel mode:
```python
dist.init_process_group(..., init_method="parallel://xx:**", ......, )   # xx is the communication IP of the rank 0 node, ** is a free port of your choice
```
```python
# ddp_setup used when the launch utility sets MASTER_ADDR/MASTER_PORT itself (torch.distributed.launch / torchrun)
def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    dist.init_process_group(backend="hccl", rank=rank, world_size=world_size)
```
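As a quick sanity check (not part of the original sample; the helper name check_setup is only illustrative), each process can print its rank, the world size, and the NPU device it ended up on right after ddp_setup returns:

```python
import torch
import torch_npu
import torch.distributed as dist

def check_setup():
    # Illustrative helper: confirms the process group is initialized and shows
    # which rank / world size / NPU device this process is using.
    assert dist.is_initialized()
    print(f"rank {dist.get_rank()}/{dist.get_world_size()} "
          f"is using npu:{torch_npu.npu.current_device()}")
```

Calling it once per process (for example at the top of main_worker, after ddp_setup and set_device) makes mis-set ranks or device IDs visible immediately.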
The way the device ID is obtained differs between the launch methods (a compact summary sketch follows the three examples below):

- Shell-script launch: the device ID is read from the LOCAL_RANK environment variable exported by the shell loop.
- mp.spawn launch: the process index is passed automatically as the first argument of main_worker.
- torch.distributed.launch: the device ID comes from the --local_rank argument injected by the launcher.

Modify the code according to the method you choose. Example modifications are shown below:
```python
# main_worker for the shell-script launch method
def main_worker(gpu, ngpus_per_node, args):
    start_epoch = 0
    end_epoch = 5
    args.gpu = int(os.environ['LOCAL_RANK'])   # LOCAL_RANK is exported by the shell loop and selects the device
    ddp_setup(args.gpu, args.world_size)
    torch_npu.npu.set_device(args.gpu)

    total_batch_size = args.batch_size
    total_workers = ngpus_per_node
    batch_size = int(total_batch_size / ngpus_per_node)
    workers = int((total_workers + ngpus_per_node - 1) / ngpus_per_node)

    model = NeuralNetwork()

    device = torch.device("npu")
    train_sampler = torch.utils.data.distributed.DistributedSampler(training_data)
    test_sampler = torch.utils.data.distributed.DistributedSampler(test_data)

    train_loader = torch.utils.data.DataLoader(
        training_data, batch_size=batch_size, shuffle=(train_sampler is None),
        num_workers=workers, pin_memory=False, sampler=train_sampler, drop_last=True)

    val_loader = torch.utils.data.DataLoader(
        test_data, batch_size=batch_size, shuffle=(test_sampler is None),
        num_workers=workers, pin_memory=False, sampler=test_sampler, drop_last=True)

    loc = 'npu:{}'.format(args.gpu)
    model = model.to(loc)
    criterion = nn.CrossEntropyLoss().to(loc)
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])

    for epoch in range(start_epoch, end_epoch):
        print("curr epoch: ", epoch)
        train_sampler.set_epoch(epoch)
        train(train_loader, model, criterion, optimizer, epoch, args.gpu)


def train(train_loader, model, criterion, optimizer, epoch, gpu):
    size = len(train_loader.dataset)
    model.train()
    end = time.time()
    for i, (images, target) in enumerate(train_loader):
        # measure data loading time
        loc = 'npu:{}'.format(gpu)
        target = target.to(torch.int32)
        images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False)

        # compute output
        output = model(images)
        loss = criterion(output, target)

        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        end = time.time()
        if i % 100 == 0:
            loss, current = loss.item(), i * len(target)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
```
```python
# main_worker for the mp.spawn launch method; gpu is the process index passed in by mp.spawn
def main_worker(gpu, ngpus_per_node, args):
    start_epoch = 0
    end_epoch = 5
    ddp_setup(gpu, args.world_size)
    torch_npu.npu.set_device(gpu)

    total_batch_size = args.batch_size
    total_workers = ngpus_per_node
    batch_size = int(total_batch_size / ngpus_per_node)
    workers = int((total_workers + ngpus_per_node - 1) / ngpus_per_node)

    model = NeuralNetwork()

    device = torch.device("npu")
    train_sampler = torch.utils.data.distributed.DistributedSampler(training_data)
    test_sampler = torch.utils.data.distributed.DistributedSampler(test_data)

    train_loader = torch.utils.data.DataLoader(
        training_data, batch_size=batch_size, shuffle=(train_sampler is None),
        num_workers=workers, pin_memory=False, sampler=train_sampler, drop_last=True)

    val_loader = torch.utils.data.DataLoader(
        test_data, batch_size=batch_size, shuffle=(test_sampler is None),
        num_workers=workers, pin_memory=False, sampler=test_sampler, drop_last=True)

    loc = 'npu:{}'.format(gpu)
    model = model.to(loc)
    criterion = nn.CrossEntropyLoss().to(loc)
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[gpu])

    for epoch in range(start_epoch, end_epoch):
        print("curr epoch: ", epoch)
        train_sampler.set_epoch(epoch)
        train(train_loader, model, criterion, optimizer, epoch, gpu)

......   # the train function is the same as in the shell-script method
```
```python
# main_worker for the torch.distributed.launch method
def main_worker(gpu, ngpus_per_node, args):
    start_epoch = 0
    end_epoch = 5
    args.gpu = args.local_rank   # after the job is launched, local_rank automatically holds the device ID
    ddp_setup(args.gpu, args.world_size)
    ......   # the remaining code is the same as in the shell-script method
```
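As a compact summary (an illustrative sketch, not part of the original samples; get_local_device is a hypothetical helper name), the only real difference between the variants above is where the local device ID comes from:

```python
import os

def get_local_device(args, spawn_index=None):
    # Hypothetical helper condensing the three acquisition paths shown above.
    if spawn_index is not None:
        return spawn_index                      # mp.spawn: process index passed by the launcher
    if getattr(args, "local_rank", -1) >= 0:
        return args.local_rank                  # torch.distributed.launch: --local_rank argument
    return int(os.environ["LOCAL_RANK"])        # shell script / torchrun / torch_npu_run: environment variable
```

Note that torchrun and torch_npu_run reuse the shell-script variant of the model script (ddp_test_shell.py in the launch commands below), because they populate the LOCAL_RANK environment variable themselves.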
In the model script, different parameters must be passed in depending on the launch method. The argument-passing logic (implemented here with argparse) is configured as follows (a short compatibility note follows the examples):
```python
# __main__ block for the shell-script launch method
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 512)')
    parser.add_argument('--gpu', default=None, type=int, help='GPU id to use.')
    args = parser.parse_args()

    world_size = torch.npu.device_count()
    args.world_size = world_size
    main(args.world_size, args.batch_size)
```
```python
# __main__ block for the mp.spawn launch method
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 512)')
    args = parser.parse_args()

    world_size = torch.npu.device_count()
    args.world_size = world_size
    main(args.world_size, args.batch_size)
```
```python
# __main__ block for the torch.distributed.launch method
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 512)')
    parser.add_argument('--gpu', default=None, type=int, help='GPU id to use.')
    parser.add_argument("--local_rank", default=-1, type=int)   # local_rank automatically receives the device ID; remove this argument when launching with mp.spawn or a shell script
    args = parser.parse_args()

    world_size = torch.npu.device_count()
    args.world_size = world_size
    main(args.world_size, args.batch_size, args)   # the parameters set in the Python launch command must be passed into main
```
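Compatibility note (an assumption-labelled sketch, not part of the original sample): newer PyTorch launchers deliver the local rank through the LOCAL_RANK environment variable rather than the --local_rank argument, so a small fallback placed after parse_args keeps the script working with either convention; resolve_local_rank is a hypothetical helper name.

```python
import os

def resolve_local_rank(args):
    # Prefer the --local_rank argument when the launcher injected it; otherwise
    # fall back to the LOCAL_RANK environment variable set by torchrun (and by
    # torch.distributed.launch when run with --use_env).
    if args.local_rank < 0 and "LOCAL_RANK" in os.environ:
        args.local_rank = int(os.environ["LOCAL_RANK"])
    return args.local_rank
```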
Launch command examples are given for each method below; adjust them to your actual environment.
```shell
# Launching with a shell script
export HCCL_WHITELIST_DISABLE=1
RANK_ID_START=0
WORLD_SIZE=8
for((RANK_ID=$RANK_ID_START;RANK_ID<$((WORLD_SIZE+RANK_ID_START));RANK_ID++));
do
    echo "Device ID: $RANK_ID"
    export LOCAL_RANK=$RANK_ID
    python3 ddp_test_shell.py &
done
wait
```
```shell
# Launching with mp.spawn
export HCCL_WHITELIST_DISABLE=1
python3 ddp_test_spwn.py
```
```shell
# Launching with torch.distributed.launch
# master_addr and master_port must be set according to the actual environment
export HCCL_WHITELIST_DISABLE=1
python3 -m torch.distributed.launch --nproc_per_node 8 --master_addr localhost --master_port *** ddp_test.py
```
```shell
# Launching with torchrun
export HCCL_WHITELIST_DISABLE=1
torchrun --standalone --nnodes=1 --nproc_per_node=8 ddp_test_shell.py
```
```shell
# Launching with torch_npu_run
export HCCL_WHITELIST_DISABLE=1
export MASTER_IP_ADDR=**    # replace ** with the IP address of the node_rank 0 node
export MASTER_PORT=**       # replace ** with a free TCP port
torch_npu_run --rdzv_backend=parallel --master_addr=$MASTER_IP_ADDR \
    --master_port=$MASTER_PORT --nnodes=1 --nproc_per_node=8 ddp_test_shell.py
```
When the console output or redirected log shows normal running messages such as model loading and training, the multi-card training job has been launched successfully, as shown in Figure 1.
The launch command must be executed separately on each machine in the cluster:
```shell
# First machine
RANK_ID_START=0
NPU_PER_NODE=8
for((RANK_ID=$RANK_ID_START;RANK_ID<$((NPU_PER_NODE+RANK_ID_START));RANK_ID++));
do
    echo "Device ID: $RANK_ID"
    export LOCAL_RANK=$RANK_ID
    python3 ddp_test_shell.py &
done
wait

# Second machine
RANK_ID_START=8
NPU_PER_NODE=8
for((RANK_ID=$RANK_ID_START;RANK_ID<$((NPU_PER_NODE+RANK_ID_START));RANK_ID++));
do
    echo "Device ID: $RANK_ID"
    export LOCAL_RANK=$RANK_ID
    python3 ddp_test_shell.py &
done
wait
```
```shell
# First machine
# master_addr and master_port must be set according to the actual environment
python3 -m torch.distributed.launch --nnodes=2 --nproc_per_node 8 --node_rank 0 --master_addr $master_addr --master_port $master_port ddp_test.py

# Second machine
python3 -m torch.distributed.launch --nnodes=2 --nproc_per_node 8 --node_rank 1 --master_addr $master_addr --master_port $master_port ddp_test.py
```
```shell
# First machine
# master_addr and master_port must be set according to the actual environment
torchrun --nnodes=2 --nproc_per_node=8 --node_rank 0 --master_addr $master_addr --master_port $master_port ddp_test_shell.py

# Second machine
torchrun --nnodes=2 --nproc_per_node=8 --node_rank 1 --master_addr $master_addr --master_port $master_port ddp_test_shell.py
```
```shell
# First machine
export MASTER_IP_ADDR=**    # replace ** with the IP address of the node_rank 0 node
export MASTER_PORT=**       # replace ** with a free TCP port
torch_npu_run --rdzv_backend=parallel --master_addr=$MASTER_IP_ADDR \
    --master_port=$MASTER_PORT --nnodes=2 --node_rank 0 --nproc_per_node=8 ddp_test_shell.py

# Second machine
export MASTER_IP_ADDR=**    # replace ** with the IP address of the node_rank 0 node
export MASTER_PORT=**       # replace ** with a free TCP port
torch_npu_run --rdzv_backend=parallel --master_addr=$MASTER_IP_ADDR \
    --master_port=$MASTER_PORT --nnodes=2 --node_rank 1 --nproc_per_node=8 ddp_test_shell.py
```
When the console output or redirected log shows normal running messages such as model loading and training, the two-node training job has been launched successfully.