The model sample code shown below may differ from the actual version; always refer to the code of the version you are actually using.
The PyTorch version used is 1.11.
root@ubuntu:/data/atlas_dls/public/dataset/resnet50/imagenet# pwd
/data/atlas_dls/public/dataset/resnet50/imagenet
root@ubuntu:/data/atlas_dls/public/dataset/resnet50/imagenet# du -sh
11G
Go to the "MindXDL-deploy" repository, obtain the train_start.sh file from the "samples/train/resumable-training/fault-rescheduling/withoutRanktable/pytorch/resnet50" directory, create a "scripts" directory in the training code, and build the following directory structure on the management node.
root@ubuntu:/data/atlas_dls/public/code/ResNet50_ID4149_for_PyTorch/scripts/#
scripts/
└── train_start.sh
root@ubuntu:/data/atlas_dls/public/code/ResNet50_ID4149_for_PyTorch/scripts/#
scripts/
├── rank_table.sh
├── utils.sh
└── train_start.sh
Go to the "MindXDL-deploy" repository, obtain the "train_start.sh" file from the "samples/train/resumable-training/fault-tolerance/without-ranktable/pytorch/resnet50" directory, create a "scripts" directory in the training code, and build the following directory structure on the management node.
root@ubuntu:/data/atlas_dls/public/code/ResNet50_ID4149_for_PyTorch/scripts/#
scripts/
└── train_start.sh
root@ubuntu:/data/atlas_dls/public/code/ResNet50_ID4149_for_PyTorch/scripts/#
scripts/
├── rank_table.sh
├── utils.sh
├── train_start.sh
├── ...
...
# env for breakpoint ckpt
export RESUME_MODE_ENABLE=1
# Enables the watchdog function. The default value 0 disables the watchdog; 1 enables it.
export HCCL_ASYNC_ERROR_HANDLING=1
def main():
    args = parser.parse_args()
    os.environ['MASTER_ADDR'] = args.addr                  # delete this line
    os.environ['MASTER_PORT'] = '29501'                    # delete this line
    os.environ['MASTER_ADDR'] = os.getenv("MASTER_ADDR")   # add this line
    os.environ['MASTER_PORT'] = os.getenv("MASTER_PORT")   # add this line
    if os.getenv('ALLOW_FP32', False) and os.getenv('ALLOW_HF32', False):
        raise RuntimeError('ALLOW_FP32 and ALLOW_HF32 cannot be set at the same time!')
    elif os.getenv('ALLOW_HF32', False):
        torch.npu.conv.allow_hf32 = True
    elif os.getenv('ALLOW_FP32', False):
        torch.npu.conv.allow_hf32 = False
        torch.npu.matmul.allow_hf32 = False
In actual use, add checkpoint (ckpt) integrity verification and load-exception handling to the script; otherwise loading may report an error and exit, or an incomplete checkpoint may be loaded.
import argparse
# Add the following line
import glob
import os
...
if torch.npu.is_available():
...
args.ngpus_per_node = ngpus_per_node
# Add the following line
args.gpu = int(os.getenv("LOCAL_RANK"))
...
if args.multiprocessing_distributed:
# Delete the following line
args.world_size = ngpus_per_node * args.world_size
# Add the following line
args.world_size = int(os.getenv("WORLD_SIZE"))
...
if args.multiprocessing_distributed:
# Delete the following line
args.rank = args.rank * ngpus_per_node + gpu
# Add the following line
args.rank = int(os.getenv("RANK"))
...
if args.resume:
# Replace the original args.resume-related code with the following
candidate_ckpt_path = ""
for p in glob.glob(f"./rank*"):
best_ckpt_path = os.path.join(p, "model_best.pth.tar")
if os.path.exists(best_ckpt_path):
candidate_ckpt_path = best_ckpt_path
break
if candidate_ckpt_path:
print("[gpu id:", args.gpu, "]", "=> loading checkpoint '{}'".format(candidate_ckpt_path))
# Map model to be loaded to specified single gpu.
loc = 'npu:{}'.format(args.gpu)
checkpoint = torch.load(candidate_ckpt_path, map_location=loc)
print(f"load checkpoint to : {loc}")
args.start_epoch = checkpoint['epoch']
best_acc1 = checkpoint['best_acc1']
model.load_state_dict(checkpoint['state_dict'])
optimizer.load_state_dict(checkpoint['optimizer'])
print("[gpu id:", args.gpu, "]", "=> loaded checkpoint '{}' (epoch {})".format(candidate_ckpt_path, checkpoint['epoch']))
else:
print("no valid ckpt found to resume.")
...
if not args.multiprocessing_distributed or (args.multiprocessing_distributed and args.rank % ngpus_per_node == 0):
# Replace the original code with the following
save_path = f"./rank_{args.rank}"
if not os.path.exists(save_path):
os.makedirs(save_path, exist_ok=True)
save_checkpoint({
'epoch': epoch + 1,
'arch': args.arch,
'state_dict': model.state_dict(),
'best_acc1': best_acc1,
'optimizer': optimizer.state_dict(),
}, is_best, save_path=save_path)
...
...
# Modify the original save_checkpoint function as follows
def save_checkpoint(state, is_best, filename='checkpoint.pth.tar', save_path="./"):
if is_best:
target_path = os.path.join(save_path, 'model_best.pth.tar')
torch.save(state, target_path)
print(f"save ckpt to {target_path} done. Best epoch for now is :{state['epoch']}")
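As noted above, checkpoint integrity verification and load-exception handling should be added around the resume logic. The sketch below shows one possible approach; it assumes torch_npu is available so that map_location='npu:<id>' is valid, as in the resume code above, and the helper name try_load_valid_checkpoint and the field check are illustrative rather than part of the sample code.
import glob
import os

import torch


def try_load_valid_checkpoint(gpu, pattern="./rank*/model_best.pth.tar"):
    """Try candidate checkpoints from newest to oldest and return the first one
    that loads completely; return None if no usable checkpoint is found."""
    loc = 'npu:{}'.format(gpu)
    required_keys = ('epoch', 'state_dict', 'best_acc1', 'optimizer')
    candidates = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True)
    for path in candidates:
        try:
            checkpoint = torch.load(path, map_location=loc)
        except Exception as err:  # truncated or corrupted checkpoint file
            print(f"failed to load checkpoint {path}: {err}, trying the next one")
            continue
        # A complete checkpoint contains every field written by save_checkpoint().
        if all(key in checkpoint for key in required_keys):
            return checkpoint
        print(f"checkpoint {path} is missing fields, trying the next one")
    return None
The resume branch could call such a helper instead of loading the first matching file directly, and fall back to training from scratch when it returns None.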
...
DISTRIBUTED_ARGS="--nproc_per_node $GPUS_PER_NODE --nnodes $NNODES --node_rank $NODE_RANK --master_addr $MASTER_ADDR --master_port $MASTER_PORT --max_restarts 5 --monitor_interval 10 "
...
Parameter description:
- --nproc_per_node: number of training processes started on each node.
- --nnodes: total number of nodes participating in the training job.
- --node_rank: rank of the current node within the job.
- --master_addr and --master_port: address and port of the rank 0 node used for rendezvous.
- --max_restarts: maximum number of times the worker group is restarted after a failure.
- --monitor_interval: interval, in seconds, at which torchrun monitors the state of the workers.
mkdir /data/atlas_dls/code
Go to the "MindXDL-deploy" repository, obtain the train_start.sh file from the "samples/train/resumable-training/fault-rescheduling/withoutRanktable/mindspore/resnet50" directory, and, together with the "resnet/scripts" directory in the training code, build the following directory structure on the management node.
root@ubuntu:/data/atlas_dls/code/resnet/scripts/#
scripts/
...
├── run_distribute_train.sh
├── run_distribute_train_gpu.sh
└── train_start.sh
Go to the "MindXDL-deploy" repository, obtain the train_start.sh and main.sh files from the "samples/train/resumable-training/fault-rescheduling/withRanktable/mindspore/resnet50" directory, and, together with the "resnet/scripts" directory in the training code, build the following directory structure on the management node.
root@ubuntu:/data/atlas_dls/code/resnet/scripts/#
scripts/
├── main.sh
...
├── run_distribute_train.sh
├── run_distribute_train_gpu.sh
└── train_start.sh
Go to the "MindXDL-deploy" repository, obtain the train_start.sh and reset_process.py files from the "samples/train/resumable-training/fault-tolerance/without-ranktable/mindspore/resnet50" directory, and, together with the "resnet/scripts" directory in the training code, build the following directory structure on the management node.
root@ubuntu:/data/atlas_dls/code/resnet/scripts/#
scripts/
├── reset_process.py
...
├── run_distribute_train.sh
├── run_distribute_train_gpu.sh
└── train_start.sh
root@ubuntu:/data/atlas_dls/code/resnet/scripts/#
scripts/
├── main.sh
├── reset_process.py
...
├── run_distribute_train.sh
├── run_distribute_train_gpu.sh
└── train_start.sh
Modify the following global configuration parameters as required: the dataset path and the configuration file path. To adapt other models, add or delete parameters as needed.
dataset_path=/job/data/imagenet_full/train
config_yaml_path=/job/code/resnet/resnet50_imagenet2012_config.yaml
# main.sh: for this example (the ResNet-50 model), no further changes to this script are required.
# To adapt other models, add, delete, or modify the environment variable settings as needed, then update
# the training launch script path and its parameters, i.e. the Python command invoked in main.sh.
# In this example, the single-node single-device Python command is:
python ${ROOT_PATH}/../train.py --data_path=${DATA_PATH} --config_path=${CONFIG_PATH}
# In this example, the single-node multi-device and distributed command is:
python ${ROOT_PATH}/../train.py --run_distribute=True --device_num=${RANK_SIZE} --data_path=${DATA_PATH} --config_path=${CONFIG_PATH}
...
run_distribute: False
enable_profiling: False
data_path: "/cache/data"
output_dir: "/job/code/output"     # Checkpoint save path; modify according to the actual situation
load_path: "/cache/checkpoint_path/"
device_target: "Ascend"
checkpoint_path: "./checkpoint/"
checkpoint_file_path: ""
...
net_name: "resnet50"
dataset: "imagenet2012"
device_num: 1
pre_trained: "/job/code/output/resnet50/imagenet2012/ckpt"  # Path (directory or file) inside the container from which the pretrained model is loaded. Fuzzy search for .ckpt files under the given path is supported; the most recent .ckpt file found is loaded. Modify according to the training YAML and the actual situation.
run_eval: False
eval_dataset_path: ""
parameter_server: False
filter_weight: False
save_best_ckpt: True
eval_start_epoch: 40
...
network_dataset: "resnet50_imagenet2012"

# Retraining options
save_graphs: False                 # Whether to save the graph compilation result
save_graphs_path: "./graphs"       # Path for saving the graph compilation result
has_trained_epoch: 0               # Number of epochs the model has already been trained, default 0
has_trained_step: 0                # Number of steps the model has already been trained, default 0

---
# Help description for each option
enable_modelarts: "Whether training on modelarts, default: False"
...
batch_size: "Batch size for training and evaluation"
epoch_size: "Total training epochs."
checkpoint_path: "The location of the checkpoint file."
checkpoint_file_path: "The location of the checkpoint file."
save_graphs: "Whether save graphs during training, default: False."
save_graphs_path: "Path to save graphs."
...
# Model saving code
if config.save_checkpoint:
    ckpt_append_info = [{"epoch_num": 0, "step_num": 0}]
    config_ck = CheckpointConfig(save_checkpoint_steps=config.save_checkpoint_epochs * step_size,
                                 keep_checkpoint_max=config.keep_checkpoint_max,
                                 append_info=ckpt_append_info)
    ckpt_cb = ModelCheckpoint(prefix=config.net_name,
                              directory=config.save_ckpt_dir + "_" + str(config.rank_id),
                              config=config_ck)
    cb += [ckpt_cb]
...
In actual use, add checkpoint (ckpt) integrity verification and load-exception handling to the script; otherwise loading may report an error and exit, or an incomplete checkpoint may be loaded.
...
def init_weight(net, cfg):
    """init_weight"""
    if cfg.pre_trained:
        if not os.path.isfile(cfg.pre_trained):
            cfg.logger.warning("There is not ckpt file: %s", cfg.pre_trained)
        else:
            param_dict = ms.load_checkpoint(cfg.pre_trained)
            if param_dict.get('epoch_num') is None:
                raise ValueError('Can not find epoch_num in ckpt file')
            if cfg.filter_weight:
                filter_list = [x.name for x in net.end_point.get_parameters()]
                filter_checkpoint_parameter_by_list(param_dict, filter_list)
            ms.load_param_into_net(net, param_dict)
            cfg.start_epoch = int(param_dict.get('epoch_num').asnumpy().item())
            cfg.logger.info("Pre trained ckpt mode: %s loading", cfg.pre_trained)
...
import glob
...
# Find the most recent *.ckpt files under the pre_trained directory
def _find_latest_ckpt():
    ckpt_files = glob.glob(config.pre_trained + "*/*.ckpt")
    if ckpt_files:
        ckpt_files.sort(key=os.path.getmtime, reverse=True)
    return ckpt_files

# Try to load a ckpt file, with at most INIT_WEIGHT_MAX_ATTEMPTS attempts
def _try_to_init_weight(net, config):
    if os.path.isfile(config.pre_trained):
        latest_ckpt = [config.pre_trained]
    else:
        latest_ckpt = _find_latest_ckpt()
    if not latest_ckpt:
        config.logger.warning("There is not ckpt file: %s", config.pre_trained)
        return
    init_weight_attempts = 0
    INIT_WEIGHT_MAX_ATTEMPTS = 5
    while latest_ckpt and init_weight_attempts < INIT_WEIGHT_MAX_ATTEMPTS:
        try:
            config.pre_trained = latest_ckpt[0]
            init_weight(net, config)
            break
        except Exception:
            config.logger.warning("Pre trained ckpt %s format is incorrect, "
                                  "try to load the last most recent ckpt", config.pre_trained)
            if latest_ckpt[1:]:
                latest_ckpt = latest_ckpt[1:]
                init_weight_attempts += 1
                continue
            else:
                config.logger.error("no more ckpt to load: %s", config.pre_trained)
                raise ValueError("ckpt format is incorrect, no more ckpt to load, load ckpt failed.")
...

@moxing_wrapper()
def train_net():
    """train net"""
    target = config.device_target
    set_parameter()
    set_output_dir(config)
    config.logger = get_logger(config.log_dir, config.rank_id, config.parameter_server)
    dataset = create_dataset(dataset_path=config.data_path, do_train=True,
                             batch_size=config.batch_size, train_image_size=config.train_image_size,
                             eval_image_size=config.eval_image_size, target=target,
                             distribute=config.run_distribute)
    step_size = dataset.get_dataset_size()
    net = resnet(class_num=config.class_num)
    if config.parameter_server:
        net.set_param_ps()
    # Replace the original init_weight call with _try_to_init_weight, which tries to load a ckpt file
    # and avoids loading an incomplete ckpt that would make the training job fail
    _try_to_init_weight(net, config)
    if config.resume_ckpt:
        resume_param = ms.load_checkpoint(config.resume_ckpt,
                                          choice_func=lambda x: not x.startswith(('learning_rate', 'global_step')))
        config.start_epoch = int(resume_param.get('epoch_num', ms.Tensor(0, ms.int32)).asnumpy().item())
    lr = ms.Tensor(init_lr(step_size=step_size))
...
| Parameter | Description |
| --- | --- |
| -h | Prints help information and the available options. |
| -m | Run mode. The default value is "common". |
| -f | Framework. Currently only MindSpore is supported; other values are invalid. |
| -p | PID of the training process to be managed. |
| -r | Run mode of the training process. This option is required only when HCCL Controller and MindSpore are used together. |
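For reference, the sketch below illustrates how the -p option is typically supplied: a launcher starts the training process and then hands its PID to reset_process.py, mirroring what the startup script shown later in this document does. The training command and passing only -p are illustrative assumptions, not part of the sample scripts.
import subprocess

# Hypothetical launcher: start training, then hand its PID to reset_process.py via -p.
train_proc = subprocess.Popen(["python", "train.py", "--run_distribute=True"])   # illustrative command
reset_proc = subprocess.Popen(["python", "-u", "reset_process.py", "-p", str(train_proc.pid)])

exit_code = train_proc.wait()    # wait for the training process to finish
if exit_code == 0:
    reset_proc.terminate()       # normal exit: stop the monitoring process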
mkdir /data/atlas_dls/code
Go to the "MindXDL-deploy" repository, obtain the train_start.sh and main.sh files from the "samples/train/resumable-training/fault-rescheduling/withoutRanktable/mindspore/pangu_alpha" directory, create a "scripts" directory in the training code, and build the following directory structure on the management node.
root@ubuntu:/data/atlas_dls/code/pangu_alpha/scripts/#
scripts/
├── main.sh
├── run_cluster_export.sh
├── run_distribute_eval_gpu.sh
├── run_distribute_eval.sh
...
├── run_distribute_train.sh
├── run_standalone_eval.sh
├── run_standalone_export.sh
├── run_standalone_predict.sh
└── train_start.sh
root@ubuntu:/data/atlas_dls/code/pangu_alpha/scripts/#
scripts/
├── main.sh
├── run_cluster_export.sh
├── run_distribute_eval_gpu.sh
├── run_distribute_eval.sh
...
├── run_distribute_train.sh
├── run_standalone_eval.sh
├── run_standalone_export.sh
├── run_standalone_predict.sh
└── train_start.sh
Go to the "MindXDL-deploy" repository, obtain the train_start.sh, main.sh, and reset_process.py files from the "samples/train/resumable-training/fault-tolerance/without-ranktable/mindspore/pangu_alpha" directory, create a "scripts" directory in the training code, and build the following directory structure on the management node.
root@ubuntu:/data/atlas_dls/code/pangu_alpha/scripts/#
scripts/
├── main.sh
├── reset_process.py
├── run_cluster_export.sh
├── run_distribute_eval_gpu.sh
├── run_distribute_eval.sh
...
├── run_distribute_train.sh
├── run_standalone_eval.sh
├── run_standalone_export.sh
├── run_standalone_predict.sh
└── train_start.sh
root@ubuntu:/data/atlas_dls/code/pangu_alpha/scripts/#
scripts/
├── main.sh
├── reset_process.py
├── run_cluster_export.sh
├── run_distribute_eval_gpu.sh
├── run_distribute_eval.sh
...
├── run_distribute_train.sh
├── run_standalone_eval.sh
├── run_standalone_export.sh
├── run_standalone_predict.sh
└── train_start.sh
...
# Training dataset path; modify according to the actual situation
# Security note: the paths and input parameters should be validated
dataset="/job/data/train_data"

# Set the training environment variables
set_env

# Single-node training scenario
if [[ "$server_count" == "1" ]]; then
    server_id=0
    if [ ${device_count} -lt 8 ]; then
        echo "Less than 8 card training is not supported for pangu alpha model." | tee log
    fi
    if [ ${device_count} -eq 8 ]; then
        bash main.sh ${device_count} ${server_count} ${RANK_TABLE_FILE} ${server_id} ${dataset}
    fi
# Distributed training scenario
else
    server_id=$(get_server_id)
    if [ $? -eq 1 ]; then
        echo "get server id failed."
        exit 1
    fi
    echo "server id is: "${server_id}
    bash main.sh ${device_count} ${server_count} ${RANK_TABLE_FILE} ${server_id} ${dataset}
fi
def set_parse_200B(args_opt):
    """
    Set config for 200B mode
    """
    args_opt.embedding_size = 16384
    args_opt.num_layers = 32          # Number of model layers
    args_opt.num_heads = 128
    if args_opt.per_batch_size == 0:
        args_opt.per_batch_size = 1
    args_opt.word_emb_dp = 0
    if args_opt.run_type == "train":
        args_opt.start_lr = 6e-5
        args_opt.end_lr = 6e-6
        args_opt.stage_num = 8        # Number of pipeline stages
        args_opt.micro_size = 16      # Number of micro batches in pipeline-parallel mode; should be greater than args_opt.stage_num
        args_opt.op_level_model_parallel_num = 16
        if args_opt.optimizer_shard == 1:
            args_opt.op_level_model_parallel_num = 8
    elif args_opt.run_type == "predict":
        args_opt.stage_num = 4
        args_opt.micro_size = 1
        args_opt.op_level_model_parallel_num = 16
        if args_opt.optimizer_shard == 1:
            args_opt.op_level_model_parallel_num = 8
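The comment above states that micro_size should be greater than stage_num when pipeline parallelism is enabled. A minimal guard that makes this constraint explicit could look as follows; the function name check_pipeline_config is illustrative and not part of the sample code.
def check_pipeline_config(args_opt):
    """Fail early if the pipeline-parallel settings cannot keep every stage busy."""
    if args_opt.stage_num > 1 and args_opt.micro_size <= args_opt.stage_num:
        raise ValueError(
            f"micro_size ({args_opt.micro_size}) must be greater than "
            f"stage_num ({args_opt.stage_num}) when pipeline parallelism is enabled.")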
...
# Call that saves checkpoints
add_checkpoint_callback_policy(args_opt, callback, rank)
...
# Definition of the checkpoint-saving code
def add_checkpoint_callback_policy(args_param, callback, rank_id):
    r"""
    Add checkpoint policy to callback.
    """
    # Security note: the paths and input parameters should be validated
    if args_param.save_checkpoint:
        # Save epoch_num and step_num info in the checkpoint
        ckpt_append_info = [{"epoch_num": args_param.has_trained_epoches,
                             "step_num": args_param.has_trained_steps}]
        ckpt_config = CheckpointConfig(save_checkpoint_steps=args_param.save_checkpoint_steps,
                                       keep_checkpoint_max=args_param.keep_checkpoint_max,
                                       integrated_save=False,
                                       append_info=ckpt_append_info)
        ckpoint_cb = ModelCheckpoint(prefix=args_param.ckpt_name_prefix + str(rank_id),
                                     directory=os.path.join(args_param.save_checkpoint_path, f"rank_{rank_id}"),
                                     config=ckpt_config)
        callback.append(ckpoint_cb)
...
...
# If the model does not use pipeline parallelism, modify the following function
def set_parallel_context(args_opt):
# If the model uses pipeline parallelism, modify the following function
# Security note: the paths and input parameters should be validated
def set_pipeline_parallel_context(args_opt):
    ...
    # Add the following code before mindspore.set_auto_parallel_context. See the description of the
    # set_auto_parallel_context parameters in the MindSpore distributed parallel API documentation.
    # Added for resumable (checkpoint-based) training
    if not os.path.exists(args_opt.strategy_load_ckpt_path):
        args_opt.strategy_load_ckpt_path = ""
    # Added for resumable training; strategy_ckpt_save_file_path can point to a path inside the container
    strategy_ckpt_save_file_path = '/job/data/code/fault_torlence/pangu_alpha/strategy/strategy.ckpt'
    if args_opt.strategy_load_ckpt_path == strategy_ckpt_save_file_path:
        strategy_ckpt_save_file_path = '/job/data/code/fault_torlence/pangu_alpha/strategy/strategy_new.ckpt'
    # Change strategy_ckpt_save_file='strategy.ckpt' to strategy_ckpt_save_file=strategy_ckpt_save_file_path.
    # If set_auto_parallel_context does not specify the strategy_ckpt_save_file parameter, add
    # strategy_ckpt_save_file=strategy_ckpt_save_file_path manually, as shown below.
    mindspore.set_auto_parallel_context(
        parallel_mode=args_opt.parallel_mode, gradients_mean=False,
        search_mode=args_opt.search_mode, full_batch=bool(args_opt.full_batch),
        loss_repeated_mean=True, device_num=device_num,
        enable_parallel_optimizer=bool(args_opt.optimizer_shard),
        pipeline_stages=args_opt.stage_num,
        enable_alltoall=bool(args_opt.enable_alltoall),
        strategy_ckpt_save_file=strategy_ckpt_save_file_path)
...
# Definition of the checkpoint loading code
# Security note: the paths and input parameters should be validated
def restore_checkpoint(args_param, sink_size, dataset, model, network, epoch):
    r"""
    Load checkpoint process.
    """
    print("======start single checkpoint", flush=True)
    ckpt_name = args_param.ckpt_name_prefix
    # For brevity, validation of the command line arguments save_checkpoint_path and ckpt_name is omitted here; add it as required.
    ckpt_pattern = os.path.join(args_param.save_checkpoint_path,
                                "rank_{}".format(D.get_rank()),
                                f"{ckpt_name}*.ckpt")
    ckpt_all_files = glob.glob(ckpt_pattern)
    if not ckpt_all_files:
        print(f"There is no ckpt file in {args_param.save_checkpoint_path}, "
              f"current ckpt_files found is {ckpt_all_files} "
              f"with pattern {ckpt_pattern}, so skip the loading.")
        return

    ckpt_exp_pattern = os.path.join(args_param.save_checkpoint_path,
                                    "rank_{}".format(D.get_rank()),
                                    f"{ckpt_name}*_breakpoint.ckpt")
    ckpt_exp_files = glob.glob(ckpt_exp_pattern)
    ckpt_files = []
    for file in ckpt_all_files:
        if file not in ckpt_exp_files:
            ckpt_files.append(file)

    if not ckpt_files:
        print(f"There is no ckpt file in {args_param.save_checkpoint_path}, "
              f"current ckpt_files found is {ckpt_files} "
              f"with pattern {ckpt_pattern}, so skip the loading.")
        return

    ckpt_files.sort(key=os.path.getmtime, reverse=True)
    time_stamp = datetime.datetime.now()
    print(f"time stamp {time_stamp.strftime('%Y.%m.%d-%H:%M:%S')} "
          f"pre trained ckpt model {ckpt_files} loading", flush=True)
    # Load the most recent checkpoint file
    print(f'Start to load from {ckpt_files[0]}')
    param_dict = load_checkpoint(ckpt_files[0])
    if param_dict.get("epoch_num") and param_dict.get("step_num"):
        args_param.has_trained_epoches = int(param_dict["epoch_num"].data.asnumpy())
        args_param.has_trained_steps = int(param_dict["step_num"].data.asnumpy())
    model.build(train_dataset=dataset, sink_size=sink_size, epoch=epoch)
    load_param_into_net(network, param_dict)
...
...
opt.add_argument("--vocab_size",
                 type=int,
                 default=50304,   # Modified to match the sample dataset; change it according to the training dataset
                 help="vocabulary size, default is 40000.")
...
opt.add_argument("--data_column_name",
                 type=str,
                 default="input_ids",   # Default is input_ids; change it to the field defined in the dataset
                 help="Column name of datasets")
...
parser.add_argument("--strategy_load_ckpt_path",
                    type=str,
                    default="/job/data/code/fault_torlence/pangu_alpha/strategy/strategy.ckpt",   # For resumable training, specify a path inside the container that will not be overwritten by training
                    help="The training parallel strategy for the model.")
parser.add_argument("--tokenizer_path",
                    type=str,
                    default="./tokenizer_path",
                    help="The path where stores vocab and vocab model file")
...
def add_retrain_params(opt):
    """
    Add parameters about retrain.
    """
    opt.add_argument("--pre_trained",
                     type=str,
                     default="/job/data/code/fault_torlence/pangu_alpha/8p",   # Pretrained model path
                     help="Pretrained checkpoint path.")
    opt.add_argument("--save_checkpoint_path",
                     type=str,
                     default="/job/data/code/fault_torlence/pangu_alpha/8p",   # Path where checkpoints are saved
                     help="Save checkpoint path.")
    opt.add_argument("--keep_checkpoint_max",          # Checkpoint retention policy: maximum number of checkpoints kept
                     type=int,
                     default=1,
                     help="Max checkpoint save number.")
    opt.add_argument("--save_checkpoint_steps",        # Checkpoint retention policy: save interval in steps
                     type=int,
                     default=20,
                     help="Save checkpoint step number.")
    opt.add_argument("--save_checkpoint",              # Whether checkpoints are saved for this training run
                     type=ast.literal_eval,
                     default=True,                     # The default is False; change it to True
                     help="Whether save checkpoint in local disk.")
    opt.add_argument("--ckpt_name_prefix",             # Checkpoint retention policy: file name prefix
                     type=str,
                     default="pangu",
                     help="Saving checkpoint name prefix.")
...
root@ubuntu:/data/atlas_dls/code/pangu_alpha/#
pangu_alpha/
├── README.md
├── README_CN.md
├── group_info_env
...
├── scripts
├── serving_increment
├── src
├── tasks.py
└── train.py
...
# env variable prepare
group_info_file = os.getenv("GROUP_INFO_FILE")
if group_info_file:
    with open(os.path.expanduser("/job/code/group_info_env"), "a") as outfile:
        outfile.write(f"export GROUP_INFO_FILE_REFLECT={group_info_file}\n")
...
| Parameter | Description |
| --- | --- |
| -h | Prints help information and the available options. |
| -m | Run mode. The default value is "common". |
| -f | Framework. Currently only MindSpore is supported; other values are invalid. |
| -p | PID of the training process to be managed. |
| -r | Run mode of the training process. This option is required only when HCCL Controller and MindSpore are used together. |
The TensorFlow framework currently supports only Job-level rescheduling in rescheduling mode; Pod-level rescheduling and the graceful fault tolerance mode are not supported.
root@ubuntu:/data/atlas_dls/public/dataset/resnet50/imagenet_TF# pwd
/data/atlas_dls/public/dataset/resnet50/imagenet_TF
root@ubuntu:/data/atlas_dls/public/dataset/resnet50/imagenet_TF# du -sh
42G
Go to the "MindXDL-deploy" repository, obtain the train_start.sh file from the "samples/train/resumable-training/fault-rescheduling/withoutRanktable/tensorflow" directory, create a "scripts" directory in the training code, and build the following directory structure on the management node.
root@ubuntu:/data/atlas_dls/public/code/ResNet50_for_TensorFlow_2.6_code/scripts/#
scripts/
└── train_start.sh
/data/atlas_dls/public/code/ResNet50_for_TensorFlow_2.6_code/
├── scripts
│   ├── train_start.sh
│   ├── utils.sh
│   ├── rank_table.sh
│   ...
# Single-node training scenario
if [[ "${server_count}" -eq 1 ]]; then
    device_id=0
    if [ "${device_count}" -eq 1 ]; then
        get_env_for_1p_job
        if [ "${framework}" == "PyTorch" ]; then
            ${DLS_PROGRAM_EXECUTOR} ${boot_file_path}${boot_file} --gpu=${device_id} ${train_param} --model_dir=${output_url}/models/device 2>&1 && tee ${output_url}/log
            check_return_code
        else
            ${DLS_PROGRAM_EXECUTOR} ${boot_file_path}${boot_file} ${train_param} --model_dir=${output_url}/models/device 2>&1 && tee ${output_url}/log
            check_return_code
        fi
        chmod 440 ${output_url}
        exit ${ret_code}
    fi
fi
In actual use, add checkpoint (ckpt) integrity verification and load-exception handling to the script; otherwise loading may report an error and exit, or an incomplete checkpoint may be loaded.
class Controller(object):
    """Class that facilitates training and evaluation of models."""

    def __init__(
        ...
        # Restore Model if needed.
        if self.checkpoint_manager is not None:
            model_restored = self._restore_model()
            logging.info("loading checkpoint %s", model_restored)
            if not model_restored and self.checkpoint_manager.checkpoint_interval:
                # If the model is not restored from a checkpoint, save an initial
                # checkpoint.
                ckpt_path = self.checkpoint_manager.save(
                    checkpoint_number=self.global_step)
                logging.info("Saved checkpoint in %s", ckpt_path)

        # Create and initialize the interval triggers.
        self.eval_trigger = utils.IntervalTrigger(self.eval_interval, self.eval_offset)
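As noted above, checkpoint integrity verification and load-exception handling should be added before restoring. The sketch below is one hedged way to do this, assuming a tf.train.CheckpointManager is available (self.checkpoint_manager in the Controller); the helper name restore_latest_valid_checkpoint is illustrative and not part of the sample code.
import tensorflow as tf
from absl import logging


def restore_latest_valid_checkpoint(checkpoint_manager):
    """Walk the managed checkpoints from newest to oldest and restore the first
    one that can actually be read; return its path, or None if none is usable."""
    for ckpt_path in reversed(checkpoint_manager.checkpoints):
        try:
            # load_checkpoint raises if the checkpoint files are missing or truncated.
            tf.train.load_checkpoint(ckpt_path)
        except (tf.errors.NotFoundError, tf.errors.DataLossError, ValueError) as err:
            logging.warning("checkpoint %s is unusable (%s), trying an older one",
                            ckpt_path, err)
            continue
        checkpoint_manager.checkpoint.restore(ckpt_path)
        logging.info("restored checkpoint %s", ckpt_path)
        return ckpt_path
    return None
The Controller's restore step could delegate to such a helper so that a truncated checkpoint falls back to an older one instead of aborting.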
git clone https://github.com/NVIDIA/Megatron-LM.git
cd Megatron-LM
git checkout 285068c8108e0e8e6538f54fe27c3ee86c5217a2
git clone https://gitee.com/ascend/Megatron-LM.git megatron_npu
cd megatron_npu
pip install -e .
cp tests_gpt/pretrain_gpt.py <path_to_Megatron-LM>/
root@ubuntu:/data/atlas_dls/public/dataset/GPT-3/enwiki# pwd
/data/atlas_dls/public/dataset/GPT-3/enwiki
root@ubuntu:/data/atlas_dls/public/dataset/GPT-3/enwiki# du -sh
90G
Go to the "MindXDL-deploy" repository, obtain the "train_start.sh" file from the "samples/train/resumable-training/fault-rescheduling/withoutRanktable/pytorch/gpt-3" directory, create a "scripts" directory in the training code, and build the following directory structure on the management node.
root@ubuntu:/data/atlas_dls/public/code/GPT-3_for_PyTorch_1.11_code/scripts#
scripts/
└── train_start.sh
root@ubuntu:/data/atlas_dls/public/code/GPT-3_for_PyTorch_1.11_code/scripts#
scripts/
├── rank_table.sh
├── utils.sh
└── train_start.sh
Go to the "MindXDL-deploy" repository, obtain the train_start.sh file from the "samples/train/resumable-training/fault-tolerance/without-ranktable/pytorch/gpt-3" directory, create a "scripts" directory in the training code, and build the following directory structure on the management node.
root@ubuntu:/data/atlas_dls/public/code/GPT-3_for_PyTorch_1.11_code/scripts#
scripts/
└── train_start.sh
root@ubuntu:/data/atlas_dls/public/code/GPT-3_for_PyTorch_1.11_code/scripts#
scripts/
├── rank_table.sh
├── utils.sh
└── train_start.sh
...
# env for breakpoint ckpt
export RESUME_MODE_ENABLE=1
# Enables the watchdog function. The default value 0 disables the watchdog; 1 enables it.
export HCCL_ASYNC_ERROR_HANDLING=1
...
DISTRIBUTED_ARGS="--nproc_per_node $GPUS_PER_NODE --nnodes $NNODES --node_rank $NODE_RANK --master_addr $MASTER_ADDR --master_port $MASTER_PORT --max_restarts 5 --monitor_interval 10 "
...
Parameter description:
- --nproc_per_node: number of training processes started on each node.
- --nnodes: total number of nodes participating in the training job.
- --node_rank: rank of the current node within the job.
- --master_addr and --master_port: address and port of the rank 0 node used for rendezvous.
- --max_restarts: maximum number of times the worker group is restarted after a failure.
- --monitor_interval: interval, in seconds, at which torchrun monitors the state of the workers.
...
from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP
# Add the following lines
import os
import mindio_ttp.framework_ttp as ttp
import mindio_ttp.megatron_ttp as mega
from megatron import get_args
...

...
def setup_model_and_optimizer(model_provider_func,
                              model_type,
                              no_wd_decay_cond=None,
                              scale_lr_cond=None,
                              lr_mult=1.0):
    ...
    if args.load is not None:
        timers = get_timers()
        timers('load-checkpoint', log_level=0).start(barrier=True)
        # Delete the following line
        args.iteration = load_checkpoint(model, optimizer, opt_param_scheduler)
        # Add the following line
        args.iteration = mega.load_checkpoint(model, optimizer, opt_param_scheduler)
        timers('load-checkpoint').stop(barrier=True)
        timers.log(['load-checkpoint'])
    ...
...

def train_step(forward_step_func, data_iterator,
               model, optimizer, opt_param_scheduler):
    """Single training step."""
    ...
    # Reduce gradients.
    optimizer.reduce_model_grads(args, timers)
    # all ranks need to barrier
    # Add the following line
    torch.distributed.barrier()
    ...
    timers('optimizer').stop()
    ...

# Add the following decorator
@ttp.ttp_to_persist
def train(forward_step_func, model, optimizer, opt_param_scheduler,
          train_data_iterator, valid_data_iterator,
          process_non_loss_data_func):
    """Train the model function."""
    ...
    # Tracking loss.
    total_loss_dict = {}

    # begin to init ttp processor & controller
    # Add the following lines
    cur_rank = torch.distributed.get_rank()
    world_size: int = torch.distributed.get_world_size()
    masterIp = os.getenv('MASTER_ADDR')
    port = int(os.getenv('TTP_PORT'))
    dp_ranks = mpu.get_data_parallel_all_rank()
    if cur_rank == 0:
        controller_ip = os.getenv('XDL_IP')
        ttp.ttp_init_controller(cur_rank, world_size, len(dp_ranks)//2, False)
        ttp.ttp_start_controller(controller_ip, port)
    ttp.ttp_init_processor(cur_rank, dp_ranks, len(dp_ranks), world_size, len(dp_ranks)//2, False)
    ttp.ttp_start_processor(masterIp, port)
    ttp.ttp_register_save_ckpt_handler(mega.ttp_save_checkpoint)
    ttp.ttp_register_rename_handler(mega.rename_save_result)
    ...
        iteration += 1
        # Add the following line
        ttp.ttp_set_ckpt_args((iteration, model, optimizer, opt_param_scheduler))
        args.consumed_train_samples += mpu.get_data_parallel_world_size() * \
                                       args.micro_batch_size * \
                                       get_num_microbatches()
    ...
...
...
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
# Add the following line
import mindio_ttp.framework_ttp as ttp

        timers('optimizer-inner-step', log_level=1).start(
            barrier=args.barrier_with_L1_time)
        # Add the following line
        ttp.ttp_start_updating_os(-1)
        self.optimizer.step()
        # Add the following lines
        if hasattr(self, 'increase_step'):
            self.increase_step()
        ttp.ttp_end_updating_os(self.current_step)
...
from apex.optimizers import FusedAdam as Adam
from apex.optimizers import FusedSGD as SGD
# Add the following lines
from mindio_ttp.megatron_ttp import ReplicatedOptimizer
from megatron import get_args
...
    # Megatron optimizer.
    # Delete the following line
    opt_ty = DistributedOptimizer
    # Add the following lines
    opt_ty = ReplicatedOptimizer \
        if args.use_distributed_optimizer else \
        Float16OptimizerWithFloat16Params
...
...
# Memory buffers to avoid dynamic memory allocation
_GLOBAL_MEMORY_BUFFER = None
all_data_parallel_group_ranks = None
...
    global _DATA_PARALLEL_GLOBAL_RANKS
    global all_data_parallel_group_ranks
    assert _DATA_PARALLEL_GROUP is None, 'data parallel group is already initialized'
    all_data_parallel_group_ranks = []
...

def get_data_parallel_src_rank():
    """Calculate the global rank corresponding to the first local rank
    in the data parallel group."""
    assert _DATA_PARALLEL_GLOBAL_RANKS is not None, \
        "Data parallel group is not initialized"
    return _DATA_PARALLEL_GLOBAL_RANKS[0]

def get_data_parallel_all_rank():
    assert _DATA_PARALLEL_GLOBAL_RANKS is not None, \
        "Data parallel group is not initialized"
    return _DATA_PARALLEL_GLOBAL_RANKS

def get_all_data_parallel_ranks():
    return all_data_parallel_group_ranks
...
bash <image name> which torchrun
Example output:
/usr/local/python3.8.3/bin/torchrun
...
import sys
import mindio_ttp.framework_ttp
from torch.distributed.run import main
...
...
# env for breakpoint ckpt
export RESUME_MODE_ENABLE=1
# Add the following line
export TTP_OT=360
...
# Distributed scenario
if [[ "${device_count}" -ge 1 ]]; then
    server_id=${RANK}
    logger "server id is: ""${server_id}"
    DISTRIBUTED_ARGS="--nproc_per_node $LOCAL_WORLD_SIZE --nnodes $server_count --node_rank $RANK --master_addr $MASTER_ADDR --master_port $MASTER_PORT"
    # Delete the following line
    ${DLS_PROGRAM_EXECUTOR} -m torch.distributed.launch $DISTRIBUTED_ARGS ${boot_file_path}${boot_file} ${train_param} 2>&1 | tee ${output_url}/log
    # Add the following line
    torchrun $DISTRIBUTED_ARGS ${boot_file_path}${boot_file} ${train_param} && tee ${output_url}/log
    check_return_code
fi
...
The following describes how to adapt other models to the graceful fault tolerance mode.
If the PyTorch framework is used, the reset_process.py file does not need to be obtained.
An example of modifying the model startup script is as follows.
...
# Launch Command: append & to the model's original launch command so that the main training process runs in the background
${DLS_PROGRAM_EXECUTOR} ${run_file_path}${boot_file} ${train_param} --run_distribute=True --device_num=${MS_LOCAL_WORKER} --parameter_server=False --device_target=Ascend --output_dir=${output_url} &> ${output_url}/worker_$i.log &
tail -f ${output_url}/log &
old_log_pid=$!

# Copy the following commands into the startup script
python -u reset_process.py -p "${train_pid}" &   # pass the training process PID to the monitoring process
reset_pid=$!
wait ${train_pid}                 # wait for the training process to finish and get its exit code
exit_code=$?
if [ ${exit_code} -eq 0 ]; then
    kill -15 ${reset_pid}         # if the training process exits normally, send SIGTERM (-15) to the monitoring process
    echo "training finished."
    exit ${exit_code}
else
    if [ -d "<training process working directory>" ]; then
        touch "<training process working directory>"/newlog       # create the log file for the new training process
        tail -f "<training process working directory>"/newlog &   # follow the log of the new training process
    fi
    kill -9 ${old_log_pid}
    wait ${reset_pid}             # if the training process exits abnormally, wait for the monitoring process to finish
    exit_code=$?
fi
exit ${exit_code}