Development Process (Calling the MetaMultiFunc Class)

Overview

During UDF development, you can use the MetaFlowFunc class to implement a single user-defined processing function (single-func), or the MetaMultiFunc class to implement multiple user-defined processing functions (multi-func). This section describes how to develop a UDF by using the MetaMultiFunc class.

A UDF implementation consists of two parts: the UDF implementation file and the UDF registration.

The following input and output scenarios are supported:

  • Single device: one input with one or more outputs, or multiple inputs with one output.
  • Two devices: one input with one output only.

The following uses the Add function as an example to describe how to develop, compile, construct, and verify a UDF.

UDF Implementation File (by Using UDF)

Develop the user-defined functions in the workspace/xx.cpp file of the project. The following uses add_flow_func.cpp as an example.

Function analysis:

The Add function implements addition for floating-point and integer data types. The first processing function (Proc1) computes a + b, and the second (Proc2) computes 2a + b.
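The two computations can be sketched outside the framework as plain element-wise loops. This is a minimal standalone illustration that uses std::vector instead of the framework's tensors; ComputeAdd1 and ComputeAdd2 are hypothetical names, not part of the FlowFunc API.

```cpp
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical helper mirroring Proc1: out[i] = a[i] + b[i].
template <typename T>
std::vector<T> ComputeAdd1(const std::vector<T> &a, const std::vector<T> &b)
{
    if (a.size() != b.size()) {
        throw std::invalid_argument("inputs must have the same number of elements");
    }
    std::vector<T> out(a.size());
    for (std::size_t i = 0; i < a.size(); ++i) {
        out[i] = a[i] + b[i];
    }
    return out;
}

// Hypothetical helper mirroring Proc2: out[i] = 2 * a[i] + b[i].
template <typename T>
std::vector<T> ComputeAdd2(const std::vector<T> &a, const std::vector<T> &b)
{
    if (a.size() != b.size()) {
        throw std::invalid_argument("inputs must have the same number of elements");
    }
    std::vector<T> out(a.size());
    for (std::size_t i = 0; i < a.size(); ++i) {
        out[i] = 2 * a[i] + b[i];
    }
    return out;
}
```

For example, with a = {1, 2, 3} and b = {4, 5, 6}, ComputeAdd1 yields {5, 7, 9} and ComputeAdd2 yields {6, 9, 12}.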

Specifying the input and output:

Each processing function has two inputs and one output. The two processing functions have separate input queues but share the same output queue.
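The queue layout can be pictured with a small single-threaded model. This is a hypothetical sketch, not the framework's scheduler: each processing function drains only its own input queue, and both append to one shared output queue, so results from Proc1 and Proc2 may interleave on output 0.

```cpp
#include <deque>
#include <string>

// Hypothetical model of AddFlowFunc's queue layout (not the FlowFunc scheduler).
struct Pair {
    int a;
    int b;
};

struct Output {
    std::string producer;  // which processing function produced the value
    int value;
};

struct QueueModel {
    std::deque<Pair> proc1Inputs;  // input queue owned by Proc1 only
    std::deque<Pair> proc2Inputs;  // input queue owned by Proc2 only
    std::deque<Output> output0;    // output queue shared by Proc1 and Proc2

    // Consume one message from Proc1's queue, if any, and emit a + b.
    bool StepProc1()
    {
        if (proc1Inputs.empty()) {
            return false;
        }
        Pair p = proc1Inputs.front();
        proc1Inputs.pop_front();
        output0.push_back({"Proc1", p.a + p.b});
        return true;
    }

    // Consume one message from Proc2's queue, if any, and emit 2a + b.
    bool StepProc2()
    {
        if (proc2Inputs.empty()) {
            return false;
        }
        Pair p = proc2Inputs.front();
        proc2Inputs.pop_front();
        output0.push_back({"Proc2", 2 * p.a + p.b});
        return true;
    }
};
```

The order of entries in output0 depends only on which function runs first, which is why the consumer of a shared output queue should not assume a fixed Proc1/Proc2 ordering.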

Remarks:

In multi-func scenarios, the processing functions are scheduled concurrently, so any data shared between them must be protected against concurrent multi-threaded access (for example, with a mutex). Note that locking may affect deterministic scheduling.
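The following standalone sketch shows why the lock is needed. It uses std::thread to stand in for the framework's concurrent scheduling of Proc1 and Proc2 (an assumption for illustration; the real scheduler is internal to the UDF framework). Both threads increment one shared counter, as Proc1 and Proc2 do with setOutputCount_, and the mutex prevents a data race.

```cpp
#include <cstdint>
#include <mutex>
#include <thread>

// Shared state, analogous to setOutputCount_ and countMutex_ in AddFlowFunc.
class SharedCounter {
public:
    void Increment()
    {
        std::lock_guard<std::mutex> lock(mutex_);  // serialize updates from concurrent callers
        ++count_;
    }

    uint32_t Get() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return count_;
    }

private:
    mutable std::mutex mutex_;
    uint32_t count_ = 0;
};

// Runs two simulated processing functions on separate threads;
// each one increments the shared counter perFunc times.
uint32_t RunConcurrently(uint32_t perFunc)
{
    SharedCounter counter;
    auto proc = [&counter, perFunc]() {
        for (uint32_t i = 0; i < perFunc; ++i) {
            counter.Increment();
        }
    };
    std::thread proc1(proc);  // stands in for Proc1
    std::thread proc2(proc);  // stands in for Proc2
    proc1.join();
    proc2.join();
    return counter.Get();
}
```

With the lock in place, RunConcurrently(10000) returns 20000; without the lock_guard in Increment(), the concurrent increments would be a data race and the result would be unpredictable.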

Function implementation:

Inherit the MetaMultiFunc base class defined in the meta_multi_func.h file, override the Init() function, and implement the user-defined multi-func processing functions.

If any resources need to be released, release them in the destructor.

namespace FlowFunc {
class AddFlowFunc: public MetaMultiFunc{};
}
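The inheritance pattern can be sketched without the framework. This sketch assumes a hypothetical base class MiniMultiFunc standing in for MetaMultiFunc, and a std::map standing in for MetaParams; neither mirrors the real API. The derived class overrides Init() to read an attribute and acquire a resource, and its destructor releases that resource.

```cpp
#include <cstdint>
#include <cstdlib>
#include <map>
#include <string>

// Hypothetical stand-in for the framework base class; not the real MetaMultiFunc.
class MiniMultiFunc {
public:
    virtual ~MiniMultiFunc() = default;
    // Attributes are modelled as a name -> value map instead of MetaParams.
    virtual int32_t Init(const std::map<std::string, std::string> &attrs) = 0;
};

class MiniAddFunc : public MiniMultiFunc {
public:
    MiniAddFunc() = default;
    MiniAddFunc(const MiniAddFunc &) = delete;  // owns a raw buffer, so no copies
    MiniAddFunc &operator=(const MiniAddFunc &) = delete;

    // Override Init(): read an attribute and acquire a resource.
    int32_t Init(const std::map<std::string, std::string> &attrs) override
    {
        auto it = attrs.find("out_type");
        if (it == attrs.end()) {
            return -1;  // attribute missing, comparable to a failed GetAttr call
        }
        outType_ = it->second;
        if (scratch_ == nullptr) {
            scratch_ = std::malloc(64);  // hypothetical scratch buffer
        }
        return scratch_ != nullptr ? 0 : -1;
    }

    // Release resources acquired in Init() in the destructor.
    ~MiniAddFunc() override
    {
        std::free(scratch_);  // std::free(nullptr) is a no-op
    }

    const std::string &OutType() const { return outType_; }
    bool HasScratch() const { return scratch_ != nullptr; }

private:
    std::string outType_;
    void *scratch_ = nullptr;
};
```

Because the base destructor is virtual, deleting a MiniAddFunc through a MiniMultiFunc pointer still runs ~MiniAddFunc() and frees the buffer.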
  • Init(): performs initialization, such as initializing variables and obtaining attributes. If no initialization is required, this function can be omitted. In this example, Init() obtains the out_type attribute that was set on FunctionPp during DataFlow graph construction. For example:
        int32_t Init(const std::shared_ptr<MetaParams> &params) override
        {
            // Use GetAttr of MetaParams to obtain the out_type attribute set during graph construction.
            // The value is stored in the private member outDataType_ and determines the output data type.
            auto getRet = params->GetAttr("out_type", outDataType_);
            if (getRet != FLOW_FUNC_SUCCESS) {
                FLOW_FUNC_RUN_LOG_ERROR("GetAttr out_type does not exist.");
                return getRet;
            }
            // Initialize the variable shared by the multi-func processing functions.
            setOutputCount_ = 0;
            // To enable TPRT concurrency, uncomment the following line:
            // (void)UdfTprt::Init(params);
            return FLOW_FUNC_SUCCESS;
        }
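  • Template functions: Add1 computes a + b and Add2 computes 2a + b element by element. The overloads that take a void * output select the output type according to outDataType_.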
        // Template function of the Add1 implementation method
        template<typename srcT, typename dstT>
        void Add1(srcT *src1, srcT *src2, dstT *dst, size_t count)
        {
            for (size_t i = 0; i < count; ++i) {
                dst[i] = src1[i] + src2[i];
            }
            /* To parallelize the loop above with TPRT, replace it with the following code.
               (This example uses ParallelFor; Submit and Wait can be used instead.)
            std::function<void(const int64_t index)> func = [src1, src2, dst](const int64_t index) {
                dst[index] = src1[index] + src2[index];
            };
            UdfTprt::ParallelFor(0, count, func);
            */
        }
        // Template function of the Add2 implementation method
        template<typename srcT, typename dstT>
        void Add2(srcT *src1, srcT *src2, dstT *dst, size_t count)
        {
            for (size_t i = 0; i < count; ++i) {
                dst[i] = 2 * src1[i] + src2[i];
            }
            /* To parallelize the loop above with TPRT, replace it with the following code.
               (This example uses Submit and Wait; ParallelFor can be used instead.)
            UdfTprtTaskAttr attr;
            for (size_t i = 0; i < count; ++i) {
                UdfTprt::Submit([src1, src2, dst, i]() {
                    dst[i] = 2 * src1[i] + src2[i];
                }, {}, {}, attr);
            }
            UdfTprt::Wait();
            */
        }
    
        // Dispatch Add1 on the output data type obtained in Init().
        template<typename T>
        void Add1(T *src1, T *src2, void *dst, size_t count)
        {
            if (outDataType_ == TensorDataType::DT_FLOAT) {
                Add1(src1, src2, static_cast<float *>(dst), count);
            } else if (outDataType_ == TensorDataType::DT_UINT32) {
                Add1(src1, src2, static_cast<uint32_t *>(dst), count);
            } else if (outDataType_ == TensorDataType::DT_INT64) {
                Add1(src1, src2, static_cast<int64_t *>(dst), count);
            }
        }
        // Dispatch Add2 on the output data type obtained in Init().
        template<typename T>
        void Add2(T *src1, T *src2, void *dst, size_t count)
        {
            if (outDataType_ == TensorDataType::DT_FLOAT) {
                Add2(src1, src2, static_cast<float *>(dst), count);
            } else if (outDataType_ == TensorDataType::DT_UINT32) {
                Add2(src1, src2, static_cast<uint32_t *>(dst), count);
            } else if (outDataType_ == TensorDataType::DT_INT64) {
                Add2(src1, src2, static_cast<int64_t *>(dst), count);
            }
        }
  • Multi-func processing functions: user-defined computation functions. The UDF framework calls a processing function after the data of its input tensors is received.
    A code example is as follows:
        // User-defined multi-func processing function Proc1. The function name is user-defined, but the parameters and return type must match the prototype.
        int32_t Proc1(const std::shared_ptr<MetaRunContext> &runContext, const std::vector<std::shared_ptr<FlowMsg>> &inputFlowMsgs)
        {
            // Check that the add operation has exactly two inputs.
            if (inputFlowMsgs.size() != 2) {
                FLOW_FUNC_LOG_ERROR("add must have 2 inputs, but got %zu", inputFlowMsgs.size());
                return -1;
            }
            auto inputFlowMsg1 = inputFlowMsgs[0];
            auto inputFlowMsg2 = inputFlowMsgs[1];
            // Reject the inputs if either message carries a non-zero error code.
            if (inputFlowMsg1->GetRetCode() != 0 || inputFlowMsg2->GetRetCode() != 0) {
                FLOW_FUNC_LOG_ERROR("invalid input");
                return -1;
            }
            // Obtain the message type of the input data.
            MsgType MsgType1 = inputFlowMsg1->GetMsgType();
            MsgType MsgType2 = inputFlowMsg2->GetMsgType();
            // Check whether the input data is of the tensor type.
            if (MsgType1 == MsgType::MSG_TYPE_TENSOR_DATA && MsgType2 == MsgType::MSG_TYPE_TENSOR_DATA) {
                // Obtain the tensor from the message.
                auto inputTensor1 = inputFlowMsg1->GetTensor();
                auto inputTensor2 = inputFlowMsg2->GetTensor();
                // Obtain the data type from the tensor.
                auto inputDataType1 = inputTensor1->GetDataType();
                auto inputDataType2 = inputTensor2->GetDataType();
                // Check that the two inputs have the same data type.
                if (inputDataType1 != inputDataType2) {
                    FLOW_FUNC_LOG_ERROR("input data types are not the same");
                    return -1;
                }
                // Obtain the shape information from the tensors.
                auto &inputShape1 = inputTensor1->GetShape();
                auto &inputShape2 = inputTensor2->GetShape();

                // Check that the two input tensors have the same shape.
                if (inputShape1 != inputShape2) {
                    FLOW_FUNC_LOG_ERROR("input shapes are not the same");
                    return -1;
                }
                // Allocate the output message with the input shape and the output data type.
                auto outputMsg = runContext->AllocTensorMsg(inputShape1, outDataType_);
                if (outputMsg == nullptr) {
                    FLOW_FUNC_LOG_ERROR("alloc tensor msg failed");
                    return -1;
                }
                auto outputTensor = outputMsg->GetTensor();
                // Obtain the data size in bytes. Both inputs have the same type and shape, so their sizes are equal.
                auto dataSize1 = inputTensor1->GetDataSize();
                // Empty input: output the empty tensor directly.
                if (dataSize1 == 0) {
                    return runContext->SetOutput(0, outputMsg);
                }
                // Obtain the tensor data pointer from the tensor.
                auto inputData1 = inputTensor1->GetData();
                auto inputData2 = inputTensor2->GetData();
                auto outputData = outputTensor->GetData();
                // Perform addition operations based on different data types.
                switch (inputDataType1) {
                    case TensorDataType::DT_FLOAT:
                        Add1(static_cast<float *>(inputData1), static_cast<float *>(inputData2), outputData, dataSize1 / sizeof(float));
                        break;
                    case TensorDataType::DT_INT16:
                        Add1(static_cast<int16_t *>(inputData1), static_cast<int16_t *>(inputData2), outputData, dataSize1 / sizeof(int16_t));
                        break;
                    case TensorDataType::DT_UINT16:
                        Add1(static_cast<uint16_t *>(inputData1), static_cast<uint16_t *>(inputData2), outputData, dataSize1 / sizeof(uint16_t));
                        break;
                    case TensorDataType::DT_UINT32:
                        Add1(static_cast<uint32_t *>(inputData1), static_cast<uint32_t *>(inputData2), outputData, dataSize1 / sizeof(uint32_t));
                        break;
                    case TensorDataType::DT_INT8:
                        Add1(static_cast<int8_t *>(inputData1), static_cast<int8_t *>(inputData2), outputData, dataSize1 / sizeof(int8_t));
                        break;
                    case TensorDataType::DT_INT64:
                        Add1(static_cast<int64_t *>(inputData1), static_cast<int64_t *>(inputData2), outputData, dataSize1 / sizeof(int64_t));
                        break;
                    default:
                        // Unsupported data type. Set the output error code.
                        outputMsg->SetRetCode(100);
                        break;
                }
                // Proc1 and Proc2 are scheduled concurrently, so lock the mutex before updating the shared counter.
                std::unique_lock<std::mutex> lock(countMutex_);
                setOutputCount_++;
                return runContext->SetOutput(0, outputMsg);
            }
            FLOW_FUNC_LOG_ERROR("MsgType is not Tensor.");
            return -1;
        }
        // User-defined multi-func processing function Proc2. The function name is user-defined, but the parameters and return type must match the prototype.
        int32_t Proc2(const std::shared_ptr<MetaRunContext> &runContext, const std::vector<std::shared_ptr<FlowMsg>> &inputFlowMsgs)
        {
            // Check that the add operation has exactly two inputs.
            if (inputFlowMsgs.size() != 2) {
                FLOW_FUNC_LOG_ERROR("add must have 2 inputs, but got %zu", inputFlowMsgs.size());
                return -1;
            }
            auto inputFlowMsg1 = inputFlowMsgs[0];
            auto inputFlowMsg2 = inputFlowMsgs[1];
            // Reject the inputs if either message carries a non-zero error code.
            if (inputFlowMsg1->GetRetCode() != 0 || inputFlowMsg2->GetRetCode() != 0) {
                FLOW_FUNC_LOG_ERROR("invalid input");
                return -1;
            }
            // Obtain the message type of the input data.
            MsgType MsgType1 = inputFlowMsg1->GetMsgType();
            MsgType MsgType2 = inputFlowMsg2->GetMsgType();
            // Check whether the input data is of the tensor type.
            if (MsgType1 == MsgType::MSG_TYPE_TENSOR_DATA && MsgType2 == MsgType::MSG_TYPE_TENSOR_DATA) {
                // Obtain the tensor from the input message.
                auto inputTensor1 = inputFlowMsg1->GetTensor();
                auto inputTensor2 = inputFlowMsg2->GetTensor();
                // Obtain the data type from the tensor.
                auto inputDataType1 = inputTensor1->GetDataType();
                auto inputDataType2 = inputTensor2->GetDataType();
    
                // Check that the two inputs have the same data type.
                if (inputDataType1 != inputDataType2) {
                    FLOW_FUNC_LOG_ERROR("input data types are not the same");
                    return -1;
                }
                // Obtain the shape information from the tensors.
                auto &inputShape1 = inputTensor1->GetShape();
                auto &inputShape2 = inputTensor2->GetShape();

                // Check that the two input tensors have the same shape.
                if (inputShape1 != inputShape2) {
                    FLOW_FUNC_LOG_ERROR("input shapes are not the same");
                    return -1;
                }
                // Allocate the output message with the input shape and the output data type.
                auto outputMsg = runContext->AllocTensorMsg(inputShape1, outDataType_);
                if (outputMsg == nullptr) {
                    FLOW_FUNC_LOG_ERROR("alloc tensor msg failed");
                    return -1;
                }
                auto outputTensor = outputMsg->GetTensor();
                // Obtain the data size in bytes. Both inputs have the same type and shape, so their sizes are equal.
                auto dataSize1 = inputTensor1->GetDataSize();
                // Empty input: output the empty tensor directly.
                if (dataSize1 == 0) {
                    return runContext->SetOutput(0, outputMsg);
                }
                // Obtain the tensor data pointer from the tensor.
                auto inputData1 = inputTensor1->GetData();
                auto inputData2 = inputTensor2->GetData();
                auto outputData = outputTensor->GetData();
                // Perform addition operations based on different data types.
                switch (inputDataType1) {
                    case TensorDataType::DT_FLOAT:
                        Add2(static_cast<float *>(inputData1), static_cast<float *>(inputData2), outputData, dataSize1 / sizeof(float));
                        break;
                    case TensorDataType::DT_INT16:
                        Add2(static_cast<int16_t *>(inputData1), static_cast<int16_t *>(inputData2), outputData, dataSize1 / sizeof(int16_t));
                        break;
                    case TensorDataType::DT_UINT16:
                        Add2(static_cast<uint16_t *>(inputData1), static_cast<uint16_t *>(inputData2), outputData, dataSize1 / sizeof(uint16_t));
                        break;
                    case TensorDataType::DT_UINT32:
                        Add2(static_cast<uint32_t *>(inputData1), static_cast<uint32_t *>(inputData2), outputData, dataSize1 / sizeof(uint32_t));
                        break;
                    case TensorDataType::DT_INT8:
                        Add2(static_cast<int8_t *>(inputData1), static_cast<int8_t *>(inputData2), outputData, dataSize1 / sizeof(int8_t));
                        break;
                    case TensorDataType::DT_INT64:
                        Add2(static_cast<int64_t *>(inputData1), static_cast<int64_t *>(inputData2), outputData, dataSize1 / sizeof(int64_t));
                        break;
                    default:
                        // Unsupported data type. Set the output error code.
                        outputMsg->SetRetCode(100);
                        break;
                }
                // Proc1 and Proc2 are scheduled concurrently, so lock the mutex before updating the shared counter.
                std::unique_lock<std::mutex> lock(countMutex_);
                setOutputCount_++;
                return runContext->SetOutput(0, outputMsg);
            }
            FLOW_FUNC_LOG_ERROR("MsgType is not Tensor.");
            return -1;
        }
    
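    private:
        TensorDataType outDataType_;   // output data type, read from the out_type attribute in Init()
        std::mutex countMutex_;        // protects setOutputCount_ across concurrently scheduled Proc1 and Proc2
        uint32_t setOutputCount_ = 0;  // number of outputs set by Proc1 and Proc2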
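To make the control flow of Proc1 easier to follow outside the framework, the following sketch re-creates it with a hypothetical SimpleTensor struct and DataType enum (neither is part of the FlowFunc API). It rejects inputs whose data types or shapes differ, derives the element count from the byte size (as dataSize1 / sizeof(T) does), dispatches on the input type, and then dispatches on the requested output type, as the Add1 overloads do with outDataType_.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <utility>
#include <vector>

// Hypothetical simplified types; not part of the FlowFunc API.
enum class DataType { Float, Int64 };

struct SimpleTensor {
    DataType type;
    std::vector<int64_t> shape;
    std::vector<unsigned char> bytes;  // raw storage, like GetData() / GetDataSize()
};

std::size_t ElementSize(DataType t)
{
    return t == DataType::Float ? sizeof(float) : sizeof(int64_t);
}

// Allocates a zero-filled tensor, loosely like AllocTensorMsg(shape, dataType).
SimpleTensor MakeTensor(DataType type, std::vector<int64_t> shape)
{
    std::size_t count = 1;
    for (int64_t d : shape) {
        count *= static_cast<std::size_t>(d);
    }
    std::vector<unsigned char> bytes(count * ElementSize(type));
    return SimpleTensor{type, std::move(shape), std::move(bytes)};
}

template <typename T>
void SetElem(SimpleTensor &t, std::size_t i, T v)
{
    std::memcpy(t.bytes.data() + i * sizeof(T), &v, sizeof(T));
}

template <typename T>
T GetElem(const SimpleTensor &t, std::size_t i)
{
    T v;
    std::memcpy(&v, t.bytes.data() + i * sizeof(T), sizeof(T));
    return v;
}

// Typed kernel, like the Add1(srcT *, srcT *, dstT *, size_t) template.
// memcpy keeps access to the raw byte buffers well defined.
template <typename SrcT, typename DstT>
void AddKernel(const unsigned char *src1, const unsigned char *src2, unsigned char *dst, std::size_t count)
{
    for (std::size_t i = 0; i < count; ++i) {
        SrcT a;
        SrcT b;
        std::memcpy(&a, src1 + i * sizeof(SrcT), sizeof(SrcT));
        std::memcpy(&b, src2 + i * sizeof(SrcT), sizeof(SrcT));
        DstT r = static_cast<DstT>(a + b);
        std::memcpy(dst + i * sizeof(DstT), &r, sizeof(DstT));
    }
}

// Output-type dispatch, like the Add1(T *, T *, void *, size_t) overload.
template <typename SrcT>
void AddToOutput(const SimpleTensor &x, const SimpleTensor &y, SimpleTensor &out, std::size_t count)
{
    if (out.type == DataType::Float) {
        AddKernel<SrcT, float>(x.bytes.data(), y.bytes.data(), out.bytes.data(), count);
    } else {
        AddKernel<SrcT, int64_t>(x.bytes.data(), y.bytes.data(), out.bytes.data(), count);
    }
}

// Mirrors the checks in Proc1; outType plays the role of outDataType_.
// Returns 0 on success and -1 on invalid input.
int AddTensors(const SimpleTensor &x, const SimpleTensor &y, DataType outType, SimpleTensor &out)
{
    if (x.type != y.type || x.shape != y.shape) {
        return -1;  // Proc1 rejects inputs whose data types or shapes differ
    }
    out = MakeTensor(outType, x.shape);
    std::size_t count = x.bytes.size() / ElementSize(x.type);  // like dataSize1 / sizeof(T)
    if (x.type == DataType::Float) {
        AddToOutput<float>(x, y, out, count);
    } else {
        AddToOutput<int64_t>(x, y, out, count);
    }
    return 0;
}
```

The two-level dispatch is the design point: the input type selects SrcT at the switch in Proc1, and the output type chosen in Init() selects DstT, so every input/output combination is covered by one kernel template.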

UDF Implementation File (by Calling NN)

Develop the user-defined functions in the workspace/xx.cpp file of the project. The following uses call_nn_flow_func.cpp as an example.

Function analysis:

This function implements addition by calling an NN model from the UDF.

Specifying the input and output:

The function contains two inputs and one output.

Function implementation:

Inherit the MetaMultiFunc base class defined in the meta_multi_func.h file, override the Init() function if initialization is needed, and implement the user-defined processing function.

namespace FlowFunc {
class CallNnFlowFunc: public MetaMultiFunc{};
}
  • Init(): performs initialization, such as initializing variables and obtaining attributes. This example requires no initialization, so Init() is omitted.
  • Add(): user-defined computation processing function. The UDF framework calls this method after receiving the input tensors.
    For example:
    // User-defined multi-func processing function Add. The function name is user-defined, but the parameters and return type must match the prototype.
    int32_t Add(const std::shared_ptr<MetaRunContext> &runContext, const std::vector<std::shared_ptr<FlowMsg>> &inputFlowMsgs)
    {
        std::vector<std::shared_ptr<FlowMsg>> outputMsgs;
        // Call RunFlowModel of runContext to execute the NN model invoked by this processing function.
        // The NN model implements addition through IR graph construction; the execution results are stored in outputMsgs.
        auto ret = runContext->RunFlowModel("invoke_graph", inputFlowMsgs, outputMsgs, 100000);
        if (ret != FLOW_FUNC_SUCCESS) {
            return ret;
        }
        // Output the result outputMsgs of calling the NN model through SetOutput.
        for (size_t i = 0; i < outputMsgs.size(); ++i) {
            ret = runContext->SetOutput(i, outputMsgs[i]);
            if (ret != FLOW_FUNC_SUCCESS) {
                return ret;
            }
        }
        return FLOW_FUNC_SUCCESS;
    }
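The forwarding loop in Add() can be exercised without the framework by replacing RunFlowModel and SetOutput with hypothetical stand-ins. In this sketch, RunModelStub plays the role of the NN model (it adds its two inputs) and kSuccess stands in for FLOW_FUNC_SUCCESS; ForwardOutputs mirrors Add(): it returns the first non-success code and otherwise sends model output i to UDF output i.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

constexpr int32_t kSuccess = 0;  // hypothetical stand-in for FLOW_FUNC_SUCCESS

// Hypothetical stand-in for the NN model run through RunFlowModel: adds two scalars.
int32_t RunModelStub(const std::vector<int> &inputs, std::vector<int> &outputs)
{
    if (inputs.size() != 2) {
        return -1;
    }
    outputs = {inputs[0] + inputs[1]};
    return kSuccess;
}

// Mirrors CallNnFlowFunc::Add: run the model, then forward output i to UDF output i.
int32_t ForwardOutputs(const std::vector<int> &inputs,
                       const std::function<int32_t(std::size_t, int)> &setOutput)
{
    std::vector<int> outputs;
    int32_t ret = RunModelStub(inputs, outputs);
    if (ret != kSuccess) {
        return ret;  // propagate the model error unchanged
    }
    for (std::size_t i = 0; i < outputs.size(); ++i) {
        ret = setOutput(i, outputs[i]);
        if (ret != kSuccess) {
            return ret;  // stop at the first output that cannot be set
        }
    }
    return kSuccess;
}
```

Passing setOutput as a callback keeps the sketch testable; in the real UDF, runContext->SetOutput fills that role.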

UDF Registration

Use the FLOW_FUNC_REGISTRAR macro to register the implementation class with the UDF framework under the class name, and use RegProcFunc to register each processing function under the given name.

FLOW_FUNC_REGISTRAR(AddFlowFunc)
    .RegProcFunc("Proc1", &AddFlowFunc::Proc1)
    .RegProcFunc("Proc2", &AddFlowFunc::Proc2);
FLOW_FUNC_REGISTRAR(CallNnFlowFunc)
    .RegProcFunc("CallNn_Add", &CallNnFlowFunc::Add);
  • Proc1: user-defined multi-func processing function name, which must be the same as the processing function name defined in AddFlowFunc.
  • Proc2: user-defined multi-func processing function name, which must be the same as the processing function name defined in AddFlowFunc.
  • Add: user-defined function name, which must be the same as the processing function name defined in CallNnFlowFunc.
  • AddFlowFunc: class name, which must be the same as that in UDF Implementation File (by Using UDF).
  • CallNnFlowFunc: class name, which must be the same as that in UDF Implementation File (by Calling NN).
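The chained .RegProcFunc(...) calls follow a builder pattern: each call records a name-to-member-function mapping and returns the registrar so the next call can be chained. The sketch below is a hypothetical illustration of that pattern only; ProcRegistrar, Invoke, and ToyAddUdf are invented names, and the real FLOW_FUNC_REGISTRAR implementation is internal to the framework.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical illustration of name-based processing-function registration.
template <typename Udf>
class ProcRegistrar {
public:
    using ProcFn = int32_t (Udf::*)(const std::vector<int> &);

    // Records name -> member function and returns *this so calls can be chained.
    ProcRegistrar &RegProcFunc(const std::string &name, ProcFn fn)
    {
        procs_[name] = fn;
        return *this;
    }

    // Looks up a processing function by its registered name and invokes it.
    int32_t Invoke(Udf &udf, const std::string &name, const std::vector<int> &inputs) const
    {
        auto it = procs_.find(name);
        if (it == procs_.end()) {
            return -1;  // name was never registered
        }
        return (udf.*(it->second))(inputs);
    }

private:
    std::map<std::string, ProcFn> procs_;
};

// Toy UDF echoing Proc1 (a + b) and Proc2 (2a + b). It returns the result
// directly for brevity; real processing functions return a status code.
class ToyAddUdf {
public:
    int32_t Proc1(const std::vector<int> &in) { return in[0] + in[1]; }
    int32_t Proc2(const std::vector<int> &in) { return 2 * in[0] + in[1]; }
};
```

The registered string and the member function are separate pieces of information, which is how a single class can expose names such as "CallNn_Add" that differ from the C++ method name.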