什么是FlowData和FlowNode
什么是FlowNode
FlowNode是FlowGraph的计算节点。FlowNode定义了计算节点的名称、输入个数、输出个数,FlowNode的实际计算逻辑由AddPp方法添加的ProcessPoint决定,ProcessPoint可以是FuncProcessPoint,也可以是GraphProcessPoint。详细信息请参考dataflow.FlowNode。
- GraphProcessPoint
GraphProcessPoint的函数原型包括如下。
1
GraphProcessPoint(framework, graph_file, load_params={}, compile_config_path="", name=None)
以TF框架下编写Add功能为例,介绍如何构建一个GraphProcessPoint。如下示例中,add有两个输入,GraphProcessPoint和TF的IR文件图的映射是通过GraphProcessPoint的构造方法里的graph_file参数传递的,add_graph_config.json用于配置TF的IR文件图的编译信息,add.pb是TF的图文件,Placeholder和Placeholder_1是输入节点的名称,这三个文件的名字为示例,具体请根据实际情况修改。1
pp1 = df.GraphProcessPoint(df.Framework.TENSORFLOW, "./add.pb", load_params={"input_data_names":"Placeholder,Placeholder_1"},compile_config_path='config/add_graph_config.json')
- FuncProcessPoint(方式一)
该方式适用于Dataflow(Python)调用UDF(Python)和UDF(C++)的场景,并且需要存在已经创建好的UDF工程。
FuncProcessPoint的主要函数原型包括如下。1 2
FuncProcessPoint(compile_config_path, name=None) # FuncProcessPoint构造函数,并可以用于FuncProcessPoint和UDF的映射,compile_config_path是配置了UDF编译信息的json文件,name是FuncProcessPoint的名字(可以为空,也可以不携带)。 set_init_param(attr_name, attr_value) # 设置FuncProcessPoint的初始化参数。
以Add功能为例,介绍如何构建一个FuncProcessPoint。如下示例中,FuncProcessPoint的名称叫"func_pp",FuncProcessPoint的功能通过UDF实现,FuncProcessPoint和UDF的映射通过add_func_config.json来实现。1 2 3
# 定义FuncProcessPoint实现Add功能的FlowNode pp0 = df.FuncProcessPoint(compile_config_path="config/add_func_config.json", name="func_pp") pp0.set_init_param("out_type", df.DT_INT32) # 按UDF实际实现来设置
- FuncProcessPoint(方式二)
该方式适用于Dataflow(Python)调用UDF(Python)场景,并且需要存在已经写好的UDF实现函数。不需要用户自己创建UDF工程。
FuncProcessPoint的主要函数原型包括如下。1 2
FuncProcessPoint(py_func=UserFunc, workspace_dir="./py_user_func_ws", name=None) # FuncProcessPoint构造函数,py_func是用户开发的指定了输入输出的UDF类名,workspace_dir是运行时生成的Python UDF工作空间目录,name是FuncProcessPoint的名字(可以为空,也可以不携带)。 set_init_param(attr_name, attr_value) # 设置FuncProcessPoint的初始化参数。
以Add功能为例,介绍如何构建一个FuncProcessPoint。如下示例中,FuncProcessPoint的名称叫"func_pp",FuncProcessPoint的功能通过UDF实现,通过py_func指定UDF实现的功能。1 2 3
# 定义FuncProcessPoint实现Add功能的FlowNode pp0 = df.FuncProcessPoint(py_func="Add", workspace_dir="./py_add_ws", name="func_pp") pp0.set_init_param("out_type", df.DT_INT32) # 按UDF实际实现来设置
父主题: 构建FlowGraph