昇腾社区首页
中文
注册

TensorFlow框架

本章节介绍TensorFlow框架算子适配的流程,用于将TensorFlow框架的算子映射成CANN算子(开发者基于CANN框架自定义开发的算子),从而完成从TensorFlow框架调用到CANN算子的过程。同时给出TensorFlow框架侧算子调用的示例,便于开发者了解完整流程。

下图展示了完整的开发流程图,关键步骤如下:首先开发者需要参考工程化算子开发算子入图(GE图)开发完成算子实现和入图开发;然后进行TensorFlow框架适配插件开发,这是本节介绍的重点,用于将TensorFlow框架的算子映射成CANN算子;TensorFlow框架的算子包括TensorFlow自定义算子和原生算子,如果需要从TensorFlow自定义算子映射到CANN算子,还需要完成TensorFlow自定义算子的开发;最后进行TensorFlow框架侧算子调用代码编写。TensorFlow自定义算子和TensorFlow算子调用的相关内容您可以在TensorFlow的官方文档中找到更详细的介绍,本章节仅给出示例供参考。

具体步骤如下:

  1. 环境准备。
    1. CANN软件安装请参考环境准备
    2. 创建算子工程。使用msOpGen工具创建算子开发工程。TensorFlow框架算子适配场景下,需要通过framework参数指定具体的框架为tf或者tensorflow,工具会自动生成框架适配代码。以自定义CANN算子AddCustom为例,使用msOpGen工具创建算子开发工程的具体命令如下:
      ${INSTALL_DIR}/python/site-packages/bin/msopgen gen -i $HOME/sample/add_custom.json -f tf -c ai_core-<soc_version> -lan cpp -out $HOME/sample/AddCustom
  2. 算子实现。
    • 算子原型定义。通过原型定义来描述算子输入输出、属性等信息以及算子在AI处理器上相关实现信息,并关联tiling实现等函数。
    • Kernel侧算子实现和host侧tiling实现请参考算子实现;工程化算子开发,支持开发者调用Tiling API基于CANN提供的编程框架进行tiling开发,kernel侧也提供对应的接口方便开发者获取tiling参数,具体内容请参考Kernel侧算子实现Host侧tiling实现,由此而带来的额外约束也在上述章节说明。
  3. 算子入图(GE图)开发。算子入图场景下,需要提供shape推导等算子入图适配函数的实现。
  4. TensorFlow框架适配插件开发。详细说明见适配插件开发
  5. 编译部署。通过工程编译脚本完成算子的编译部署。
  6. TensorFlow框架算子调用。详细说明见TensorFlow原生算子映射到CANN算子TensorFlow自定义算子开发并映射到CANN算子。完整样例请参考LINK

适配插件开发

完成算子工程创建后,会在算子工程目录下生成framework/tf_plugin目录,用于存放TensorFlow框架适配插件实现文件。以自定义CANN算子AddCustom为例,算子工程目录如下:

AddCustom
├── build.sh             // 编译入口脚本
├── cmake 
├── CMakeLists.txt       // 算子工程的CMakeLists.txt
├── CMakePresets.json    // 编译配置项
├── framework            // 框架适配插件实现文件目录
│   ├── tf_plugin     //  TensorFlow框架适配插件实现文件目录
│   │   ├── CMakeLists.txt    
│   │   ├── tensorflow_add_custom_plugin.cc  // TensorFlow框架适配插件实现文件    
│   ├── CMakeLists.txt
├── op_host                      // host侧实现文件
├── op_kernel                    // kernel侧实现文件
└── scripts                      // 自定义算子工程打包相关脚本所在目录
当TensorFlow算子与CANN算子原型定义一致时,TensorFlow框架适配插件实现代码如下:
#include "register/register.h"
namespace domi {
REGISTER_CUSTOM_OP("AddCustom")
    .FrameworkType(TENSORFLOW) 
    .OriginOpType("AddCustom")   
    .ParseParamsByOperatorFn(AutoMappingByOpFn);
}

当TensorFlow算子与CANN算子原型定义不一致时,TensorFlow框架适配插件实现代码如下:

