昇腾社区首页
中文
注册

cache管理

功能介绍

本章节按照PA场景和非PA场景介绍对应的功能,每个场景涉及D2D,D2H,H2D的传输方式。每种传输方式同时会有使用用户注册内存和llm_datadist管理内存的方式。

相比仅支持D2D传输的接口,主要额外新增以下接口功能。

  • CacheDesc新增placement入参。表示cache所在的设备。
  • cache_manager.register_cache:将用户的连续cache内存地址注册给llm_datadist。
  • cache_manager.register_blocks_cache:将用户的block的cache内存地址注册给llm_datadist。

使用场景

主要用于分布式集群间的KV Cache传输和搬移。

功能示例

本示例主要分PA场景和非PA场景介绍下接口的使用,主要涉及KV Cache的申请、释放、传输、搬移,会根据业务角色给出伪代码示例,具体接口功能和接口参数可参考LLM DataDist接口参考(Python)

PA场景主要以用户注册内存的流程给出示例,非PA场景主要以llm_datadist管理内存的流程给出示例。

示例以P侧作为数据发送端,D侧作为数据接收端。实际业务场景可自行调整。

PA场景

推荐使用用户注册内存方式,因为PA场景管理了一块num_blocks大小的内存,这块内存的使用由框架管理,通过用户注册内存的方式更方便。也可以使用llm_datadist管理cache的接口,示例参考非PA场景功能示例,更换接口为

  • cachemanager.allocate_cache更换为cachemanager.allocate_blocks_cache
  • cachemanager.pull_cache更换为cachemanager.pull_blocks

如果业务中使用了swap_blocks,copy_blocks功能,则在使用位置用llm_datadist提供的功能替换。

以D2D传输示例,D2H和H2D对应的更改收发端的placement参数:

  1. P侧参考注册用户内存中的示例进行llm_datadist初始化,在注册用户内存时修改CacheDesc。
    1
    2
    3
    4
    5
    6
    7
     # 将torch的kv tensor内存注册给llm_datadist
     cache_manager = llm_datadist.cache_manager
     cache_desc = CacheDesc(num_tensors=1, shape=[2, 1024*1024], data_type=DataType.DT_FLOAT, placement=Placement.DEVICE)
     tensor = torch.ones(2, 1024*1024, dtype=torch.float).npu()
     addr =int(tensor.data_ptr())
     # block_cache_key作为可选参数,在数据接收端注册的时候可以不用配置。根据实际业务自行选择register_blocks_cache或者register_cache
     cache = cache_manager.register_blocks_cache(cache_desc, [addr], BlocksCacheKey(1, 0))
    
  2. D侧参考注册用户内存中的示例进行llm_datadist初始化,在注册用户内存时修改CacheDesc。
    1
    2
    3
    4
    5
    6
    7
     # 将torch的kv tensor内存注册给llm_datadist
     cache_manager = llm_datadist.cache_manager
     cache_desc = CacheDesc(num_tensors=1, shape=[2, 1024*1024], data_type=DataType.DT_FLOAT, placement=Placement.DEVICE)
     tensor = torch.ones(2, 1024*1024, dtype=torch.float).npu()
     addr =int(tensor.data_ptr())
     # block_cache_key作为可选参数,在数据接收端注册的时候可以不用配置。根据实际业务自行选择register_blocks_cache或者register_cache
     cache = cache_manager.register_blocks_cache(cache_desc, [addr])
    
  3. P侧和D侧参考注册用户内存中查询内存状态示例,等待内存注册完成。
  4. P侧执行业务推理,等待D侧拉取kv完成。D侧等待P侧KV Cache计算完成,调用pull接口拉KV Cache,执行推理业务。
    1
    2
    3
    4
    5
     # D侧脚本
     # 等待P侧KV Cache计算完成,调用pull_blocks接口拉取cache
     cache_manager = llm_datadist.cache_manager
     cache_manager.pull_blocks(BlocksCacheKey(1, 0), cache, src_blocks=[0, 1], dst_blocks=[0, 1])
     # ...执行推理业务
    
  5. 参考集群建链示例进行断链以及llm_datadist资源释放。

