Development Process (Calling the MetaFlowFunc Class)
Overview
During UDF development, you can use the MetaFlowFunc class to write a user-defined single-func processing function, or the MetaMultiFunc class to write a user-defined multi-func processing function. This section describes how to develop a UDF with the MetaFlowFunc class.
A UDF implementation consists of two parts:
- UDF implementation file: code implementation of the user-defined functions. The file name extension is .cpp. There are two scenarios: UDF Implementation File (by Using UDF) and UDF Implementation File (by Calling NN).
- UDF registration: Use the REGISTER_FLOW_FUNC macro to declare the implementation class as a function name and register it to the UDF framework.
Note the following constraints:
- The single-device scenario allows one input with one or multiple outputs, or multiple inputs with one output.
- The two-device scenario allows one input with one output only.
The following uses the Add function as an example to describe how to develop, compile, construct, and verify a UDF.
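The input/output constraints above can be restated as a small check. This is only an illustrative sketch: IsIoLayoutAllowed and its parameters are hypothetical names, not part of the UDF framework, and device counts other than one or two are treated as unsupported simply because this section does not describe them.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper (not a framework API): returns true when the numbers of
// inputs and outputs match the constraints listed in this section.
constexpr bool IsIoLayoutAllowed(std::size_t deviceCount, std::size_t inputs, std::size_t outputs)
{
    return inputs > 0 && outputs > 0 &&
           // Single device: one input with one or more outputs,
           // or several inputs with one output.
           ((deviceCount == 1 && (inputs == 1 || outputs == 1)) ||
            // Two devices: exactly one input and one output.
            (deviceCount == 2 && inputs == 1 && outputs == 1));
}

// Compile-time examples of the rule.
static_assert(IsIoLayoutAllowed(1, 2, 1), "two inputs, one output on one device");
static_assert(!IsIoLayoutAllowed(2, 2, 1), "two inputs are not allowed on two devices");
```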
UDF Implementation File (by Using UDF)
You need to develop user-defined functions in the workspace/xx.cpp file of the project. The following uses add_flow_func.cpp as an example.
Function analysis:
The Add function implements addition for the float type and int type.
Specifying the input and output:
The function contains two inputs and one output.
Function implementation:
Inherit from the MetaFlowFunc base class declared in meta_flow_func.h, and override the Init() and Proc() functions.
If resources need to be released, release them in the destructor.
    namespace FlowFunc {
    class AddFlowFunc : public MetaFlowFunc {};
    }
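To make the class structure concrete without depending on the framework headers, the following sketch uses stand-in types. FlowFuncBase, FlowMsg, kSuccess, and AddFlowFuncSketch are hypothetical names introduced for this illustration; in a real UDF the base class and message type come from meta_flow_func.h. It shows the two overrides and a destructor that releases what Init() acquired.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

namespace Sketch {
using std::int32_t;

struct FlowMsg {};               // stand-in for the framework message type
constexpr int32_t kSuccess = 0;  // stand-in for FLOW_FUNC_SUCCESS

// Stand-in for MetaFlowFunc (assumption: the real class is provided by the framework).
class FlowFuncBase {
public:
    virtual ~FlowFuncBase() = default;
    virtual int32_t Init() = 0;
    virtual int32_t Proc(const std::vector<std::shared_ptr<FlowMsg>> &inputMsgs) = 0;
};

class AddFlowFuncSketch : public FlowFuncBase {
public:
    AddFlowFuncSketch() = default;
    AddFlowFuncSketch(const AddFlowFuncSketch &) = delete;
    AddFlowFuncSketch &operator=(const AddFlowFuncSketch &) = delete;

    ~AddFlowFuncSketch() override
    {
        delete[] scratch_;  // release the buffer acquired in Init()
    }

    int32_t Init() override
    {
        delete[] scratch_;  // tolerate a repeated Init() without leaking
        scratch_ = new float[kScratchSize]();
        return kSuccess;
    }

    int32_t Proc(const std::vector<std::shared_ptr<FlowMsg>> &inputMsgs) override
    {
        lastInputCount_ = inputMsgs.size();  // placeholder for real computation
        return kSuccess;
    }

    std::size_t LastInputCount() const { return lastInputCount_; }

private:
    static constexpr std::size_t kScratchSize = 16;
    float *scratch_ = nullptr;
    std::size_t lastInputCount_ = 0;
};

// Calls Init() once and then Proc() with two messages, and returns the
// number of inputs that Proc() saw.
inline std::size_t RunLifecycleDemo()
{
    AddFlowFuncSketch func;
    const int32_t initRet = func.Init();
    assert(initRet == kSuccess);
    (void)initRet;
    std::vector<std::shared_ptr<FlowMsg>> inputs{std::make_shared<FlowMsg>(),
                                                 std::make_shared<FlowMsg>()};
    const int32_t procRet = func.Proc(inputs);
    assert(procRet == kSuccess);
    (void)procRet;
    return func.LastInputCount();
}
}  // namespace Sketch
```

The copy operations are deleted because the class owns a raw buffer; holding the buffer in a std::unique_ptr would remove the need for an explicit destructor.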
If you need to use the concurrency function of Task Parallel Runtime (TPRT), see Development Process (Calling the MetaMultiFunc Class) for the implementation of the following Init() and Proc() functions.
- Init(): performs initialization, such as initializing variables and obtaining attributes. In this example, what needs to be obtained is the out_type attribute set in FunctionPp during DataFlow graph construction. For example:
    int32_t Init() override
    {
        // Use the GetAttr method of the MetaContext class to obtain the out_type attribute set during graph construction. The attribute value is stored in the private member outDataType_ and is used to convert the source type to the outDataType_ type.
        auto getRet = context_->GetAttr("out_type", outDataType_);
        if (getRet != FLOW_FUNC_SUCCESS) {
            FLOW_FUNC_RUN_LOG_ERROR("GetAttr dType not exist. ");
            return getRet;
        }
        FLOW_FUNC_RUN_LOG_INFO("Add flow func init success");
        return FLOW_FUNC_SUCCESS;
    }
- Proc(): user-defined computation processing function. The UDF framework calls this method after receiving the data of the input tensor.
    int32_t Proc(const std::vector<std::shared_ptr<FlowMsg>> &inputMsgs) override
    {
        FLOW_FUNC_LOG_DEBUG("proc start");
        // The Add function takes exactly two inputs.
        if (inputMsgs.size() != 2) {
            // Return the error code defined in flow_func_defines.h.
            FLOW_FUNC_LOG_ERROR("add must have 2 inputs, but %zu", inputMsgs.size());
            return FLOW_FUNC_ERR_PARAM_INVALID;
        }
        auto inputFlowMsg1 = inputMsgs[0];
        auto inputFlowMsg2 = inputMsgs[1];
        FLOW_FUNC_LOG_INFO("inputFlowMsg1.RetCode=%d, inputFlowMsg2.RetCode=%d",
                           inputFlowMsg1->GetRetCode(), inputFlowMsg2->GetRetCode());
        // Check RetCode in each input message. 0 means that the upstream processing succeeded. Any other value is reported as an error.
        if (inputFlowMsg1->GetRetCode() != 0 || inputFlowMsg2->GetRetCode() != 0) {
            FLOW_FUNC_LOG_ERROR("invalid input");
            return -1;
        }
        MsgType msgType1 = inputFlowMsg1->GetMsgType();
        MsgType msgType2 = inputFlowMsg2->GetMsgType();
        // Check whether both input messages are of the tensor type.
        if (msgType1 == MsgType::MSG_TYPE_TENSOR_DATA && msgType2 == MsgType::MSG_TYPE_TENSOR_DATA) {
            // Obtain the input tensors.
            auto inputTensor1 = inputFlowMsg1->GetTensor();
            auto inputTensor2 = inputFlowMsg2->GetTensor();
            auto inputShape1 = inputTensor1->GetShape();
            // Allocate the output flow message of the FlowFunc operator based on the shape of the input data and the data type of the output data.
            auto outputMsg = context_->AllocTensorMsg(inputShape1, outDataType_);
            if (outputMsg == nullptr) {
                FLOW_FUNC_LOG_ERROR("alloc tensor msg failed");
                return -1;
            }
            // Call the AddTensor API in the third-party library to implement addition.
            if (AddDepend::Instance().AddTensor(inputTensor1, inputTensor2, outputMsg, outDataType_) != 0) {
                FLOW_FUNC_LOG_ERROR("add tensor failed");
                return -1;
            }
            // Output the calculation result.
            return context_->SetOutput(0, outputMsg);
        }
        // If an input message is not of the tensor type, an error is returned.
        FLOW_FUNC_LOG_ERROR("MsgType is not Tensor.");
        return -1;
    }
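The AddTensor call above belongs to a third-party library whose implementation is not shown in this section. As a rough idea of the float and int addition described in the function analysis, the following sketch adds two equally sized buffers element by element and converts each sum to a requested output type. AddBuffers and AddBuffersDemo are hypothetical names, and std::vector stands in for the framework tensors; this is not the library's actual code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: computes out[i] = static_cast<Out>(a[i] + b[i]).
// Returns false and leaves out untouched if the input sizes differ.
template <typename In, typename Out>
bool AddBuffers(const std::vector<In> &a, const std::vector<In> &b, std::vector<Out> &out)
{
    if (a.size() != b.size()) {
        return false;
    }
    out.resize(a.size());
    for (std::size_t i = 0; i < a.size(); ++i) {
        // The sum is computed in the input type and then converted.
        // A float-to-int conversion discards the fractional part.
        out[i] = static_cast<Out>(a[i] + b[i]);
    }
    return true;
}

// Adds two float buffers and requests an int32 result.
inline std::vector<std::int32_t> AddBuffersDemo()
{
    const std::vector<float> lhs{1.5f, 2.0f};
    const std::vector<float> rhs{0.25f, 3.0f};
    std::vector<std::int32_t> sum;
    const bool ok = AddBuffers(lhs, rhs, sum);
    assert(ok);
    (void)ok;
    return sum;
}
```

In the real function, the output type would be selected at run time from the out_type attribute obtained in Init(), which typically means dispatching to the matching template instantiation.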
UDF Implementation File (by Calling NN)
You need to develop user-defined functions in the workspace/xx.cpp file of the project. The following uses call_nn_flow_func.cpp as an example.
Function analysis:
The Add function implements addition.
Specifying the input and output:
The function contains two inputs and one output.
Function implementation:
Inherit from the MetaFlowFunc base class declared in meta_flow_func.h, and override the Init() and Proc() functions.
    class CallNnFlowFunc : public MetaFlowFunc {};
- Init(): performs initialization, such as initializing variables and obtaining attributes. In this example, Init() does not process anything and directly returns FLOW_FUNC_SUCCESS.
    int32_t Init() override
    {
        FLOW_FUNC_RUN_LOG_INFO("call nn flow func init success");
        return FLOW_FUNC_SUCCESS;
    }
- Proc(): user-defined computation processing function. The UDF framework calls this method after receiving the data of the input tensor.
For example:
    int32_t Proc(const std::vector<std::shared_ptr<FlowMsg>> &inputMsgs) override
    {
        std::vector<std::shared_ptr<FlowMsg>> outputMsgs;
        // Call RunFlowModel to execute the NN model called in the user-defined processing function. The NN model implements addition through IR graph construction. The model execution result is stored in outputMsgs.
        auto ret = context_->RunFlowModel(dependKey_.c_str(), inputMsgs, outputMsgs, 100000);
        if (ret != FLOW_FUNC_SUCCESS) {
            FLOW_FUNC_LOG_ERROR("Run flow model failed, ret=%d, dependKey=%s", ret, dependKey_.c_str());
            return ret;
        }
        // Output the result outputMsgs of calling the NN model through SetOutput.
        for (size_t i = 0; i < outputMsgs.size(); ++i) {
            ret = context_->SetOutput(i, outputMsgs[i]);
            if (ret != FLOW_FUNC_SUCCESS) {
                FLOW_FUNC_LOG_ERROR("Set output failed, ret=%d, index=%zu", ret, i);
                return ret;
            }
        }
        return FLOW_FUNC_SUCCESS;
    }
    // dependKey_ is a private data member defined by the CallNnFlowFunc class.
    private:
        std::string dependKey_{"invoke_graph"};
UDF Registration
Use the REGISTER_FLOW_FUNC macro to declare the implementation class as a function name, and register it to the UDF framework.
    REGISTER_FLOW_FUNC("call_nn", AddFlowFunc);
- call_nn: user-defined function name, which needs to be customized as required.
- AddFlowFunc: class name, which must be the same as that in UDF Implementation File (by Using UDF).
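For readers unfamiliar with registration macros, the following sketch shows one common way such a macro can bind a function name to a class: a static object registers a factory in a name-to-factory map before main() runs. This is a generic illustration of the pattern and not the actual implementation of REGISTER_FLOW_FUNC. FuncBase, Registry, REGISTER_SKETCH_FUNC, and AddSketch are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical base type for registered classes.
struct FuncBase {
    virtual ~FuncBase() = default;
    virtual int Run() = 0;
};

// Maps a function name to a factory that creates the implementation class.
class Registry {
public:
    using Factory = std::function<std::unique_ptr<FuncBase>()>;

    static Registry &Instance()
    {
        static Registry registry;  // constructed on first use
        return registry;
    }

    // Returns false if the name is already registered.
    bool Register(const std::string &name, Factory factory)
    {
        return factories_.emplace(name, std::move(factory)).second;
    }

    // Returns nullptr if the name is unknown.
    std::unique_ptr<FuncBase> Create(const std::string &name) const
    {
        const auto it = factories_.find(name);
        if (it == factories_.end()) {
            return nullptr;
        }
        return it->second();
    }

private:
    std::map<std::string, Factory> factories_;
};

// Registers clazz under name during static initialization.
#define REGISTER_SKETCH_FUNC(name, clazz)                                   \
    static const bool g_registered_##clazz = Registry::Instance().Register( \
        name, [] { return std::unique_ptr<FuncBase>(new clazz()); })

struct AddSketch : FuncBase {
    int Run() override { return 0; }
};
REGISTER_SKETCH_FUNC("add_sketch", AddSketch);

// Looks up the registered name, creates the object, and runs it.
inline int RunRegisteredDemo()
{
    const std::unique_ptr<FuncBase> func = Registry::Instance().Create("add_sketch");
    assert(func != nullptr);
    return func->Run();
}
```

In this pattern the name passed to the macro is the lookup key, and the class name must refer to a type visible at the point of registration, which mirrors the two parameter descriptions above.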