#include "register/register.h"
REGISTER_CUSTOM_OP("FlashAttentionScore")
    .FrameworkType(TENSORFLOW)
    .OriginOpType({"FlashAttentionScore"})
    .ParseParamsByOperatorFn(FlashAttentionScoreMapping)  
    .ParseOpToGraphFn(AddOptionalPlaceholderForFA);
  • 包含插件实现函数相关的头文件。

    register.h存储在CANN软件安装后文件存储路径的“include/register/”目录下,包含该头文件,可使用算子注册相关类,调用算子注册相关的接口。

  • REGISTER_CUSTOM_OP:注册自定义算子,传入算子的OpType,需要与算子原型注册中的OpType保持一致。
    • FrameworkType:TENSORFLOW代表原始框架为TensorFlow
    • OriginOpType:算子在原始框架中的类型。对于TensorFlow自定义算子,还需要完成TensorFlow自定义算子的开发,这里的OriginOpType与REGISTER_OP注册算子名相同,对于TensorFlow原生算子, 即为原生算子名。
    • ParseParamsByOperatorFn:用来注册解析算子参数实现映射关系的回调函数,需要用户自定义实现回调函数ParseParamByOpFunc。原始TensorFlow算子中参数与CANN算子中参数一一对应时,可直接使用自动映射回调函数AutoMappingByOpFn自动实现映射。
    • ParseOpToGraphFn:当TensorFlow算子与CANN算子原型定义不一致(比如CANN算子原型定义原型中有可选输入,但TensorFlow原型定义中不支持可选输入,没有可选输入)的情况时,用来注册调整算子原型映射关系的回调函数。

TensorFlow原生算子映射到CANN算子

以自定义算子AddCustom为例,将该算子映射到TensorFlow内置算子Add上,需要先修改AddCustom自定义算子目录framework/tf_plugin下插件代码,完成算子名映射:

1
2
3
4
5
6
7
#include "register/register.h"
namespace domi {
REGISTER_CUSTOM_OP("AddCustom")   // 当前Ascend C自定义算子名
    .FrameworkType(TENSORFLOW)    // 第三方框架类型TENSORFLOW
    .OriginOpType("Add")          // 映射到TensorFlow原生算子Add
    .ParseParamsByOperatorFn(AutoMappingByOpFn);
}

完成算子工程的编译部署后,构造单算子的TensorFlow 1.15版本测试用例进行验证。

  1. 编写测试用例“tf_add.py”。
  2. 导入python库。
    1
    2
    3
    4
    import logging            # Python标准库日志模块
    import tensorflow as tf   # 导入TensorFlow开源库
    from npu_bridge.estimator import npu_ops   # 导入TensorFlow开源库中的npu_ops模块
    import numpy as np    # 导入Python的数学基础库
    
  3. 通过config()定义昇腾AI处理器和CPU上的运行参数。

    “execute_type”“ai_core”时,代表在昇腾AI处理器上运行单算子网络,最终会调用到Ascend C算子。

    “execute_type”“cpu”时,代表在Host侧的CPU运行单算子网络,调用的是TensorFlow算子。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    def config(execute_type):
        if execute_type == 'ai_core':
            session_config = tf.ConfigProto(
                allow_soft_placement=True,
                log_device_placement=False,)
            custom_op = session_config.graph_options.rewrite_options.custom_optimizers.add()
            custom_op.name = "NpuOptimizer"
            custom_op.parameter_map["enable_data_pre_proc"].b = True   # 开启数据预处理下沉到Device侧执行
            custom_op.parameter_map["mix_compile_mode"].b = True    
            custom_op.parameter_map["use_off_line"].b = True     # True表示在昇腾AI处理器上执行训练
            
        elif execute_type == 'cpu':
            session_config = tf.ConfigProto(
                allow_soft_placement=True,
                log_device_placement=False)
    
        return session_config
    
  4. 单算子网络测试用例主函数。
    • 算子输入请根据算子实际输入个数及shape进行构造。
    • 算子输出的计算,请根据算子逻辑调用TensorFlow相关接口进行实现。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    #设置np.allclose比较函数的公差参数。
    #np.allclose比较函数的相对公差参数
    atol = 0.001
    #np.allclose比较函数的绝对公差参数
    rtol = 0.001
    
    def main(unused_argv):
        shape_params = (8, 2048)
        dtype_params = np.float16
    
        # 构造Add算子的两个输入数据,shape为shape_params,范围在[-2,2]之间的随机数
        x_data = np.random.uniform(-2, 2, size=shape_params).astype(dtype_params)
        y_data = np.random.uniform(-2, 2, size=shape_params).astype(dtype_params)
        # 分别对Add算子的两个输入数据进行占位
        x = tf.compat.v1.placeholder(dtype_params, shape=shape_params)
        y = tf.compat.v1.placeholder(dtype_params, shape=shape_params)
        # 计算算子输出
        out = tf.math.add(x, y)
        # 在Host侧CPU上运行单算子,得到期望运行结果
        with tf.compat.v1.Session(config=config('cpu')) as session:
            result_cpu = session.run(out, feed_dict={x: x_data, y: y_data})
        # 在昇腾AI处理器上运行单算子,得到实际运行结果
        with tf.compat.v1.Session(config=config('ai_core')) as session:
            result_ai_core = session.run(out, feed_dict={x: x_data, y: y_data})
    
        np.array(result_ai_core).astype(dtype_params)
        np.array(result_cpu).astype(dtype_params)
        print('====================================')
       # 通过np.allclose比较昇腾AI处理器上运行的实际结果和cpu上运行的期望结果,其中atol和rtol为np.allclose比较函数的相对公差参数和绝对公差参数
        cmp_result = np.allclose(result_ai_core, result_cpu, atol, rtol)
        print(cmp_result)
        print('====================================')
    
  5. 运行单算子网络。
    1
    2
    if __name__ == "__main__":
        tf.app.run()
    