非PA场景

推荐使用llm_datadist管理内存的方式。因为非PA场景每个请求都需要申请一块cache内存,通过llm_datadist预申请内存池管理的方式可以节省申请内存的耗时。也可以使用注册用户内存方式,示例参考PA场景示例。更换接口为

  • cachemanager.register_blocks_cache更换为cachemanager.register_cache
  • cachemanager.pull_blocks更换为cachemanager.pull_cache

D2D传输示例,D2H和H2D对应的更改收发端的placement参数:

  1. 参考建链流程的非PA场景示例,初始化llm_datadist,link,等待内存注册完成。
  2. P侧和D侧根据框架中的KV Cache大小,从llm_datadist中申请对应的KV Cache内存,并转换为框架KV Cache类型。P侧使用转换后的KV Cache执行业务,等待KV Cache被pull。D侧等待P侧KV Cache计算完成调用pull拉取kv,进行推理业务。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
     # P侧脚本
     cache_manager = llm_datadist.cache_manager
     cache_desc = CacheDesc(num_tensors=4, shape=[2, 1024*1024], data_type=DataType.DT_FLOAT16, placement=Placement.DEVICE)
     cache_key_0 = CacheKey(prompt_cluster_id=1, req_id=0, model_id=0)
     cache_key_1 = CacheKey(prompt_cluster_id=1, req_id=1, model_id=0)
     cache = cache_manager.allocate_cache(cache_desc, [cache_key_0, cache_key_1])
     # 将申请好的KV Cache转换为框架中的KV Cache类型,此处需要在不同框架中提供根据KV Cache的地址创建对应类型的KV Cache的接口。此处以torch中转换的接口为例
     tensor_addrs = cache.tensor_addrs
     tensors = torchair.llm_datadist.create_npu_tensors(cache.cache_desc.shape, torch.float16, tensor_addrs)
     # 将转换后的tensor拆分为框架需要的kv配对方式,可以自定义组合kv
    
     # ...执行业务推理
     # 等待D侧完成pull
    
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
     # D侧脚本
     cache_manager = llm_datadist.cache_manager
     cache_desc = CacheDesc(num_tensors=1, shape=[2, 1024*1024], data_type=DataType.DT_FLOAT16, placement=Placement.DEVICE)
     cache = cache_manager.allocate_cache(cache_desc)
    
     # 等待P侧cache计算完成
     # 调用pull_cache拉取cache,拉取的cache_key和P侧对应
     cache_key_0 = CacheKey(prompt_cluster_id=1, req_id=0, model_id=0)
     cache_manager.pull_cache(cache_key_0, cache, batch_index=0)
     # 将申请好的KV Cache转换为框架中的KV Cache类型,此处需要在不同框架中提供根据KV Cache的地址创建对应类型的KV Cache的接口。此处以torch中转换的接口为例
     tensor_addrs = cache.tensor_addrs
     tensors = torchair.llm_datadist.create_npu_tensors(cache.cache_desc.shape, torch.float16, tensor_addrs)
     # 将转换后的tensor拆分为框架需要的kv配对方式,可以自定义组合kv
    
     # ...执行推理业务
    
  3. 根据业务中cache的使用时机自行释放请求申请的cache和cache_key。
    1
    2
    3
    4
    5
     # P侧脚本
     # 当batch中所有请求的KV Cache都被拉走后,调用deallocate_cache才会真正释放内存。如果D侧未拉走KV Cache,则还需要调用remove_cache_key。
     cache_manager.remove_cache_key(cache_key_0)
     cache_manager.remove_cache_key(cache_key_1)
     cache_manager.deallocate_cache(cache)
    
    1
    2
    3
     # D侧脚本
     # 根据业务中cache的使用时机自行释放cache。申请时未设置cacke_key,所以不需要释放
     cache_manager.deallocate_cache(cache)
    
  4. 参考建链流程进行断链以及llm_datadist资源释放。

