对于复杂模型推理,可在脚本中手动指定每个算子的执行stream,将原本需要串行的多个算子分发到不同stream做并行计算,多个stream上的计算形成overlap,从而降低整体计算耗时。
对于并行来说,包含如下两种:
本功能主要处理计算资源间并发,尤其针对Cube计算资源未完全使用的场景。若Cube计算资源已完全使用,不建议开启本功能,可能会造成额外的调度,从而导致原计算性能劣化。
export ENABLE_DYNAMIC_SHAPE_MULTI_STREAM=1
使用如下with语句块(npu_stream_switch),语句块内下发的算子切换至stream_tag流,语句块外的算子使用默认stream计算。
with torchair.scope.npu_stream_switch(stream_tag: str, stream_priority: int = 0):
通过npu_wait_tensor接口实现时序控制,指定算子a等待算子b执行完后执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | import torch, os import torch_npu import torchair as tng from torchair.configs.compiler_config import CompilerConfig from torchair.core.utils import logger import logging logger.setLevel(logging.DEBUG) # 定义模型model # add_result、mm1在默认stream,mm_result在流“1”,add2在流“2” class Model(torch.nn.Module): def __init__(self): super().__init__() def forward(self, in1, in2, in3, in4): add_result = torch.add(in1, in2) with tng.scope.npu_stream_switch('1'): # torch.mm算子(mm_result)等待torch.add算子(add_result)执行完再执行 tng.scope.npu_wait_tensor(in4, add_result) mm_result = torch.mm(in3, in4) mm1 = torch.mm(in3, in4) with tng.scope.npu_stream_switch('2'): # torch.add算子(add2)等待torch.mm算子(mm_result)执行完再执行 tng.scope.npu_wait_tensor(in4, mm_result) add2 = torch.add(in3, in4) return add_result, mm_result, mm1, add2 model = Model() config = CompilerConfig() config.debug.graph_dump.type = "pbtxt" npu_backend = tng.get_npu_backend(compiler_config=config) # 使用torchair的backend去调用compile接口编译模型 model = torch.compile(model, backend=npu_backend, dynamic=False, fullgraph=True) in1 = torch.randn(1000, 1000, dtype = torch.float16).npu() in2 = torch.randn(1000, 1000, dtype = torch.float16).npu() in3 = torch.randn(1000, 1000, dtype = torch.float16).npu() in4 = torch.randn(1000, 1000, dtype = torch.float16).npu() result = model(in1, in2, in3, in4) print(f"Result:\n{result}\n") |