昇腾社区首页
中文
注册

通过注解构图并执行

通过注解构图并执行的流程如下所示。

图1 通过注解构图并执行

示例代码如下。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import dataflow as df
import numpy as np

@df.pyflow
def add(in0, in1):
    return in0 + in1

# 系统初始化
options = {
    "ge.exec.deviceId":"0",
    "ge.experiment.data_flow_deploy_info_path":"./data_flow_deploy_info.json",
    "ge.socVersion": "AscendXXX"  # 根据环境修改version
}
df.init(options)

# 定义FlowData
data0 = df.FlowData()
data1 = df.FlowData()
# 通过注解UDF生成FlowNode
add_node = add.fnode()

# 构建FlowData和FlowNode的连边关系
add_out = add_node(data0, data1)

# 通过FlowOut构建FlowGraph
dag = df.FlowGraph([add_out])

# 根据图自动生成部署策略,所有节点均部署在numa_config中的第1个设备上
# 如需修改需要注释下面这行代码,并修改./data_flow_deploy_info.json文件内容
df.utils.generate_deploy_template(dag, "./data_flow_deploy_info.json")

# 调用FlowGraph.feed填充输入
dag.feed({data0:np.array([[1, 2]], dtype=np.int32), data1:np.array([[2, 3]], dtype=np.int32)})

# 调用FlowGraph.fetch获取输出
print("dataflow fetch result:", dag.fetch())
# 打印:dataflow fetch result: ([array([[3, 5]], dtype=int32)], 0)

# 释放系统资源 
df.finalize()