非PA场景还支持连续往非连续数据发送,常用于将连续的KV Cache内存发送到非连续host的内存上方便管理。当然也支持D2D场景

连续往非连续发送需要保证连续内存除以block_mem_size向上取整后要等于dst_block的个数。

  1. P侧参考建链流程的非PA场景示例,D侧链流程的PA场景示例,初始化llm_datadist,link,等待内存注册完成。
  2. P侧根据框架中的KV Cache大小,从llm_datadist中申请对应的KV Cache内存,并转换为框架KV Cache类型。D侧参考注册用户内存中的示例在注册用户内存时修改CacheDesc,等待内存注册完成。
    1
    2
    3
    4
    5
    6
    7
    8
    9
     # P侧脚本
     cache_manager = llm_datadist.cache_manager
     cache_desc = CacheDesc(num_tensors=4, shape=[2, 1024*1024], data_type=DataType.DT_FLOAT16, placement=Placement.DEVICE)
     cache_key = CacheKey(prompt_cluster_id=1, req_id=0, model_id=0)
     cache = cache_manager.allocate_cache(cache_desc, [cache_key])
     # 将申请好的KV Cache转换为框架中的KV Cache类型,此处需要在不同框架中提供根据KV Cache的地址创建对应类型的KV Cache的接口。此处以torch中转换的接口为例
     tensor_addrs = cache.tensor_addrs
     tensors = torchair.llm_datadist.create_npu_tensors(cache.cache_desc.shape, torch.float16, tensor_addrs)
     # 将转换后的tensor拆分为框架需要的kv配对方式,可以自定义组合kv
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
     # D侧脚本
     # 将torch的kv tensor内存注册给llm_datadist
     cache_manager = llm_datadist.cache_manager
     cache_desc = CacheDesc(num_tensors=1, shape=[2, 1024*1024], data_type=DataType.DT_FLOAT, placement=Placement.HOST)
     tensor = torch.ones(2, 1024*1024, dtype=torch.float).npu()
     addr =int(tensor.data_ptr())
     # block_cache_key作为可选参数,在数据接收端注册的时候可以不用配置。根据实际业务自行选择register_blocks_cache或者register_cache
     cache = cache_manager.register_blocks_cache(cache_desc, [addr])
     # 按照注册用户内存章节示例执行到查询内存状态,等待查询完成
    
  3. P侧执行业务推理,等待KV Cache被D侧拉走。D侧等待KV Cache计算完成,调用pull拉取kv,执行推理业务。
    1
    2
    3
    4
    5
    6
     # D侧脚本
     # 调用pull_blocks接口拉取cache
     cache_manager = llm_datadist.cache_manager
     cache_key_0 = CacheKey(prompt_cluster_id=1, req_id=0, model_id=0)
     cache_manager.pull_blocks(cache_key_0, cache, src_blocks=[], dst_blocks=[0])
     # ...执行推理业务
    
  4. P侧根据业务中cache的使用时机自行释放请求申请的cache和cache_key。D侧由于是注册的用户内存,不需要释放。
    1
    2
    3
    4
    5
     # P侧脚本
     # 当batch中所有请求的KV Cache都被拉走后,调用deallocate_cache才会真正释放内存。如果D侧未拉走KV Cache,则还需要调用remove_cache_key。
     cache_manager.remove_cache_key(cache_key_0)
     cache_manager.remove_cache_key(cache_key_1)
     cache_manager.deallocate_cache(cache)
    
  5. 参考建链流程示例进行断链以及llm_datadist资源释放。

异常处理

出现LLM_KV_CACHE_NOT_EXIST时,表示对端KV Cache不存在,需要检查对端进程是否异常或者对应KV Cache的请求有没有推理完成。该错误不影响其他请求流程,确认流程后可以重试。

出现LLM_TIMEOUT时,表示pull kv超时,说明链路出现问题,需要重新断链建链再尝试。

更多异常处理请参考LLM DataDist接口参考(Python)中的"LLMStatusCode"章节。