TensorFlow自定义算子开发并映射到CANN算子

  1. 适配插件代码开发。以自定义算子AddCustom为例,将该算子映射到TensorFlow自定义算子AddCustom上,需要先修改CANN AddCustom自定义算子工程目录framework/tf_plugin下插件代码,完成算子名映射:
    REGISTER_CUSTOM_OP("AddCustom")
      .FrameworkType(TENSORFLOW)      
      .OriginOpType("AddCustom") 
      .ParseParamsByOperatorFn(AutoMappingByOpFn);
  2. TensorFlow自定义算子的开发。本节仅给出示例说明,详细内容请参考TensorFlow官方文档。

    创建TensorFlow原型注册文件custom_assign_add_custom.cc,内容如下:

    #include "tensorflow/core/framework/op.h"
    #include "tensorflow/core/framework/shape_inference.h"
    #include "tensorflow/core/framework/op_kernel.h"
    #include "tensorflow/core/framework/common_shape_fns.h"
    using namespace tensorflow;
    
    // 通过TensorFlow提供的REGISTER_OP接口完成算子原型的注册
    REGISTER_OP("AddCustom")        // TensorFlow 注册算子名
        .Input("x: T")              // 算子原型,输入参数x,类型为T
        .Input("y: T")              // 算子原型,输入参数y,类型为T
        .Output("z: T")             // 算子原型,输入参数z,类型为T
        .Attr("T: {half}")          // T类型支持范围
        .SetShapeFn(shape_inference::BroadcastBinaryOpShapeFn);  // 算子shape信息推导,BroadcastBinaryOpShapeFn为TensorFlow提供的内置函数,输出shape信息由输入shape传播推导,即输入和输出shape保持一致
    
    // 实现一个CPU版本的kernel函数,因为Tensorflow的计算图在构建时会检查所有的算子是否有任意设备上的kernel函数(NPU Kernel无法被感知),如果没有将会报错。这里实现一个固定返回错误的CPU kernel函数:
    class AddCustomOp : public OpKernel {
     public:
      explicit AddCustomOp(OpKernelConstruction* context) : OpKernel(context) {}
    
      void Compute(OpKernelContext* context) override {
        OP_REQUIRES_OK(context, errors::Unimplemented("AddCustomOp is not supported on CPU")); 
      }
    };
    
    REGISTER_KERNEL_BUILDER(Name("AddCustom").Device(DEVICE_CPU), AddCustomOp);          // 注册AddCustom算子的CPU实现内核,该函数当前仅打印日志提示CPU不支持

    使用如下命令对上述代码进行编译,产物为libcustom_ops.so,后续的算子调用脚本中可通过load_op_library接口加载该so为python模块,从而调用自定义算子。

    TF_CFLAGS=( $(python3 -c 'import tensorflow as tf; print(" ".join(tf.sysconfig.get_compile_flags()))') )     // 获取TensorFlow编译选项
    TF_LFLAGS=( $(python3 -c 'import tensorflow as tf; print(" ".join(tf.sysconfig.get_link_flags()))') )        // 获取TensorFlow链接选项
    SOURCE_FILES=custom_assign_add_custom.cc                                                                     // 包含TensorFlow算子注册和CPU内核实现的cc文件
    g++ -std=c++14 -shared $SOURCE_FILES -o ${Path}/libcustom_ops.so -fPIC ${TF_CFLAGS[@]} ${TF_LFLAGS[@]} -O2   // 编译命令,产物为libcustom_ops.so,TensorFlow即可通过load_op_library加载该so为python模块,调用自定义算子
  3. 测试脚本中加载上一步骤编译好的动态库,实现自定义算子的调用。
    • TensorFlow 1.15.0调用代码示例
      import os
      import tensorflow as tf
      import numpy as np
      from npu_bridge.npu_init import *
      tf.enable_resource_variables()
      #np.allclose比较函数的相对公差参数
      atol = 0.001
      #np.allclose比较函数的绝对公差参数
      rtol = 0.001
      def main(unused_argv):
          custom_op_lib = tf.load_op_library('./outputs/libcustom_ops.so')     # 加载so为python模块
          shape_params = (8, 2048)
          dtype_params = np.float16
          x_data = np.random.uniform(-2, 2, size=shape_params).astype(dtype_params)
          y_data = np.random.uniform(-2, 2, size=shape_params).astype(dtype_params)
          x = tf.compat.v1.placeholder(dtype_params, shape=shape_params)
          y = tf.compat.v1.placeholder(dtype_params, shape=shape_params)
          tf_z = tf.math.add(x, y)                                           # 调用TensorFlow原生算子
          ac_z = custom_op_lib.add_custom(x, y)                              # 调用AscendC AddCustom自定义算子
          config = tf.ConfigProto()
          custom_op = config.graph_options.rewrite_options.custom_optimizers.add()
          custom_op.name = "NpuOptimizer"   # 配置在昇腾AI处理器上运行单算子
          config.graph_options.rewrite_options.remapping = RewriterConfig.OFF
          config.graph_options.rewrite_options.memory_optimization = RewriterConfig.OFF
          
          with tf.Session(config=config) as sess:
              sess.run(tf.global_variables_initializer())
              tf_golden = sess.run(tf_z, feed_dict={x: x_data, y: y_data})
          with tf.Session(config=config) as sess:
              sess.run(tf.global_variables_initializer())
              ascend_out = sess.run(ac_z, feed_dict={x: x_data, y: y_data})
          np.array(tf_golden).astype(dtype_params)
          np.array(ascend_out).astype(dtype_params)
          print('====================================')
          # 通过np.allclose比较昇腾AI处理器上运行的实际结果和使用TensorFlow原生算子运行的期望结果,其中atol和rtol为np.allclose比较函数的相对公差参数和绝对公差参数。
          cmp_result = np.allclose(tf_golden, ascend_out, atol, rtol)
          print(cmp_result)
          print('====================================')
      if __name__ == "__main__":
          tf.app.run()
    • TensorFlow 2.6.5调用代码
      import os
      import tensorflow as tf
      import numpy as np
      import npu_device
      from npu_device.compat.v1.npu_init import *
      npu_device.compat.enable_v1()
      tf.compat.v1.enable_resource_variables()
      #np.allclose比较函数的相对公差参数
      atol = 0.001
      #np.allclose比较函数的绝对公差参数
      rtol = 0.001
      def main(unused_argv):
          custom_op_lib = tf.load_op_library('./outputs/libcustom_ops.so')     # 加载so为python模块
          
          shape_params = (8, 2048)
          dtype_params = np.float16
          x_data = np.random.uniform(-2, 2, size=shape_params).astype(dtype_params)
          y_data = np.random.uniform(-2, 2, size=shape_params).astype(dtype_params)
          x = tf.compat.v1.placeholder(dtype_params, shape=shape_params)
          y = tf.compat.v1.placeholder(dtype_params, shape=shape_params)
          tf_z = tf.math.add(x, y)                                           # 调用TensorFlow原生算子
          ac_z = custom_op_lib.add_custom(x, y)                              # 调用AscendC AddCustom自定义算子
          
          config = tf.compat.v1.ConfigProto()
          custom_op = config.graph_options.rewrite_options.custom_optimizers.add()
          custom_op.name = "NpuOptimizer"
          config.graph_options.rewrite_options.remapping = RewriterConfig.OFF
          config.graph_options.rewrite_options.memory_optimization = RewriterConfig.OFF
          
          with tf.compat.v1.Session(config=config) as sess:
              sess.run(tf.global_variables_initializer())
              tf_golden = sess.run(tf_z, feed_dict={x: x_data, y: y_data})
          with tf.compat.v1.Session(config=config) as sess:
              sess.run(tf.global_variables_initializer())
              ascend_out = sess.run(ac_z, feed_dict={x: x_data, y: y_data})
          np.array(tf_golden).astype(dtype_params)
          np.array(ascend_out).astype(dtype_params)
          print('====================================')
          # 通过np.allclose比较昇腾AI处理器上运行的实际结果和使用TensorFlow原生算子运行的期望结果,其中atol和rtol为np.allclose比较函数的相对公差参数和绝对公差参数。
          cmp_result = np.allclose(tf_golden, ascend_out, atol, rtol)
          print(cmp_result)
          print('====================================')
      if __name__ == "__main__":
          tf.app.run()

