Example model script for the torchrun launch method:
import argparse
import os
from datetime import timedelta

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

import torch_npu
from torchvision import datasets, transforms

DATA_DIR = "./data"  # dataset directory


def cleanup():
    """Destroy the process group used for distributed training."""
    dist.destroy_process_group()


class ToyModel(nn.Module):
    """A simple neural network model."""
    def __init__(self):
        super(ToyModel, self).__init__()
        self.layer1 = nn.Linear(784, 128)  # first fully connected layer
        self.relu = nn.ReLU()              # ReLU activation
        self.layer2 = nn.Linear(128, 10)   # second fully connected layer

    def forward(self, x):
        """Forward pass: two fully connected layers with a ReLU in between."""
        return self.layer2(self.relu(self.layer1(x)))


def data_process(inputs, labels):
    """Data preprocessing."""
    inputs = inputs.view(-1, 784)  # flatten inputs to (batch_size, 784)
    labels = labels.view(-1)       # flatten labels to (batch_size,)
    return inputs, labels


def get_train_args():
    """Parse training arguments from the command line."""
    parser = argparse.ArgumentParser(description="command line arguments")  # create the argument parser
    parser.add_argument("--batch_size", type=int, default=64)               # batch size, default 64
    parser.add_argument("--epochs", type=int, default=1)                    # number of epochs, default 1
    parser.add_argument("--learning_rate", type=float, default=0.0001)      # learning rate, default 0.0001
    args = parser.parse_args()  # parse command line arguments
    return args


def train(args):
    """Training function for one process."""
    world_size = int(os.environ["WORLD_SIZE"])      # total number of distributed processes
    local_rank_idx = int(os.environ["LOCAL_RANK"])  # local process index on this node
    devices_per_node = torch.npu.device_count()     # number of NPU devices on this node

    dist.init_process_group("hccl", rank=local_rank_idx, world_size=world_size, timeout=timedelta(minutes=30))  # initialize the process group
    torch_npu.npu.set_device(local_rank_idx)  # bind this process to one NPU
    device_id = f"npu:{local_rank_idx}"       # NPU device string

    model = ToyModel().to(device_id)  # instantiate the model and move it to the NPU

    transform = transforms.Compose([
        transforms.ToTensor(),                # convert images to tensors
        transforms.Normalize((0.5,), (0.5,))  # normalize
    ])
    train_dataset = datasets.MNIST(root=DATA_DIR, train=True, download=True, transform=transform)  # load the MNIST training set
    train_sampler = DistributedSampler(train_dataset, num_replicas=devices_per_node, rank=local_rank_idx)  # distributed data sampler
    train_dataloader = DataLoader(train_dataset, batch_size=args.batch_size, sampler=train_sampler)  # data loader

    criterion = nn.CrossEntropyLoss().to(device_id)                    # loss function
    optimizer = optim.Adam(model.parameters(), lr=args.learning_rate)  # optimizer
    model = DDP(model)  # wrap the model for distributed training

    for epoch in range(args.epochs):
        if local_rank_idx == 0:
            print(f"\nCurrent epoch: {epoch}")  # print the current epoch
        train_sampler.set_epoch(epoch)  # set the sampler epoch so shuffling differs per epoch
        model.train()  # switch the model to training mode
        for i, (inputs, labels) in enumerate(train_dataloader):
            inputs, labels = data_process(inputs, labels)  # preprocess the data
            inputs, labels = inputs.to(device_id), labels.to(device_id)  # move data to the NPU
            outputs = model(inputs)  # forward pass
            loss = criterion(outputs, labels).to(device_id)  # compute the loss
            optimizer.zero_grad()  # clear gradients
            loss.backward()        # backward pass
            optimizer.step()       # update parameters
            if local_rank_idx == 0:
                print(f"Current step: {i}, loss: {loss.item()}")  # print the current step and loss

    cleanup()  # destroy the process group


def main():
    """Main entry point."""
    args = get_train_args()  # parse training arguments
    train(args)              # start training


if __name__ == "__main__":
    main()
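One practical note, not part of the original script: every rank calls datasets.MNIST with download=True, so when many processes start at once they may all try to download and extract the dataset into ./data at the same time. An optional one-off pre-download run before launching avoids that; the sketch below is illustrative and simply reuses the script's DATA_DIR.

from torchvision import datasets

DATA_DIR = "./data"  # must match DATA_DIR in the training script

if __name__ == "__main__":
    # Download (or reuse) the MNIST training set once, so the training
    # processes later only read the cached copy under ./data.
    datasets.MNIST(root=DATA_DIR, train=True, download=True)
    print("MNIST training data cached under", DATA_DIR)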
Shell-script launch (single node, 8 NPUs). Modify the script above (saved here as train_8p_shell.py) so that the rank information is read once in main() and passed into train():

Before:
def train(args):
After:
def train(local_rank, world_size, args):

Before:
    world_size = int(os.environ["WORLD_SIZE"])
    local_rank_idx = int(os.environ["LOCAL_RANK"])
After (world_size now arrives as a function argument):
    local_rank_idx = local_rank

And modify main():
def main():
    args = get_train_args()
    world_size = int(os.environ["WORLD_SIZE"])  # added
    local_rank = int(os.environ["LOCAL_RANK"])  # added
    train(local_rank, world_size, args)         # modified
Example launch script (here start_train_8p.sh):
#!/bin/bash
# Set environment variables
export MASTER_ADDR="localhost"
export MASTER_PORT="12345"
export WORLD_SIZE=8  # total number of NPUs

# Start one process per NPU
for ((local_rank=0; local_rank<$WORLD_SIZE; local_rank++))
do
    export LOCAL_RANK=$local_rank
    python train_8p_shell.py &
done
wait
Launch the training:
bash start_train_8p.sh  # adjust to the actual script name
mp.spawn launch (single node, 8 NPUs). Modify the base script (saved here as train_8p_spawn.py) as follows. First add the import:

import torch.multiprocessing as mp

Before:
def train(args):
After:
def train(local_rank, world_size, args):

Before:
    world_size = int(os.environ["WORLD_SIZE"])
    local_rank_idx = int(os.environ["LOCAL_RANK"])
After (world_size now arrives as a function argument):
    local_rank_idx = local_rank

And modify main():
def main():
    args = get_train_args()
    world_size = torch.npu.device_count()                         # added
    mp.spawn(train, args=(world_size, args), nprocs=world_size)   # added
Launch the training:
export MASTER_ADDR=xxxx  # replace xxxx with the IP address of this machine
export MASTER_PORT=12345
python train_8p_spawn.py
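For reference, mp.spawn injects the process index (0 to nprocs-1) as the first positional argument of the target function, which is why train() gains the leading local_rank parameter while the remaining arguments are supplied through args. A minimal, self-contained sketch of that behaviour (demo_worker and the values used are illustrative, not part of the training script):

import torch.multiprocessing as mp

def demo_worker(local_rank, world_size):
    # mp.spawn calls demo_worker(i, world_size) for i = 0 .. nprocs-1.
    print(f"worker {local_rank} of {world_size} started")

if __name__ == "__main__":
    nprocs = 4  # illustrative process count
    mp.spawn(demo_worker, args=(nprocs,), nprocs=nprocs)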
torch.distributed.launch launch (single node, 8 NPUs), with the script saved here as train_8p_torch_distributed_launch.py:
python -m torch.distributed.launch --nproc_per_node 8 --master_addr localhost --master_port 12345 train_8p_torch_distributed_launch.py
torchrun launch (single node, 8 NPUs), with the script saved here as train_8p_torchrun.py:
torchrun --nproc_per_node 8 --master_addr localhost --master_port 12345 train_8p_torchrun.py
parser.add_argument("--master_addr", type=str, default="xxxx") # 新增,将xxxx改为本机IP地址 parser.add_argument("--master_port", type=str, default="12345") # 新增
dist.init_process_group("hccl", rank=local_rank_idx, world_size=world_size, timeout=timedelta(minutes=30))
修改后:
dist.init_process_group("hccl", init_method=f"parallel://{args.master_addr}:{args.master_port}", rank=local_rank_idx, world_size=world_size, timeout=timedelta(minutes=30))
torch_npu_run --rdzv_backend parallel --master_addr xxxx --master_port 12345 --nproc_per_node 8 train_8p_torch_npu_run.py # 将xxxx改为本机IP地址
Shell-script launch (two nodes, 16 NPUs). Example launch script for the master node (here start_train_16p_master_node.sh):
#!/bin/bash
# Set environment variables
export MASTER_ADDR="xxxx"  # replace xxxx with the IP address of the master node
export MASTER_PORT="12345"
export WORLD_SIZE=16
export NODE_RANK=0

# Start one process per NPU
for ((local_rank=0; local_rank<8; local_rank++))
do
    export RANK=$((NODE_RANK * 8 + local_rank))
    export LOCAL_RANK=$local_rank
    python train_16p_shell.py &
done
wait

Example launch script for the slave node (here start_train_16p_slave_node.sh), identical except for NODE_RANK:
#!/bin/bash
# Set environment variables
export MASTER_ADDR="xxxx"  # replace xxxx with the IP address of the master node
export MASTER_PORT="12345"
export WORLD_SIZE=16
export NODE_RANK=1

# Start one process per NPU
for ((local_rank=0; local_rank<8; local_rank++))
do
    export RANK=$((NODE_RANK * 8 + local_rank))
    export LOCAL_RANK=$local_rank
    python train_16p_shell.py &
done
wait

On the master node, launch the training:
bash start_train_16p_master_node.sh  # adjust to the actual script name
On the slave node, launch the training:
bash start_train_16p_slave_node.sh   # adjust to the actual script name
mp.spawn launch (two nodes, 16 NPUs). Modify the base script (saved here as train_16p_spawn.py) as follows. First add the import:

import torch.multiprocessing as mp

Add the node rank and node count as command line arguments in get_train_args():

parser.add_argument("--node_rank", type=int, default=0)  # added
parser.add_argument("--nnodes", type=int, default=-1)    # added

Before:
def train(args):
After:
def train(local_rank, world_size, args):

Before:
    world_size = int(os.environ["WORLD_SIZE"])
    local_rank_idx = int(os.environ["LOCAL_RANK"])
After (world_size now arrives as a function argument):
    local_rank_idx = local_rank
    global_rank_idx = args.node_rank * torch.npu.device_count() + local_rank  # added

Before:
dist.init_process_group("hccl", rank=local_rank_idx, world_size=world_size, timeout=timedelta(minutes=30))
After:
dist.init_process_group("hccl", rank=global_rank_idx, world_size=world_size, timeout=timedelta(minutes=30))

And modify main():
def main():
    args = get_train_args()
    world_size = torch.npu.device_count() * args.nnodes                        # added
    mp.spawn(train, args=(world_size, args), nprocs=torch.npu.device_count())  # added
On the master node, launch the training:
export MASTER_ADDR=xxxx  # replace xxxx with the IP address of the master node
export MASTER_PORT=12345
python train_16p_spawn.py --nnodes 2 --node_rank 0

On the slave node, launch the training:
export MASTER_ADDR=xxxx  # replace xxxx with the IP address of the master node
export MASTER_PORT=12345
python train_16p_spawn.py --nnodes 2 --node_rank 1
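The global rank used above follows the rule global_rank = node_rank * devices_per_node + local_rank, the same arithmetic the shell scripts apply when exporting RANK. A small worked example, assuming 8 NPUs per node (plain Python, no NPU required):

DEVICES_PER_NODE = 8  # assumed value of torch.npu.device_count() on each node

def global_rank(node_rank, local_rank):
    # Same formula as global_rank_idx in train().
    return node_rank * DEVICES_PER_NODE + local_rank

print(global_rank(0, 0))  # 0  -> first NPU on the master node
print(global_rank(0, 7))  # 7  -> last NPU on the master node
print(global_rank(1, 0))  # 8  -> first NPU on the slave node
print(global_rank(1, 7))  # 15 -> last NPU on the slave node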
parser.add_argument("--local-rank", type=int, default=0) # 新增
local_rank_idx = int(os.environ["LOCAL_RANK"])
修改后:
local_rank_idx = args.local_rank
global_rank_idx = int(os.environ["RANK"])
dist.init_process_group("hccl", rank=local_rank_idx, world_size=world_size, timeout=timedelta(minutes=30))
修改后:
dist.init_process_group("hccl", rank=global_rank_idx, world_size=world_size, timeout=timedelta(minutes=30))
On the master node, launch the training (replace xxxx with the IP address of the master node):
python -m torch.distributed.launch --nnodes 2 --nproc_per_node 8 --node_rank 0 --master_addr xxxx --master_port 12345 train_16p_python.py

On the slave node, launch the training (replace xxxx with the IP address of the master node):
python -m torch.distributed.launch --nnodes 2 --nproc_per_node 8 --node_rank 1 --master_addr xxxx --master_port 12345 train_16p_python.py
torchrun launch (two nodes, 16 NPUs), with the script saved here as train_16p_torchrun.py. On the master node, launch the training (replace xxxx with the IP address of the master node):
torchrun --nnodes 2 --nproc_per_node 8 --node_rank 0 --master_addr xxxx --master_port 12345 train_16p_torchrun.py

On the slave node, launch the training (replace xxxx with the IP address of the master node):
torchrun --nnodes 2 --nproc_per_node 8 --node_rank 1 --master_addr xxxx --master_port 12345 train_16p_torchrun.py
parser.add_argument("--master_addr", type=str, default=None) # 新增,将xxxx改为主节点IP地址 parser.add_argument("--master_port", type=str, default="12345") # 新增
global_rank_idx = int(os.environ["RANK"])
ist.init_process_group("hccl", rank=local_rank_idx, world_size=world_size, timeout=timedelta(minutes=30))
修改后:
dist.init_process_group("hccl", init_method=f"parallel://{args.master_addr}:{args.master_port}", rank=global_rank_idx, world_size=world_size, timeout=timedelta(minutes=30))
# 将xxxx改为主节点IP地址 torch_npu_run --rdzv_backend parallel --master_addr xxxx --master_port 12345 --nnodes 2 --node_rank 0 --nproc_per_node 8 train_16p_torch_npu_run.py
# 将xxxx改为主节点IP地址 torch_npu_run --rdzv_backend parallel --master_addr xxxx --master_port 12345 --nnodes 2 --node_rank 1 --nproc_per_node 8 train_16p_torch_npu_run.py
When normal runtime output such as model loading and training progress appears on screen or in the redirected logs, the training job has been launched successfully.
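If no such output appears, a quick pre-flight check of the NPU environment on each node can help narrow down launch failures. The sketch below is illustrative, not part of the example scripts; it assumes torch_npu is installed and uses the same device-count call as the training script:

import torch
import torch_npu  # registers the NPU device type with torch

if __name__ == "__main__":
    # The device count should match the --nproc_per_node / WORLD_SIZE used at launch.
    print("NPU available:", torch.npu.is_available())
    print("NPU device count:", torch.npu.device_count())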