Development Process (Calling the MetaFlowFunc Class)

Overview

During UDF development, you can use the MetaFlowFunc class to implement a user-defined single-func processing function, or the MetaMultiFunc class to implement a user-defined multi-func processing function. This section describes how to develop a UDF by using the MetaFlowFunc class.

The implementation of a UDF consists of two parts: the UDF implementation file and the UDF registration. The number of inputs and outputs is subject to the following constraints:

  • In the single-device scenario, a UDF can have one input with one or more outputs, or multiple inputs with one output.
  • In the two-device scenario, a UDF can have only one input and one output.

The following uses the Add function as an example to describe how to develop, compile, construct, and verify a UDF.
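The input/output constraints above can be expressed as a small check. This is only a sketch: `DeviceScenario` and `IsIoCountAllowed` are hypothetical names that are not part of the UDF framework, and treating zero inputs or zero outputs as invalid is an assumption.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical enum (not a framework type) for the two deployment scenarios.
enum class DeviceScenario { kSingleDevice, kTwoDevices };

// Hypothetical helper encoding the input/output count constraints.
bool IsIoCountAllowed(DeviceScenario scenario, std::size_t inputs, std::size_t outputs)
{
    if (inputs == 0 || outputs == 0) {
        return false;  // Assumption: a UDF needs at least one input and one output.
    }
    if (scenario == DeviceScenario::kTwoDevices) {
        return inputs == 1 && outputs == 1;  // One input with one output only.
    }
    // Single device: one input with one or more outputs,
    // or multiple inputs with exactly one output.
    return inputs == 1 || outputs == 1;
}
```

Under these rules, the Add example (two inputs, one output) fits the single-device scenario but not the two-device scenario.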

UDF Implementation File (by Using UDF)

You need to develop user-defined functions in the workspace/xx.cpp file of the project. The following uses add_flow_func.cpp as an example.

Function analysis:

The Add function implements addition for the float type and int type.

Specifying the input and output:

The function contains two inputs and one output.

Function implementation:

Inherit from the MetaFlowFunc base class declared in the meta_flow_func.h file, and override the Init() and Proc() functions.

If any resources need to be released, release them in the destructor.

namespace FlowFunc {
class AddFlowFunc : public MetaFlowFunc {
    // Override Init() and Proc() here; release resources in the destructor.
};
}  // namespace FlowFunc
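The call order described above (Init() for initialization, Proc() each time input data arrives, the destructor for cleanup) can be sketched without the framework headers. `MiniFlowFunc`, `SumFunc`, `RunLifecycle`, and `kSuccess` are hypothetical stand-ins for MetaFlowFunc, a user class, the framework's scheduler, and FLOW_FUNC_SUCCESS; the sketch assumes Init() is called once before any Proc() call and does not model the real framework's scheduling.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for FLOW_FUNC_SUCCESS.
constexpr int32_t kSuccess = 0;

// Hypothetical base class with the two hooks a MetaFlowFunc subclass overrides.
class MiniFlowFunc {
public:
    virtual ~MiniFlowFunc() = default;
    virtual int32_t Init() = 0;
    virtual int32_t Proc(const std::vector<int32_t> &inputs) = 0;
};

// Hypothetical user class: sums each batch of inputs.
class SumFunc : public MiniFlowFunc {
public:
    explicit SumFunc(int &releaseCount) : releaseCount_(releaseCount) {}
    // Release resources here; this sketch only counts the release.
    ~SumFunc() override { ++releaseCount_; }

    int32_t Init() override
    {
        initialized_ = true;
        return kSuccess;
    }

    int32_t Proc(const std::vector<int32_t> &inputs) override
    {
        if (!initialized_) {
            return -1;  // Proc must not run before a successful Init.
        }
        int32_t sum = 0;
        for (int32_t value : inputs) {
            sum += value;
        }
        lastSum_ = sum;
        return kSuccess;
    }

    int32_t LastSum() const { return lastSum_; }

private:
    int &releaseCount_;
    bool initialized_ = false;
    int32_t lastSum_ = 0;
};

// Hypothetical driver: Init once, Proc once per batch, then destroy the instance.
// Returns the sum of the last batch, or -1 if a hook fails.
int32_t RunLifecycle(const std::vector<std::vector<int32_t>> &batches, int &releaseCount)
{
    auto func = std::make_unique<SumFunc>(releaseCount);
    MiniFlowFunc &base = *func;  // The driver only uses the base-class interface.
    if (base.Init() != kSuccess) {
        return -1;
    }
    for (const auto &batch : batches) {
        if (base.Proc(batch) != kSuccess) {
            return -1;
        }
    }
    return func->LastSum();
}  // func is destroyed here, which runs ~SumFunc().
```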

If you need the concurrency capability of Task Parallel Runtime (TPRT), see Development Process (Calling the MetaMultiFunc Class) for how to implement the Init() and Proc() functions described below.

  • Init(): performs initialization, such as initializing variables and obtaining attributes. In this example, Init() obtains the out_type attribute that is set in FunctionPp during DataFlow graph construction. For example:
    int32_t Init() override
    {
        // Use the GetAttr method of the MetaContext class to obtain the out_type attribute set during graph construction.
        // The value is stored in the private member outDataType_ and is used to convert the source type to the outDataType_ type.
        auto getRet = context_->GetAttr("out_type", outDataType_);
        if (getRet != FLOW_FUNC_SUCCESS) {
            FLOW_FUNC_RUN_LOG_ERROR("GetAttr out_type failed, ret=%d.", getRet);
            return getRet;
        }
        FLOW_FUNC_RUN_LOG_INFO("Add flow func init success");
        return FLOW_FUNC_SUCCESS;
    }
    
  • Proc(): user-defined computation processing function. The UDF framework calls this method after receiving the data of the input tensor.
    int32_t Proc(const std::vector<std::shared_ptr<FlowMsg>> &inputMsgs) override
    {
        FLOW_FUNC_LOG_DEBUG("proc start");
        // The Add function has exactly two inputs.
        if (inputMsgs.size() != 2) {
            // Return an error code defined in flow_func_defines.h.
            FLOW_FUNC_LOG_ERROR("add must have 2 inputs, but got %zu", inputMsgs.size());
            return FLOW_FUNC_ERR_PARAM_INVALID;
        }

        // Obtain the two input messages.
        auto inputFlowMsg1 = inputMsgs[0];
        auto inputFlowMsg2 = inputMsgs[1];
        FLOW_FUNC_LOG_INFO("inputFlowMsg1.RetCode=%d, inputFlowMsg2.RetCode=%d", inputFlowMsg1->GetRetCode(), inputFlowMsg2->GetRetCode());
        // A RetCode of 0 means the upstream processing succeeded; any other value is reported as an error.
        if (inputFlowMsg1->GetRetCode() != 0 || inputFlowMsg2->GetRetCode() != 0) {
            FLOW_FUNC_LOG_ERROR("invalid input");
            return -1;
        }
        MsgType msgType1 = inputFlowMsg1->GetMsgType();
        MsgType msgType2 = inputFlowMsg2->GetMsgType();
        // Check whether both input messages are of the tensor type.
        if (msgType1 == MsgType::MSG_TYPE_TENSOR_DATA && msgType2 == MsgType::MSG_TYPE_TENSOR_DATA) {
            // Obtain the input tensors.
            auto inputTensor1 = inputFlowMsg1->GetTensor();
            auto inputTensor2 = inputFlowMsg2->GetTensor();
            if (inputTensor1 == nullptr || inputTensor2 == nullptr) {
                FLOW_FUNC_LOG_ERROR("get input tensor failed");
                return -1;
            }
            auto inputShape1 = inputTensor1->GetShape();
            // Allocate the output message based on the shape of the input data and the output data type.
            auto outputMsg = context_->AllocTensorMsg(inputShape1, outDataType_);
            if (outputMsg == nullptr) {
                FLOW_FUNC_LOG_ERROR("alloc tensor msg failed");
                return -1;
            }
            // Call the AddTensor API in the third-party library to implement addition.
            if (AddDepend::Instance().AddTensor(inputTensor1, inputTensor2, outputMsg, outDataType_) != 0) {
                FLOW_FUNC_LOG_ERROR("add tensor failed");
                return -1;
            }
            // Output the calculation result through output index 0.
            return context_->SetOutput(0, outputMsg);
        }
        // If either input message is not of the tensor type, an error is returned.
        FLOW_FUNC_LOG_ERROR("MsgType is not Tensor.");
        return -1;
    }
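The order of checks in Proc() (input count, RetCode, message type, then addition converted to out_type) can be exercised in isolation with plain C++ types. In this sketch, `SimpleMsg`, `MsgKind`, `OutType`, `AddProc`, and the status constants are hypothetical stand-ins for FlowMsg, MsgType, the out_type attribute, and the codes in flow_func_defines.h. The element-wise loop only approximates what the third-party AddTensor call might do, and requiring equal element counts (no broadcasting) is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for MsgType and the out_type attribute.
enum class MsgKind { kTensorData, kRawData };
enum class OutType { kFloat, kInt32 };

// Hypothetical simplified message: a RetCode, a kind, and flat float data.
struct SimpleMsg {
    int32_t retCode = 0;
    MsgKind kind = MsgKind::kTensorData;
    std::vector<float> data;
};

// Hypothetical status codes; the real ones are defined in flow_func_defines.h.
constexpr int32_t kOk = 0;
constexpr int32_t kParamInvalid = 1;
constexpr int32_t kFailed = -1;

// Same order of checks as Proc(): input count, RetCode, message type,
// then element-wise addition converted to the requested output type.
int32_t AddProc(const std::vector<SimpleMsg> &inputs, OutType outType, std::vector<float> &output)
{
    if (inputs.size() != 2) {
        return kParamInvalid;
    }
    const SimpleMsg &lhs = inputs[0];
    const SimpleMsg &rhs = inputs[1];
    if (lhs.retCode != 0 || rhs.retCode != 0) {
        return kFailed;  // An upstream node reported an error.
    }
    if (lhs.kind != MsgKind::kTensorData || rhs.kind != MsgKind::kTensorData) {
        return kFailed;  // Only tensor messages are supported.
    }
    if (lhs.data.size() != rhs.data.size()) {
        return kFailed;  // Assumption: no broadcasting, so sizes must match.
    }
    output.clear();
    for (std::size_t i = 0; i < lhs.data.size(); ++i) {
        float sum = lhs.data[i] + rhs.data[i];
        // For an int32 output, truncate toward zero as static_cast does.
        output.push_back(outType == OutType::kInt32 ? static_cast<float>(static_cast<int32_t>(sum)) : sum);
    }
    return kOk;
}
```

For inputs {1.5, 2.0} and {1.0, 3.25}, a float output gives {2.5, 5.25} and an int32 output gives {2, 5}.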
    

UDF Implementation File (by Calling NN)

You need to develop user-defined functions in the workspace/xx.cpp file of the project. The following uses call_nn_flow_func.cpp as an example.

Function analysis:

The function implements addition by calling an NN model.

Specifying the input and output:

The function contains two inputs and one output.

Function implementation:

Inherit from the MetaFlowFunc base class declared in the meta_flow_func.h file, and override the Init() and Proc() functions.

class CallNnFlowFunc : public MetaFlowFunc {};

  • Init(): performs initialization, such as initializing variables and obtaining attributes. In this example, Init() does not process anything and directly returns FLOW_FUNC_SUCCESS.
    int32_t Init() override
    {
        FLOW_FUNC_RUN_LOG_INFO("call nn flow func init success");
        return FLOW_FUNC_SUCCESS;
    }
    
  • Proc(): user-defined computation processing function. The UDF framework calls this method after receiving the data of the input tensor.
    For example:
    int32_t Proc(const std::vector<std::shared_ptr<FlowMsg>> &inputMsgs) override
    {
        std::vector<std::shared_ptr<FlowMsg>> outputMsgs;
        // Call RunFlowModel to execute the NN model called in the user-defined processing function. The NN model implements addition through IR graph construction. The model execution result is stored in outputMsgs.
        auto ret = context_->RunFlowModel(dependKey_.c_str(), inputMsgs, outputMsgs, 100000);
        if (ret != FLOW_FUNC_SUCCESS) {
            FLOW_FUNC_LOG_ERROR("Run flow model failed, ret=%d, dependKey=%s", ret, dependKey_.c_str());
            return ret;
        }
        // Output each result in outputMsgs through SetOutput, using the same index.
        for (size_t i = 0; i < outputMsgs.size(); ++i) {
            ret = context_->SetOutput(i, outputMsgs[i]);
            if (ret != FLOW_FUNC_SUCCESS) {
                FLOW_FUNC_LOG_ERROR("Set output failed, ret=%d, index=%zu", ret, i);
                return ret;
            }
        }
        return FLOW_FUNC_SUCCESS;
    }
    // dependKey_ is a private data member defined by the CallNnFlowFunc class.
    private:
        std::string dependKey_{"invoke_graph"};
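The forwarding loop in CallNnFlowFunc::Proc() maps output i of the NN model to output index i of the UDF and stops at the first failure. The sketch below reproduces only that control flow: `Model`, `SetOutputFn`, and `ForwardModelOutputs` are hypothetical, and std::function callbacks stand in for RunFlowModel and SetOutput.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical status code standing in for FLOW_FUNC_SUCCESS.
constexpr int32_t kOk = 0;

// Hypothetical callbacks: Model stands in for RunFlowModel and fills the
// outputs; SetOutputFn stands in for SetOutput(index, msg).
using Model = std::function<int32_t(const std::vector<int> &, std::vector<int> &)>;
using SetOutputFn = std::function<int32_t(std::size_t, int)>;

// Runs the model, then forwards model output i to UDF output index i,
// returning the first non-success status it encounters.
int32_t ForwardModelOutputs(const Model &model, const SetOutputFn &setOutput, const std::vector<int> &inputs)
{
    std::vector<int> outputs;
    int32_t ret = model(inputs, outputs);
    if (ret != kOk) {
        return ret;  // Propagate the model failure unchanged.
    }
    for (std::size_t i = 0; i < outputs.size(); ++i) {
        ret = setOutput(i, outputs[i]);
        if (ret != kOk) {
            return ret;  // Stop at the first output that cannot be set.
        }
    }
    return kOk;
}
```

Propagating the callee's status code unchanged, as the original Proc() does, keeps the failure reason visible to the caller.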
    

UDF Registration

Use the REGISTER_FLOW_FUNC macro to register the implementation class with the UDF framework under the specified function name.

REGISTER_FLOW_FUNC("call_nn", CallNnFlowFunc);
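Registration macros of this kind commonly rely on a static object that records a name-to-factory mapping before main() runs. How REGISTER_FLOW_FUNC is actually implemented is not described here, so the following only illustrates that general pattern; `MINI_REGISTER_FLOW_FUNC`, `MiniFlowFunc`, `Registry`, `Registrar`, and `CreateFlowFunc` are all hypothetical.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical base class standing in for MetaFlowFunc.
class MiniFlowFunc {
public:
    virtual ~MiniFlowFunc() = default;
    virtual std::string Name() const = 0;
};

using Factory = std::function<std::unique_ptr<MiniFlowFunc>()>;

// Function-local static, so the map exists before any registrar uses it.
std::map<std::string, Factory> &Registry()
{
    static std::map<std::string, Factory> registry;
    return registry;
}

// Constructing a static Registrar records a factory under the given name.
struct Registrar {
    Registrar(const std::string &name, Factory factory)
    {
        Registry()[name] = std::move(factory);
    }
};

// Hypothetical macro illustrating the pattern; it is not REGISTER_FLOW_FUNC itself.
#define MINI_REGISTER_FLOW_FUNC(name, clazz) \
    static Registrar g_registrar_##clazz(name, [] { return std::unique_ptr<MiniFlowFunc>(new clazz()); })

class CallNnFunc : public MiniFlowFunc {
public:
    std::string Name() const override { return "call_nn"; }
};
MINI_REGISTER_FLOW_FUNC("call_nn", CallNnFunc);

// Looks up a registered function by name; returns nullptr if the name is unknown.
std::unique_ptr<MiniFlowFunc> CreateFlowFunc(const std::string &name)
{
    auto it = Registry().find(name);
    if (it == Registry().end()) {
        return nullptr;
    }
    return it->second();
}
```

In this sketch the registered string is the only lookup key, so asking for a name that was never registered yields nullptr.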