可选输入算子映射关系开发

TensorFlow的原型定义中不支持可选输入,对于包含可选输入的算子,其从TensorFlow到CANN的映射关系,不满足简单的一对一映射,需要在插件适配代码中,将输入转换为可选输入,调整原型的映射关系。下文以CANN算子库中的FlashAttentionScore算子为例,介绍针对此类算子的框架适配插件如何开发。

  1. 适配插件开发
    和上文中介绍的简单的一对一映射不同,进行插件适配开发时,需要调用ParseOpToGraphFn注册回调函数,回调函数中用于调整算子原型映射关系。此时:
    • 通过ParseParamsByOperatorFn注册回调函数,回调函数中将TensorFlow原生算子映射到一个IR和TensorFlow一致的中间算子(调用AutoMappingByOpFn完成属性映射)。
    • 通过ParseOpToGraphFn注册回调函数,调整算子原型映射关系,将中间算子最终映射到CANN算子库中的算子,这里映射到Graph图的概念是指一个算子构成的单算子图。
    需要注意:在ParseParamsByOperatorFn的回调函数中,需要将TensorFlow算子名称设置到中间算子的original_type属性中,用于后续ParseOpToGraphFn回调函数的触发。示例代码如下:
    #include <string>
    #include <vector>
    #include "register/register.h"
    #include "graph/operator.h"
    #include "graph/graph.h"
    #include "graph/operator_factory.h"
    
    namespace domi {
    using namespace ge;
    
    static Status AddOptionalPlaceholderForFA(const ge::Operator &tf_op, ge::Graph &graph) {
      // 1. 创建一个FlashAttentionScore算子npu_fa_op
      ge::AscendString op_name;
      tf_op.GetName(op_name);
      auto npu_fa_op = OperatorFactory::CreateOperator(op_name.GetString(), "FlashAttentionScore");
      // 2. 将TensorFlow算子属性映射到npu_fa_op算子上
      float scale_value = 1.0;
      (void)tf_op.GetAttr("scale_value", scale_value);
      (void)npu_fa_op.SetAttr("scale_value", scale_value);
    
      float keep_prob = 1.0;
      (void)tf_op.GetAttr("keep_prob", keep_prob);
      (void)npu_fa_op.SetAttr("keep_prob", keep_prob);
    
      int32_t pre_tockens = 2147483647;
      (void)tf_op.GetAttr("pre_tockens", pre_tockens);
      (void)npu_fa_op.SetAttr("pre_tockens", pre_tockens);
    
      int32_t next_tockens = 2147483647;
      (void)tf_op.GetAttr("next_tockens", next_tockens);
      (void)npu_fa_op.SetAttr("next_tockens", next_tockens);
    
      int32_t head_num = 0;
      (void)tf_op.GetAttr("head_num", head_num);
      (void)npu_fa_op.SetAttr("head_num", head_num);
    
      std::string input_layout;
      (void)tf_op.GetAttr("input_layout", input_layout);
      (void)npu_fa_op.SetAttr("input_layout", input_layout);
    
      int32_t inner_precise = 0;
      (void)tf_op.GetAttr("inner_precise", inner_precise);
      (void)npu_fa_op.SetAttr("inner_precise", inner_precise);
    
      int32_t sparse_mode = 0;
      (void)tf_op.GetAttr("sparse_mode", sparse_mode);
      (void)npu_fa_op.SetAttr("sparse_mode", sparse_mode);
    
      int32_t pse_type = 1;
      (void)tf_op.GetAttr("pse_type", pse_type);
      (void)npu_fa_op.SetAttr("pse_type", pse_type);
    
      // 3. 创建输入Data
      std::vector<Operator> inputs;
      for (size_t i = 0UL; i < tf_op.GetInputsSize(); i++) {
        const std::string data_name = "Data_" + std::to_string(i);
        Operator data_op = OperatorFactory::CreateOperator(data_name.c_str(), "Data");
        (void)data_op.SetAttr("index", static_cast<int32_t>(i));
        inputs.emplace_back(data_op);
      }
    
      size_t index = 0UL;
      //4. 必选输入直接设置Data到算子输入
      (void)npu_fa_op.SetInput("query", inputs[index++]);
      (void)npu_fa_op.SetInput("key", inputs[index++]);
      (void)npu_fa_op.SetInput("value", inputs[index++]);
    
      // 5. 可选输入需要判断type属性的个数是否为0,不为0则表示可选输入已经使能
      std::vector<DataType> real_shift_type;
      (void)tf_op.GetAttr("real_shift_type", real_shift_type);
      if (!real_shift_type.empty()) {
        (void)npu_fa_op.SetInput("real_shift", inputs[index++]);
      }
    
      std::vector<DataType> drop_mask_type;
      (void)tf_op.GetAttr("drop_mask_type", drop_mask_type);
      if (!drop_mask_type.empty()) {
        (void)npu_fa_op.SetInput("drop_mask", inputs[index++]);
      }
    
      std::vector<DataType> padding_mask_type;
      (void)tf_op.GetAttr("padding_mask_type", padding_mask_type);
      if (!padding_mask_type.empty()) {
        (void)npu_fa_op.SetInput("padding_mask", inputs[index++]);
      }
      std::vector<DataType> atten_mask_type;
      (void)tf_op.GetAttr("atten_mask_type", atten_mask_type);
      if (!atten_mask_type.empty()) {
        (void)npu_fa_op.SetInput("atten_mask", inputs[index++]);
      }
      std::vector<DataType> prefix_type;
      (void)tf_op.GetAttr("prefix_type", prefix_type);
      if (!prefix_type.empty()) {
        (void)npu_fa_op.SetInput("prefix", inputs[index++]);
      }
      std::vector<DataType> actual_seq_qlen_type;
      (void)tf_op.GetAttr("actual_seq_qlen_type", actual_seq_qlen_type);
      if (!actual_seq_qlen_type.empty()) {
        (void)npu_fa_op.SetInput("actual_seq_qlen", inputs[index++]);
      }
      std::vector<DataType> actual_seq_kvlen_type;
      (void)tf_op.GetAttr("actual_seq_kvlen_type", actual_seq_kvlen_type);
      if (!actual_seq_kvlen_type.empty()) {
        (void)npu_fa_op.SetInput("actual_seq_kvlen", inputs[index++]);
      }
    
      std::vector<DataType> q_start_idx_type;
      (void)tf_op.GetAttr("q_start_idx_type", q_start_idx_type);
      if (!q_start_idx_type.empty()) {
        (void)npu_fa_op.SetInput("q_start_idx", inputs[index++]);
      }
    
      std::vector<DataType> kv_start_idx_type;
      (void)tf_op.GetAttr("kv_start_idx_type", kv_start_idx_type);
      if (!kv_start_idx_type.empty()) {
        (void)npu_fa_op.SetInput("kv_start_idx", inputs[index++]);
      }
    
      // 6. 使用npu_fa_op算子的输出构造图的输出。
      std::vector<std::pair<Operator, std::vector<size_t>>> output_indexs;
      std::vector<size_t> node_output_index;
      for (size_t i = 0UL; i < npu_fa_op.GetOutputsSize(); i++) {
        node_output_index.emplace_back(i);
      }
      (void)output_indexs.emplace_back(std::make_pair(npu_fa_op, node_output_index));
      (void)graph.SetInputs(inputs).SetOutputs(output_indexs);
      return SUCCESS;
    }
    
    static Status FlashAttentionScoreMapping(const ge::Operator& op_src, ge::Operator& op_dst) {
      // 1. 调用默认映射函数即可
      if (AutoMappingByOpFn(op_src, op_dst) != ge::GRAPH_SUCCESS) {
        return FAILED;
      }
      // 2. 需要将TensorFlow算子名称设置到op_dst的original_type属性中,用于后续ParseOpToGraphFn回调函数的触发
      op_dst.SetAttr("original_type", "FlashAttentionScore");
      return SUCCESS;
    }
    
    REGISTER_CUSTOM_OP("FlashAttentionScore")
        .FrameworkType(TENSORFLOW)
        .OriginOpType({"FlashAttentionScore"})
        .ParseParamsByOperatorFn(FlashAttentionScoreMapping) // 注册此函数用于实现算子本身属性的映射
        .ParseOpToGraphFn(AddOptionalPlaceholderForFA); // 注册此函数用于实现将tf中的输入转化为可选输入,改变连边关系
    }  // namespace domi
  2. 在TensorFlow开源框架里注册FlashAttentionScore算子的原型定义,由于TensorFlow不支持可选输入,需要将其可选输入在TensorFlow原型中表示为动态输入,并通过属性来标记动态输入的个数,这些可选输入需要放置在原型定义的最后。示例代码(FlashAttentionScore.cc)如下:
    #include <algorithm>
    #include <atomic>
    #include <map> 
    #include "tensorflow/core/framework/common_shape_fns.h"
    #include "tensorflow/core/framework/op.h"
    #include "tensorflow/core/framework/op_kernel.h" 
    using namespace tensorflow;
    using shape_inference::InferenceContext;
    using shape_inference::ShapeHandle; 
    using namespace std;
    using namespace chrono; 
    using OpKernelConstructionPtr = OpKernelConstruction*;
    using OpKernelContextPtr = OpKernelContext*;
    using InferenceContextPtr = ::tensorflow::shape_inference::InferenceContext*; 
    namespace {
    class CustOps : public OpKernel {
    public:    
         explicit CustOps(OpKernelConstructionPtr context) : OpKernel(context) {}
         void Compute(OpKernelContextPtr context) override
        {
            std::cout << "Cust Ops not installed!!" << std::endl;
        }
         ~CustOps() override = default;};
    }  // namespace 
    namespace tensorflow {
    REGISTER_OP("FlashAttentionScore")
        .Input("query: T")
        .Input("key: T")
        .Input("value: T")
        .Input("real_shift: real_shift_type")  // 可选输入在TensorFlow原型中注册为动态输入
        .Input("drop_mask: drop_mask_type")
        .Input("padding_mask: padding_mask_type")
        .Input("atten_mask: atten_mask_type")
        .Input("prefix: prefix_type")
        .Input("actual_seq_qlen: actual_seq_qlen_type")
        .Input("actual_seq_kvlen: actual_seq_kvlen_type")
        .Input("q_start_idx: q_start_idx_type")
        .Input("kv_start_idx: kv_start_idx_type")
        .Output("softmax_max: float32")
        .Output("softmax_sum: float32")
        .Output("softmax_out: T")
        .Output("attention_out: T")
        .Attr("scale_value: float = 1.0")
        .Attr("keep_prob: float = 1.0")
        .Attr("pre_tockens: int = 2147483647")
        .Attr("next_tockens: int = 2147483647")
        .Attr("head_num: int")
        .Attr("input_layout: string")
        .Attr("inner_precise: int = 0")
        .Attr("sparse_mode: int = 0")
        .Attr("pse_type: int = 1")
        .Attr("T: {float16, float32, bfloat16} = DT_FLOAT")
        .Attr("real_shift_type: list({float16, float32, bfloat16}) >= 0") // 通过属性来标记动态输入个数
        .Attr("drop_mask_type: list({uint8}) >= 0")
        .Attr("padding_mask_type: list({float16, float32, bfloat16}) >= 0")
        .Attr("atten_mask_type: list({bool, uint8}) >= 0")
        .Attr("prefix_type: list({int64}) >= 0")
        .Attr("actual_seq_qlen_type: list({int64}) >= 0")
        .Attr("actual_seq_kvlen_type: list({int64}) >= 0")
        .Attr("q_start_idx_type: list({int64}) >= 0")
        .Attr("kv_start_idx_type: list({int64}) >= 0")
        .SetShapeFn([](InferenceContext *c) {
          return Status::OK();
        });
    REGISTER_KERNEL_BUILDER(Name("FlashAttentionScore").Device(DEVICE_CPU), CustOps)}
    使用如下命令对上述代码进行编译,产物为libcustom_ops.so,后续的算子调用脚本中可通过load_op_library接口加载该so为python模块,从而调用自定义算子。
    TF_CFLAGS=( $(python3 -c 'import tensorflow as tf; print(" ".join(tf.sysconfig.get_compile_flags()))') )     // 获取TensorFlow编译选项
    TF_LFLAGS=( $(python3 -c 'import tensorflow as tf; print(" ".join(tf.sysconfig.get_link_flags()))') )        // 获取TensorFlow链接选项
    SOURCE_FILES=FlashAttentionScore.cc                                                                          // 包含TensorFlow算子注册和CPU内核实现的cc文件
    g++ -std=c++14 -shared $SOURCE_FILES -o ${Path}/libcustom_ops.so -fPIC ${TF_CFLAGS[@]} ${TF_LFLAGS[@]} -O2   // 编译命令,产物为libcustom_ops.so,TensorFlow即可通过load_op_library加载该so为python模块,调用自定义算子
  3. 封装一个TensorFlow的算子调用接口,在此接口中处理可选输入。在该脚本需要加载上一步骤编译好的动态库。
    from tensorflow.python.framework import ops
    import tensorflow as tf
    tfOpLib = tf.load_op_library("../build/tf_ops/libflashattention.so")
    
    # 假如外部未使能该可选输入,则给底层传入空列表
    def create_optional_input_list(input):
        input_list = []
        if not input is None:
            input_list.append(input)
        return input_list
    
    # flash_attention_score 封装函数
    def npu_flash_attention(query, key, value, head_num, input_layout, real_shift=None, drop_mask=None, padding_mask=None,
                            atten_mask=None, prefix=None, actual_seq_qlen=None, actual_seq_kvlen=None,
                            q_start_idx=None, kv_start_idx=None, scale_value=1.0, keep_prob=1.0,
                            pre_tockens=2147483647, next_tockens=2147483647, inner_precise=0, sparse_mode=0,
                            pse_type=1):
        output = tfOpLib.flash_attention_score(query=query, key=key, value=value,
            real_shift=create_optional_input_list(real_shift), drop_mask=create_optional_input_list(drop_mask),
            padding_mask=create_optional_input_list(padding_mask), atten_mask=create_optional_input_list(atten_mask),
            prefix=create_optional_input_list(prefix), actual_seq_qlen=create_optional_input_list(actual_seq_qlen),
            actual_seq_kvlen=create_optional_input_list(actual_seq_kvlen), q_start_idx=create_optional_input_list(q_start_idx),
            kv_start_idx=create_optional_input_list(kv_start_idx), scale_value=scale_value, keep_prob=keep_prob,
            pre_tockens=pre_tockens, next_tockens=next_tockens, head_num=head_num, input_layout=input_layout,
            inner_precise=inner_precise, sparse_mode=sparse_mode, pse_type=pse_type)
        return output
  4. 测试脚本中实现自定义算子的调用。TensorFlow 2.6.5调用代码如下:
    import sys
    from ops import npu_flash_attention
    
    import tensorflow as tf
    import numpy as np
    tf.compat.v1.disable_eager_execution()
    
    import npu_device
    from npu_device.compat.v1.npu_init import *
    npu_device.compat.enable_v1()
    
    def sess_config():
        config = tf.compat.v1.ConfigProto()
        custom_op = config.graph_options.rewrite_options.custom_optimizers.add()
        custom_op.name = "NpuOptimizer"
        config.graph_options.rewrite_options.remapping = RewriterConfig.OFF
        config.graph_options.rewrite_options.memory_optimization = RewriterConfig.OFF
        return config
    
    shape = [1, 32, 32]
    query_np = np.random.randn(*shape).astype(np.float16)
    key_np = np.random.randn(*shape).astype(np.float16)
    value_np = np.random.randn(*shape).astype(np.float16)
    
    query = tf.Variable(query_np, tf.float16)
    key = tf.Variable(key_np, tf.float16)
    value = tf.Variable(value_np, tf.float16)
    
    mask = tf.zeros(shape=(shape[0], 1, shape[1], shape[1]), dtype=tf.uint8)
    
    head_num = 1
    input_layout = "BSH"
    flash_result_t = npu_flash_attention(query, key, value, head_num, input_layout, atten_mask=mask)
    
    with tf.compat.v1.Session(config=sess_config()) as sess:
        sess.run(tf.compat.v1.global_variables_initializer())
        flash_result = sess.run(flash_result_t)
        print(flash_result)

