昇腾社区首页
中文
注册

单进程多卡异步图运行

本章节介绍如何使用单进程管理多卡,即一个进程可以并发在不同的Device上运行。

功能介绍

Atlas 200I/500 A2 推理产品 不支持该特性

涉及的主要接口为:

  1. 调用GEInitialize进行系统初始化(也可在Graph构建前调用),申请系统资源。
  2. 调用aclInit接口,初始化acl
  3. 调用Session构造函数创建多个Session类对象,申请Session资源,每个Session传入不同的ge.session_device_id,将模型运行在不同的Device。
  4. 创建多个线程,每个线程传入不同的Session,下面以一个线程为例,描述简单的流程:
    1. 调用aclrtSetDevice指定运行的Device,调用aclrtCreateStream创建Stream,然后调用aclrtMalloc申请Device内存。
    2. 调用AddGraph在Session类对象中添加定义好的图。
    3. 调用CompileGraph完成图编译。
    4. 调用LoadGraph(异步执行Graph场景),加载图模型到4.a创建的Stream上。
    5. 调用aclrtMemcpy将数据从Host传到Device。
    6. 调用ExecuteGraphWithStreamAsync异步执行接口,运行Graph。
    7. 调用aclrtSynchronizeStream阻塞程序运行,直到指定Stream中的所有任务都完成。
    8. 调用aclrtMemcpy将数据从Device回传到Host。
    9. 调用aclrtFree释放内存。
  5. 调用GEFinalize,释放系统资源;调用aclFinalize释放acl相关资源。

开发示例

  1. 包含的头文件,包括acl、C或C++标准库的头文件。
    1
    2
    3
    4
    5
    #include "ge_api.h"
    #include "acl.h"
    #include "acl_rt.h"
    #include "graph/ascend_string.h"
    #include <thread>
    
  2. 申请系统资源。

    Graph定义完成后,调用GEInitialize进行系统初始化(也可在Graph定义前调用),申请系统资源。示例代码如下:

    1
    2
    3
    std::map<AscendString, AscendString>config = {{"ge.exec.deviceId", "0"},
                                                  {"ge.graphRunMode", "1"}};
    Status ret = ge::GEInitialize(config);
    

    可以通过config配置传入ge运行的初始化信息,配置参数ge.exec.deviceId和ge.graphRunMode,分别用于指定GE实例运行设备,图执行模式(在线推理请配置为0,训练请配置为1)。更多配置请参考options参数说明

    GE options中的dump信息,与后续调用acl初始化接口时配置的dump信息,建议两者不要同时配置,否则可能导致异常。其他相同功能的参数类似。

  3. acl资源初始化。
    1
    2
    3
    4
    5
    6
    7
        std::string aclConfigPath = "xx/xx/xx";
        aclError retInit = aclInit(aclConfigPath);
        if (retInit != ACL_ERROR_NONE) {
            // ...
            // ...
            return FAILED;
        }
    
  4. 创建多个Session。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
        int thread_num = 8; // 以8个device为例 
        for (int i= 0; i < thread_num; ++i) {   //创建多个Sessio,每个Sessio的options中,传入不同的ge.session_device_id
            std::map<ge::AscendString, ge::AscendString> options = {
    	    {"ge.session_device_id",std::to_string(i).c_str()},
    	};		
    	ge::Session *session = new ge::Session(options);
    	if (session == nullptr) {
    	    std::cout << "create session failed!" << std::endl;
    	    ge::GEFinalize();
    	    return FAILED;
    	}
            sessions.push_back(session);
        }
    
  5. 创建多个线程,每个线程传入不同的Sessio和ge.session_device_id,进行异步运行Graph。
    1
    2
    3
    4
    5
    6
    7
    8
        std::vector<std::thread> threads;
        for (int i= 0; i < thread_num; i++) {
            std::thread worker_thread(exec_func, i); // exec_func线程函数如5.a~5.g步骤所示
            threads.emplace_back(std::move(worker_thread));
        }
        for (int i = 0; i < thread_num; i++) {
            threads.at(i).join();
        }
    

    单个线程exec_func异步运行步骤如下:

    1. 指定运行的Device,创建Stream,申请内存。
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      // 指定用于运算的Device
      int32_t deviceId = 0;
          retInit = aclrtSetDevice(deviceId);
      
      // 创建一个Stream
          aclrtStream stream = nullptr;
          aclError aclRet = aclrtCreateStreamWithConfig(&stream_, 0, ACL_STREAM_FAST_LAUNCH);
      
      // 申请Device上的内存
          void* devPtrB = NULL;
          aclRet = aclrtMalloc(&devPtrB, data_size, ACL_MEM_MALLOC_HUGE_FIRST);
      
    2. 添加Graph对象。
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      uint32_t graph_id = 0;
      ge::Graph graph;
      sess_ = sessionList[index];
      ge::Status ret = sess_ -> AddGraph(graph_id, graph, graph_options);
      if(ret != SUCCESS) {
        // ...
        // ...
        // 释放资源
        ge::GEFinalize();
        delete session;
        return FAILED;
      }
      

      用户可以通过传入options配置图运行相关配置信息,相关配置请参考Session构造函数。其中图运行完之后的数据保存在Tensor output_cov中。

    3. 编译Graph。
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      uint32_t graph_id = 0;
      ret = sess_ -> CompileGraph(graph_id);
      if(ret != SUCCESS) {
        // ...
        // ...
        // 释放资源
        ge::GEFinalize();
        delete session;
        return FAILED;
      }
      
    4. 加载Graph到创建的Stream上。
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      std::map <AscendString, AscendString> options;
      uint32_t graph_id = 0;
      ret = sess_ -> LoadGraph(graph_id, options, stream_);
      if(ret != SUCCESS) {
        // ...
        // ...
        // 释放资源
        ge::GEFinalize();
        delete session;
        return FAILED;
      }
      
    5. 数据传输。
      1
      2
      3
      // 内存复制,将Host上数据传输到Device
      // hostPtrA表示Host上源内存地址指针,devPtrB表示Device上目的内存地址指针,size表示内存大小
          aclrtMemcpy(devPtrB, size, hostPtrA, size, ACL_MEMCPY_HOST_TO_DEVICE);
      
    6. 异步运行Graph,输出执行结果。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      std::vector<gert::Tensor> input;
      std::vector<gert::Tensor> output;
      ret = sess_->ExecuteGraphWithStreamAsync(graph_id, stream, input, output);
      
      // 调用aclrtSynchronizeStream接口,阻塞应用程序运行,直到指定Stream中的所有任务都完成
          aclRet = aclrtSynchronizeStream(stream);
      // 内存复制,将Device数据传回Host
      // devPtrA表示Device上源内存地址指针,hostPtrB表示Host上目的内存地址指针,size表示内存大小
          aclrtMemcpy(hostPtrB, size, devPtrA, size, ACL_MEMCPY_DEVICE_TO_HOST);
      
    7. 释放内存。
      1
      2
      // 释放内存
          ret = aclrtFree(devPtrB);
      
  6. 释放资源。
    1
    2
    3
    4
    5
    6
    7
    8
    // 释放各个Session资源
        for (auto session : sessions) {
            delete session;        
        }
    // 释放Graph资源
        ret = ge::GEFinalize();
    // acl去初始化
        ret = aclFinalize();