# Temporarily reserve ports 60000-60015 so the kernel will not hand them out as ephemeral local ports
sysctl -w net.ipv4.ip_local_reserved_ports=60000-60015
# To make the reservation persistent across reboots, add the same entry to /etc/sysctl.conf
vim /etc/sysctl.conf
# Reload the configuration
sysctl -p
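Note that sysctl -w only changes the running kernel; the reservation is lost on reboot unless it is also written to /etc/sysctl.conf. The entry to add there uses the same key and port range as the command above:

# /etc/sysctl.conf -- keep ports 60000-60015 out of the ephemeral port pool
net.ipv4.ip_local_reserved_ports = 60000-60015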
This section takes the code from the Adaptation Sample (DDP single-NPU single-process scenario) section as its example and walks through the core steps for converting a single-node single-card training script into a single-node multi-card training script.
# Read the local rank injected by the launcher and bind this process to its own NPU
local_rank = int(os.environ.get("LOCAL_RANK", 0))
device = torch.device(f'npu:{local_rank}')
# Initialize the process group with the hccl backend; the global rank is the node rank times the number of processes per node, plus the local rank
torch.distributed.init_process_group(backend="hccl", rank=args.node_rank * args.nproc_per_node + local_rank)
# Wrap the model with DistributedDataParallel so gradients are synchronized across cards
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)
# Give each process a disjoint shard of the dataset via DistributedSampler
train_sampler = torch.utils.data.distributed.DistributedSampler(train_data)
train_dataloader = DataLoader(dataset=train_data, batch_size=batch_size, sampler=train_sampler)
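One step the core snippet above does not show: when shuffling is enabled (the DistributedSampler default), call set_epoch at the start of every epoch so the shuffle order differs between epochs; otherwise every epoch replays the same permutation. A minimal sketch, reusing the variable names from the snippet above and assuming an epochs hyperparameter:

for epoch in range(epochs):           # epochs is an assumed hyperparameter name
    train_sampler.set_epoch(epoch)    # reseed the sampler so each epoch shuffles differently
    for inputs, labels in train_dataloader:
        ...                           # forward/backward/step as usual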
There are five launch methods that can start multi-card training. The appendix Examples of Launching Multi-Card Training Scripts uses a simple model script to show, for each launch method, how to modify the script code and how to adapt the launch itself; it can be used as a reference.
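As a point of comparison with the torchrun example that follows, here is a minimal sketch of one alternative, the mp.spawn launch style; the worker function name run_worker, the MASTER_ADDR/MASTER_PORT values, and the world size of 8 are illustrative assumptions, not part of the official sample.

import os
import torch.distributed as dist
import torch.multiprocessing as mp

def run_worker(local_rank, world_size):
    # mp.spawn passes the process index as the first argument; on a single
    # node it serves as both the local and the global rank.
    os.environ.setdefault("MASTER_ADDR", "localhost")  # assumed rendezvous address
    os.environ.setdefault("MASTER_PORT", "6004")       # assumed free port
    dist.init_process_group("hccl", rank=local_rank, world_size=world_size)
    # ... build the model, wrap it in DDP, and train, as in the example below ...
    dist.destroy_process_group()

if __name__ == "__main__":
    world_size = 8  # assumed: one process per NPU on a single eight-card node
    mp.spawn(run_worker, args=(world_size,), nprocs=world_size)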
Taking the torchrun launch method as the example, the following uses a simple custom model to show the model code and launch script for a single node with eight cards.
# Model code
import os
import argparse
import torch
import torch_npu
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.utils.data.distributed import DistributedSampler
# DDP
from torch.nn.parallel import DistributedDataParallel as DDP

def cleanup():
    dist.destroy_process_group()

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))

def parse_args():
    # Command-line argument handling
    parser = argparse.ArgumentParser(description="command line arguments")
    parser.add_argument('--batch_size', type=int, default=64)
    parser.add_argument('--epochs', type=int, default=10)
    parser.add_argument('--learning_rate', type=float, default=0.0001)
    parser.add_argument("--node_rank", type=int)
    parser.add_argument("--nproc_per_node", type=int)
    return parser.parse_args()

def data_process(inputs, labels):
    # Reshape one MNIST sample (assumes batch_size=1, as in the launch script):
    # inputs [1, 1, 28, 28] -> [28, 10]; labels -> [28, 5] soft targets
    squeezed_tensor = inputs.squeeze(0).squeeze(0)
    inputs = squeezed_tensor[:, :10]
    labels = labels.repeat(28, 5) * (1 / 140)
    return inputs, labels

def main():
    # Read the distributed hyperparameters
    args = parse_args()
    # Distributed setup: the launcher injects LOCAL_RANK and WORLD_SIZE
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    rank = args.node_rank * args.nproc_per_node + local_rank  # global rank
    # Initialize DDP with hccl as the communication backend
    dist.init_process_group("hccl", rank=rank, world_size=world_size)
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])
    # Bind this process to its NPU and move the model there
    torch_npu.npu.set_device(local_rank)
    device = torch.device(f'npu:{local_rank}')
    model = ToyModel().to(device)
    # After defining the model, enable DDP mode
    model = DDP(model, device_ids=[local_rank], find_unused_parameters=True)
    train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
    # After obtaining the training dataset, set up train_sampler and train_loader
    train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)
    train_loader = DataLoader(train_dataset, batch_size=args.batch_size, sampler=train_sampler)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=args.learning_rate)
    step = 0
    for epoch in range(args.epochs):
        model.train()
        # Reseed the sampler so each epoch shuffles the data differently
        train_sampler.set_epoch(epoch)
        for inputs, labels in train_loader:
            # Preprocess the batch into the shapes the model expects
            inputs, labels = data_process(inputs, labels)
            # Move the data to the NPU
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            # Move the loss to the NPU
            loss = criterion(outputs, labels).to(device)
            loss.backward()
            optimizer.step()
            print(f"step == {step}")
            step += 1
    cleanup()

if __name__ == "__main__":
    main()
# Launch script
NPUS_PER_NODE=8
NNODES=1
NODE_RANK=0
MASTER_ADDR=localhost  # address of this node; no change needed for single-node training
MASTER_PORT=6004       # pick any unoccupied port
export NPU_ASD_ENABLE=1   # enable feature-value anomaly detection
source ./test/env_npu.sh  # environment variables configured in env_npu.sh

DISTRIBUTED_ARGS="
    --nproc_per_node $NPUS_PER_NODE \
    --nnodes $NNODES \
    --node_rank $NODE_RANK \
    --master_addr $MASTER_ADDR \
    --master_port $MASTER_PORT
"

# train_8p.py is the model script above; rename it as needed. The arguments
# after the script name are consumed by its parse_args():
#   --nproc_per_node  processes (cards) per node
#   --node_rank       index of this node (0 for a single node)
#   --learning_rate   learning rate
#   --batch_size      training batch size
#   --epochs          number of training epochs
torchrun $DISTRIBUTED_ARGS train_8p.py \
    --nproc_per_node $NPUS_PER_NODE \
    --node_rank $NODE_RANK \
    --learning_rate 0.0001 \
    --batch_size 1 \
    --epochs 1
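Scaling the same launch script out to multiple nodes only changes the topology variables; everything else, including the torchrun invocation, stays the same. A sketch for the first of two nodes, where the IP address is a placeholder assumption for the NODE_RANK=0 machine:

# Assumed two-node variant of the variables above (run a copy on each node)
NPUS_PER_NODE=8
NNODES=2
NODE_RANK=0               # set to 1 in the copy run on the second node
MASTER_ADDR=192.168.1.10  # placeholder: reachable IP of the NODE_RANK=0 node
MASTER_PORT=6004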