通过工程创建UDF
开发流程
通过工程进行UDF开发的流程如下图所示:
UDF工程创建
UDF工程分成如下几类,请根据使用场景进行创建。
- 通过UDF实现用户自定义功能:该场景下,用户不调用第三方依赖库,不调用已经实现的NN功能,完全通过编写UDF相关文件实现自定义功能。
- 通过调用NN实现自定义功能:该场景下,用户需要的功能可以通过已有的NN功能承载,用户通过UDF调用NN功能即可。
通过工具创建工程,详见UDF Python工程创建工具。
自定义函数开发
UDF Python开发过程中进行自定义处理函数的编写。本小节介绍进行UDF的开发过程。该部分代码可以通过模板自动生成。
UDF的实现包括两部分:
- UDF实现文件:用户自定义函数功能的代码实现,文件后缀为.py。按场景分为UDF实现文件(通过UDF实现自定义功能)和通过调用NN实现自定义功能。
- UDF封装及注册文件:用户基于工具生成或按照规范自定义cpp文件。用于获取自定义的Python UDF,将其封装成C++函数,并调用FLOW_FUNC_REGISTRAR完成函数的注册。

- 单P场景下,支持一次输入对应多次输出,或者一次输入对应一次输出,或者多次输入对应一次输出。
- 2P场景下,仅支持一次输入对应一次输出。
- 如下以实现add功能的函数为例,介绍如何进行UDF开发、编译、构图及验证。
UDF实现文件(通过UDF实现自定义功能)
用户需要在工程的“workspace/src_python/xxx.py”文件中进行用户自定义函数的开发。以“func_add.py”为例进行介绍。
函数分析:
该函数实现Add功能,把两个输入转换为numpy数组并进行加法运算,将运算结果进行输出。
明确输入和输出:
Add函数有两个输入,一个输出。
注意事项:
用户在Python文件中定义的函数名称要和cpp文件中获取Python模块的名称保持一致。
如用户在Python文件中定义初始化函数为init_flow_func,在cpp的Init函数中查找Python模块属性时应查找init_flow_func。详细实例可参照如下代码。
函数实现:
包括初始化函数和实现函数(可以有多个)。
用户在编写UDF代码时,通常需要导入dataflow.data_type、dataflow.flow_func模块。
flow_func中开放了UDF编写的常用接口,data_type封装了常用的数据类型。具体模块内容用户可以导入后自行查看。
1 2 3 4 5 | import dataflow.flow_func as ff import dataflow.data_type as dt # 定义类 class Add(): # 类名称和实际开发UDF功能相关 |
- 初始化函数,执行初始化动作。示例如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
@ff.init_wrapper() # 该描述是必须的,wrapper中将C++的对象和Python开放的对象进行了转换 def init_flow_func(self, meta_params): # 入参有且只能有meta_params,类型为MetaParams name = meta_params.get_name() ff.logger.info("func name is %s", name) input_num= meta_params.get_input_num() ff.logger.info("input num %d", input_num) device_id = meta_params.get_running_device_id() ff.logger.info("device id %d", device_id) out_type = meta_params.get_attr_tensor_dtype("out_type") if out_type[0] != ff.FLOW_FUNC_SUCCESS: ff.logger.error("get dtype failed") return ff.FLOW_FUNC_FAILED self.count = 0 return ff.FLOW_FUNC_SUCCESS
- 实现函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
@ff.proc_wrapper() # 该描述是必须的,wrapper中将C++的对象和Python开放的对象进行了转换 def Add1(self, run_context, input_flow_msgs): # 入参有且只能有MetaRunContext类型对象及FlowMsg类型对象 for msg in input_flow_msgs: ff.logger.error("msg code %d", msg.get_ret_code()) if msg.get_ret_code() != ff.FLOW_FUNC_SUCCESS: ff.logger.error("invalid input") return ff.FLOW_FUNC_FAILED # add方法下应该只有两个输入 if input_flow_msgs.__len__() != 2: ff.logger.error("invalid input") # 返回flow_func模块中定义的错误码 return ff.FLOW_FUNC_ERR_PARAM_INVALID tensor1 = input_flow_msgs[0].get_tensor() tensor2 = input_flow_msgs[1].get_tensor() dtype1 = tensor1.get_data_type() dtype2 = tensor2.get_data_type() # 两个输入的datatype预期应该相同 if dtype1 != dtype2: ff.logger.error("input data type is not same") return ff.FLOW_FUNC_FAILED ff.logger.info("element size %d", tensor1.get_data_size()) ff.logger.event("data type is same") shape1 = tensor1.get_shape() shape2 = tensor2.get_shape() if shape1 != shape2: ff.logger.error("input data shape is not same") return ff.FLOW_FUNC_FAILED ff.logger.info("shape is same") # 申请输出的msg对象 out = run_context.alloc_tensor_msg(shape1, dt.DT_INT32) data_size = out.get_tensor().get_data_size() ff.logger.error("data_size %d", data_size) ele_cnt = out.get_tensor().get_element_cnt() ff.logger.error("ele_cnt %d", ele_cnt) np1 = tensor1.numpy() np2 = tensor2.numpy() a = out.get_tensor().numpy() # 通过获取tensor,进一步获取tensor中的数组数据,进行加法运算 a[...] = np1 + np2 ff.logger.event("prepare to set output in add1") if run_context.set_output(0, out) != ff.FLOW_FUNC_SUCCESS: ff.logger.error("set output failed") return ff.FLOW_FUNC_FAILED self.count += 1 return ff.FLOW_FUNC_SUCCESS
- 如果用户需要定义多个功能,可以在同一个类中定义多个实现函数。
UDF实现文件(通过调用NN实现自定义功能)
用户需要在工程的“workspace/src_python/xxx.py”文件中进行用户自定义函数的开发。
函数分析:
该函数实现Add功能。
明确输入和输出:
包含两个输入,一个输出。
函数实现:
包括初始化函数和实现函数(可以有多个)。
用户在编写UDF代码时,通常需要导入dataflow.data_type、dataflow.flow_func模块
flow_func中开放了UDF编写的常用接口,data_type封装了常用的数据类型。具体模块内容用户可以导入后自行查看。
1 2 3 4 5 | import dataflow.flow_func as ff import dataflow.data_type as dt # 定义类 class Add(): # 类名称和实际开发UDF功能相关 |
- 初始化函数,执行初始化动作。示例如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
@ff.init_wrapper() # 该描述是必须的,wrapper中将C++的对象和Python开放的对象进行了转换 def init_flow_func(self, meta_params): # 入参有且只能有meta_params,类型为MetaParams name = meta_params.get_name() ff.logger.info("func name is %s", name) input_num= meta_params.get_input_num() ff.logger.info("input num %d", input_num) device_id = meta_params.get_running_device_id() ff.logger.info("device id %d", device_id) out_type = meta_params.get_attr_tensor_dtype("out_type") if out_type[0] != ff.FLOW_FUNC_SUCCESS: ff.logger.error("get dtype failed") return ff.FLOW_FUNC_FAILED self.count = 0 return ff.FLOW_FUNC_SUCCESS
- Add():用户自定义的计算处理函数。UDF框架在接收到输入tensor的数据后,会调用此方法。示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
@ff.proc_wrapper() # 该描述是必须的,wrapper中将C++的对象和Python开放的对象进行了转换 def AddNN(self, run_context, input_flow_msgs): # 入参有且只能有MetaRunContext类型对象及FlowMsg类型对象 ret = run_context.run_flow_model("invoke_graph", input_flow_msgs, 1000) if ret[0] != ff.FLOW_FUNC_SUCCESS: ff.logger.error("run nn failed") return ff.FLOW_FUNC_FAILED ff.logger.event("run nn success") i = 0 for out in ret[1]: if run_context.set_output(i, out) != ff.FLOW_FUNC_SUCCESS: ff.logger.error("set output failed") return ff.FLOW_FUNC_FAILED i = i + 1 return ff.FLOW_FUNC_SUCCESS
- 如果用户需要定义多个功能,可以在同一个类中定义多个实现函数。
UDF封装及注册
Python开发UDF需要准备Python文件以外,还需要准备C++的接口调用文件。建议通过工具直接生成,也可以自定义,示例如下。
C++的类要继承MetaMultiFunc,C++文件中核心内容包括三部分
Init函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | int32_t Init(const std::shared_ptr<MetaParams> ¶ms) override { FLOW_FUNC_LOG_DEBUG("Init enter"); PyAcquire(); ScopeGuard gilGuard([this]() { PyRelease(); }); int32_t result = FLOW_FUNC_SUCCESS; try { PyRun_SimpleString("import sys"); std::string append = std::string("sys.path.append('") + params->GetWorkPath() + "')"; PyRun_SimpleString(append.c_str()); constexpr const char *pyModuleName = "func_add"; // 与Python UDF的文件名保持一致,如上述例子中的func_add constexpr const char *pyClzName = "Add"; FLOW_FUNC_LOG_INFO("Load py module name: %s", pyModuleName); auto module = py::module_::import(pyModuleName); FLOW_FUNC_LOG_INFO("%s.%s import success", pyModuleName, pyClzName); pyModule_ = module.attr(pyClzName)(); if (CheckProcExists() != FLOW_FUNC_SUCCESS) { FLOW_FUNC_LOG_ERROR("%s.%s check proc exists failed", pyModuleName, pyClzName); return FLOW_FUNC_FAILED; } if (py::hasattr(pyModule_, "init_flow_func")) { // 与Python UDF中定义的初始化函数名称保持一致,如上述例子中的init_flow_func result = pyModule_.attr("init_flow_func")(params).cast<int32_t>(); if (result != FLOW_FUNC_SUCCESS) { FLOW_FUNC_LOG_ERROR("%s.%s init_flow_func result=%d", pyModuleName, pyClzName, result); return result; } FLOW_FUNC_LOG_INFO("%s.%s init_flow_func success", pyModuleName, pyClzName); } else { FLOW_FUNC_LOG_INFO("%s.%s has no init_flow_func method, no need init", pyModuleName, pyClzName); } } catch (std::exception &e) { FLOW_FUNC_LOG_ERROR("init failed: %s", e.what()); result = FLOW_FUNC_FAILED; } FLOW_FUNC_LOG_DEBUG("FlowFunc Init end."); return result; } |
proc()函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | int32_t AddProc1( const std::shared_ptr<MetaRunContext> &runContext, const std::vector<std::shared_ptr<FlowMsg>> &inputFlowMsgs) { FLOW_FUNC_LOG_INFO("enter"); PyAcquire(); ScopeGuard gilGuard([this]() { PyRelease(); }); int32_t result = FLOW_FUNC_SUCCESS; try { result = pyModule_.attr("Add1")(runContext, inputFlowMsgs).cast<int32_t>(); // 此处attr的name与Python UDF定义的处理函数一致,如上述例子中的Add1 Add2或Sub1 AddNN if (result != FLOW_FUNC_SUCCESS) { FLOW_FUNC_LOG_ERROR(".Add Add return %d", result); return result; } FLOW_FUNC_LOG_INFO("call Add result: %d", result); } catch (std::exception &e) { FLOW_FUNC_LOG_ERROR("proc failed: %s", e.what()); result = FLOW_FUNC_FAILED; } return result; } // 如果用户在同一个UDF Python类中编写了多个处理函数,这里可以通过多个C++函数调用Python不同attr的方式,实现多FUNC int32_t AddProc2( const std::shared_ptr<MetaRunContext> &runContext, const std::vector<std::shared_ptr<FlowMsg>> &inputFlowMsgs) |
通过FLOW_FUNC_REGISTRAR宏将实现类声明为func name,注册到UDF框架中。
1 2 3 | FLOW_FUNC_REGISTRAR(Add) .RegProcFunc("Add1", &Add::AddProc1); .RegProcFunc("Add2", &Add::AddProc2); |
UDF工程编译
在udf_py_ws_sample目录下创建build和release目录:
编译完成后在workspace/release目录下生成编译成功的so。
- 创建build目录。
mkdir -p build
- 创建release目录。
mkdir -p release
- 进入build目录。
cd build
- 工程编译
- 非交叉编译场景(开发环境和运行环境的架构一致)下示例如下:编译x86_64类型的so,指定target lib为libadd.so,编译工具为g++。
cmake .. -DTOOLCHAIN=g++ -DRELEASE_DIR=../release -DRESOURCE_TYPE=X86 -DUDF_TARGET_LIB=add && make
- -DTOOLCHAIN:编译工具名称。比如g++。
- -DRELEASE_DIR:编译完成后,目标文件存放路径。
- -DRESOURCE_TYPE:编译的资源类型,可以取值:X86或者Aarch或者Ascend。
- X86:目标so需要部署在X86架构的Host时,配置为X86。
- Aarch:目标so需要部署在Aarch的Host时,配置为Aarch。
- Ascend:目标so需要部署在Device时,配置为Ascend。
- -DUDF_TARGET_LIB:编译完成后,目标so的名称。比如,设置为add,实际生成libadd.so。
- 交叉编译(开发环境是x86_64,运行环境是aarch64)场景下,编译Ascend类型的so, 指定target lib为libadd.so,编译工具为${INSTALL_DIR}/toolkit/toolchain/hcc/bin/aarch64-target-linux-gnu-g++。${INSTALL_DIR}请替换为CANN软件安装后文件存储路径。若安装的Ascend-cann-toolkit软件包,以root安装举例,则安装后文件存储路径为:/usr/local/Ascend/ascend-toolkit/latest。示例如下:
cmake .. -DTOOLCHAIN=${INSTALL_DIR}/toolkit/toolchain/hcc/bin/aarch64-target-linux-gnu-g++ -DRELEASE_DIR=../release -DRESOURCE_TYPE=Ascend -DUDF_TARGET_LIB=add && make
- 非交叉编译场景(开发环境和运行环境的架构一致)下示例如下:编译x86_64类型的so,指定target lib为libadd.so,编译工具为g++。
构建FlowGraph
构造完UDF后,通过FlowNode和FuncProcessPoint构建FlowGraph图,示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | # 定义输入 data0 = df.FlowData() data1 = df.FlowData() # 定义FuncProcessPoint实现Add功能的FlowNode pp0 = df.FuncProcessPoint(compile_config_path="./add_func.json") flow_node0 = df.FlowNode(input_num=2, output_num=1) flow_node0.add_process_point(pp0) # 构建连边关系 flow_node0_out = flow_node0(data0, data1) # 构建FlowGraph dag = df.FlowGraph([flow_node0_out]) |