KV Cache管理
功能介绍
KV Cache管理涉及的主要接口及功能如下:
接口名称 |
功能 |
---|---|
register_cache |
注册cache。 非PA场景下,调用此接口注册一个自行申请的内存。 |
register_blocks_cache |
注册cache。 PA场景下,调用此接口注册一个自行申请的内存。 |
unregister_cache |
当cache不再使用时,调用当前接口对注册过的cache进行解注册。 |
pull_cache |
根据CacheKey,从远端节点拉取KV到本地KV Cache。 该CacheKey需要和allocate_cache的CacheKey保持一致。 |
pull_blocks |
PA场景下,KV的拉取接口。和pull_cache的差异是,pull_blocks是按照block_index拉取的对应位置的KV Cache。 |
transfer_cache_async |
异步分层传输Cache。 |
push_blocks |
PA场景下,KV的推送接口,从本地节点推送KV到远端KV Cache。 |
push_cache |
非PA场景下,从本地节点推送KV到远端KV Cache。 |
使用场景
主要用于分布式集群间的KV Cache传输。
功能示例(一般Cache传输场景)
本示例介绍一般Cache传输场景下接口的使用,主要涉及KV Cache的注册、解注册、传输。如下将根据业务角色给出伪代码示例。
- P侧和D侧根据链路管理章节的示例完成LLMDataDist的初始化。
- 在P侧和D侧模型的每层按照计算好大小提前申请KV Cache。不同请求对创建的KV Cache进行复用,上层框架自行管理,业务结束后释放申请的内存。
1 2 3 4 5 6 7 8 9 10 11 12
import torchair import torch import torch_npu # 从初始化完的llm_datadist中获取cache_manager cache_manager = llm_datadist.cache_manager # 根据模型中KV Cache的shape以及总个数创建CacheDesc。此处shape只是示例,实际填写网络中的KV cache shape。 cache_desc = CacheDesc(num_tensors=4, shape=[4, 4, 8], data_type=DataType.DT_FLOAT16) tensor1 = torch.full((4, 4, 8), 1, dtype=torch.float).npu() ... # 其他tensor申请 cache = cache_manager.register_cache(cache_desc, [int(tensor.data_ptr()), int(tensor2.data_ptr()) ...]) # 建链后将注册的kv_tensors传给模型推理计算产生KV Cache,将模型输出传输给增量推理模型作为输入
- P侧和D侧根据链路管理章节的示例完成LLMDataDist间的建链。
- 将KV Cache从P侧传输到D侧,有以下两种方式:
- 在D侧调用pull_cache接口拉取对应请求的KV Cache到申请的内存中。
1 2 3
# 创建和P侧申请cache时相同的cache_key,用于拉取对应的KV cache cache_key = CacheKey(prompt_cluster_id=0, req_id=1, model_id=0) cache_manager.pull_cache(cache_key, kv_cache, batch_index=1) # 拉到batch index为1的位置上
- 在P侧调用transfer_cache_async接口将数据传输至Decode。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
from llm_datadist import LayerSynchronizer, TransferConfig class LayerSynchronizerImpl(LayerSynchronizer): def __init__(self, events): self._events = events def synchronize_layer(self, layer_index: int, timeout_in_millis: Optional[int]) -> bool: self._events[layer_index].wait() return True events = [torch.npu.Event() for _ in range(cache_desc.num_tensors // 2)] # 执行模型,模型在各层计算完成后调用events[layer_index].record()记录完成状态 # 模型执行由用户实现 # user_model.Predict(kv_tensors, events) # 模型下发完成后,调用transfer_cache_async传输数据,此处需要填写Decode已申请的KV Cache各层tensor的内存地址 transfer_config = TransferConfig(DECODER_CLUSTER_ID, decoder_kv_cache_addrs) cache_task = cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(events), [transfer_config]) # 同步等待传输结果 cache_task.synchronize()
- 在D侧调用pull_cache接口拉取对应请求的KV Cache到申请的内存中。
- 以torch为例,将KV Cache转换为torch tensor,进行增量模型推理。
1 2 3 4 5 6 7 8 9 10 11 12
# 将申请好的KV Cache转换为框架中的KV Cache类型,不同框架中都需要提供根据KV Cache地址创建对应类型的KV Cache的接口。此处以PyTorch为例 # 转换操作和pull操作顺序不分先后 kv_tensor_addrs = kv_cache.per_device_tensor_addrs[0] kv_tensors = torchair.llm_datadist.create_npu_tensors(kv_cache.cache_desc.shape, torch.float16, kv_tensor_addrs) # 将转换后的tensor拆分为框架需要的KV配对方式,可以自定义组合KV mid = len(kv_tensors) // 2 k_tensors = kv_tensors[: mid] v_tensors = kv_tensors[mid:] kv_cache_tensors = list(zip(k_tensors, v_tensors)) # 将转换的kv_tensors传给模型进行迭代推理 # 等待请求增量推理完成
- 根据业务中cache的使用时机自行解注册对应请求的KV Cache内存。
1
cache_manager.unregister_cache(cache_id)
- 业务退出时,P侧和D侧根据链路管理章节的示例进行断链,然后调用finalize接口释放资源。
功能示例(Blocks Cache传输场景)
本示例介绍Blocks Cache(将Cache使用块状形式管理)传输场景下接口的使用,主要涉及KV Cache的注册、解注册、传输。如下将根据业务角色给出伪代码示例。
- P侧和D侧根据链路管理的示例完成LLMDataDist的初始化。
- 在P侧和D侧模型的每层按照计算好的num_blocks数量调用申请tensor,比如torch tensor,并注册给LLMDatadist。Blocks Cache场景下,不同请求对创建的num_blocks大小的KV Cache进行复用,上层框架自行管理,业务结束后释放申请的内存。
1 2 3 4 5 6 7 8 9 10 11 12
# P/D侧 # 从已经初始化的llm_datadist中获取kv_cache_manager cache_manager = llm_datadist.cache_manager # 根据模型中KV Cache的shape以及总个数创建CacheDesc。PA场景的KV cache shape通常为[num_blocks, block_size,...,...] num_blocks = 10 block_mem_size = 128 cache_desc = CacheDesc(num_tensors=4, shape=[num_blocks, block_mem_size], data_type=DataType.DT_FLOAT16) # 根据初始化llm_datadist时的cluster_id创建对应请求的BlocksCacheKey cache_key = BlocksCacheKey(prompt_cluster_id=0, model_id=0) ... # 申请tensor # 调用register_blocks_cache接口注册KV Cache内存 kv_cache = cache_manager.register_blocks_cache(cache_desc, [addr, addr2, ...(tensor地址)], cache_key)
- P侧和D侧根据链路管理章节的示例完成LLMDataDist间的建链。
- P侧有新请求进来后,推理框架会给每个请求分配好对应的block_index。模型推理完之后,该请求对应的KV Cache就在对应的block_index所在的内存上,将模型输出和请求对应的block_table传输给D侧推理模型作为输入。
- D侧有新请求进来后,推理框架也会给每个请求分配好对应的block_index,然后调用pull_blocks接口,根据P侧的block_index和D侧的block_index的对应关系,将KV Cache传输到指定位置。此时有两种方式:
- 在D侧调用pull_blocks接口拉取KV Cache。
1 2 3 4
# D侧根据P侧传过来的信息,添加新请求,并申请对应的block_table # D侧根据传过来请求的src_block_table和新申请的dst_block_table拉取KV到对应block cache_key = BlocksCacheKey(prompt_cluster_id=0, model_id=0) # P侧register_blocks_cache时的入参 cache_manager.pull_blocks(cache_key, cache, [0, 1], [2, 3]) # 将P侧0, 1 block位置上的数据拉到D侧2, 3 block位置上
- 在P侧调用transfer_cache_async接口时将数据传输至D侧。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# 实现LayerSynchronizerImpl,通过torch Event获取各层计算结束状态,本例中通过Event机制实现 class LayerSynchronizerImpl(LayerSynchronizer): def __init__(self, events): self._events = events def synchronize_layer(self, layer_index: int, timeout_in_millis: Optional[int]) -> bool: self._events[layer_index].wait() return True events = [torch.npu.Event() for _ in range(cache_desc.num_tensors // 2)] # 执行模型, 模型在各层计算完成后调用events[layer_index].record()记录完成状态 # 该函数由用户实现 user_model.Predict(kv_cache_tensors, events) # 模型下发完成后,调用transfer_cache_async传输数据,此处需要填写Decode已申请的KV Cache各层tensor的内存地址 transfer_config = TransferConfig(DECODER_CLUSTER_ID, decoder_kv_cache_addrs) cache_task = cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(events), [transfer_config], [0, 1], [2, 3]) # 同步等待传输结果 cache_task.synchronize()
- 在D侧调用pull_blocks接口拉取KV Cache。
- 业务结束后P侧和D侧调用unregister_cache对注册的KV Cache内存进行解注册。
1 2 3
# 等待D侧拉取完对应请求的KV Cache # 根据业务中cache的使用时机自行解注册对应请求的KV Cache。 cache_manager.unregister_cache(cache_id)
- 业务退出时,P侧和D侧根据集群断链的示例进行断链,然后调用finalize接口释放资源。
异常处理
- 错误码LLM_KV_CACHE_NOT_EXIST表示对端KV Cache不存在,需要检查对端进程是否异常或者对应KV Cache的请求有没有推理完成。该错误不影响其他请求流程,确认流程后可以重试。
- 错误码LLM_WAIT_PROCESS_TIMEOUT表示pull KV超时,说明链路出现问题,需要重新断链建链再尝试。
父主题: 功能介绍