通过注解构图并执行
通过注解构图并执行的流程如下所示。
图1 通过注解构图并执行


示例代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | import dataflow as df import numpy as np @df.pyflow def add(in0, in1): return in0 + in1 # 系统初始化 options = { "ge.exec.deviceId":"0", "ge.experiment.data_flow_deploy_info_path":"./data_flow_deploy_info.json", "ge.socVersion": "AscendXXX" # 根据环境修改version } df.init(options) # 定义FlowData data0 = df.FlowData() data1 = df.FlowData() # 通过注解UDF生成FlowNode add_node = add.fnode() # 构建FlowData和FlowNode的连边关系 add_out = add_node(data0, data1) # 通过FlowOut构建FlowGraph dag = df.FlowGraph([add_out]) # 根据图自动生成部署策略,所有节点均部署在numa_config中的第1个设备上 # 如需修改需要注释下面这行代码,并修改./data_flow_deploy_info.json文件内容 df.utils.generate_deploy_template(dag, "./data_flow_deploy_info.json") # 调用FlowGraph.feed填充输入 dag.feed({data0:np.array([[1, 2]], dtype=np.int32), data1:np.array([[2, 3]], dtype=np.int32)}) # 调用FlowGraph.fetch获取输出 print("dataflow fetch result:", dag.fetch()) # 打印:dataflow fetch result: ([array([[3, 5]], dtype=int32)], 0) # 释放系统资源 df.finalize() |
父主题: DataFlow运行