动态输入算子映射关系开发

对于存在动态输入/输出的算子,需要在插件的回调函数ParseParamByOpFunc中使用AutoMappingByOpFnDynamic实现TensorFlow算子和CANN算子的匹配。通过DynamicInputOutputInfo结构类描述动态输入/输出的信息,将动态输入/输出的名称和描述其个数的属性名绑定,再传入AutoMappingByOpFnDynamic实现自动匹配。

以ParseSingleExample算子为例,插件适配代码如下:
#include "register/register.h"
namespace domi {
Status ParseSingleExampleMapping(const ge::Operator& op_src, ge::Operator& op) {
  std::vector<DynamicInputOutputInfo> value;
  const std::string dynamic_input_name_dense_defaults = "dense_defaults";
  const std::string dynamic_input_attr_name_dense_defaults = "Tdense";
  DynamicInputOutputInfo input(kInput, dynamic_input_name_dense_defaults.c_str(),
      dynamic_input_name_dense_defaults.size(), dynamic_input_attr_name_dense_defaults.c_str(),
      dynamic_input_attr_name_dense_defaults.size());
  value.push_back(input);
  const std::string dynamic_output_name_sparse_indices = "sparse_indices";
  const std::string dynamic_output_attr_name_sparse_indices = "num_sparse";
  DynamicInputOutputInfo output(kOutput, 
      dynamic_output_name_sparse_indices.c_str(),
      dynamic_output_name_sparse_indices.size(), dynamic_output_attr_name_sparse_indices.c_str(),
      dynamic_output_attr_name_sparse_indices.size());
  value.push_back(output);
  const std::string dynamic_output_name_sparse_values = "sparse_values";
  const std::string dynamic_output_attr_name_sparse_values = "sparse_types";
  DynamicInputOutputInfo output1(kOutput, 
      dynamic_output_name_sparse_values .c_str(),
      dynamic_output_name_sparse_values .size(), dynamic_output_attr_name_sparse_values.c_str(),
      dynamic_output_attr_name_sparse_values.size());
  value.push_back(output1);
  const std::string dynamic_output_name_sparse_shapes = "sparse_shapes";
  const std::string dynamic_output_attr_name_sparse_shapes = "sparse_types";
  DynamicInputOutputInfo output1(kOutput, 
      dynamic_output_name_sparse_shapes.c_str(),
      dynamic_output_name_sparse_shapes.size(), dynamic_output_attr_name_sparse_shapes.c_str(),
      dynamic_output_attr_name_sparse_shapes.size());
  value.push_back(output2);
  const std::string dynamic_output_name_dense_values = "dense_values";
  const std::string dynamic_output_attr_name_dense_values = "Tdense";
  DynamicInputOutputInfo output1(kOutput, 
      dynamic_output_name_dense_values .c_str(),
      dynamic_output_name_dense_values .size(), dynamic_output_attr_name_dense_values.c_str(),
      dynamic_output_attr_name_dense_values.size());
  value.push_back(output3);
  AutoMappingByOpFnDynamic(op_src, op, value);
  return SUCCESS;
}

// register ParseSingleExample op to GE
REGISTER_CUSTOM_OP("ParseSingleExample")
    .FrameworkType(TENSORFLOW)
    .OriginOpType("ParseSingleExample")
    .ParseParamsByOperatorFn(ParseSingleExampleMapping)
    }

暂不支持同时有可选输入和动态输入的算子映射。