KV Cache管理
功能介绍
在llm_datadist初始化时会预申请一块指定大小的内存池。后续的KV Cache的内存申请及释放都在预申请的内存上进行,相比每次申请一块内存,可以节省耗时。涉及的主要接口及功能如下。
接口名称 |
功能 |
---|---|
allocate_cache |
分配cache。 cache分配成功后,会同时被cache_id与cache_keys引用,只有当这些引用都解除后,cache所占用的资源才会实际释放。cache_keys会在LLMRole为Prompt时传入,在LLMRole为Decoder时pull_cache时传入对应的cache_key,用于查找对应KV Cache。 |
deallocate_cache |
释放cache。 如果该cache在allocate_cache时关联了cache_keys,则实际的释放会延后到所有的cache_keys释放,cache_keys的引用则可以通过以下2种方式解除:
|
remove_cache_key |
移除cache_key。 仅当LLMRole为Prompt时可调用,Decoder时因为allocate_cache申请时不需要传入cache_key。移除cache_key后,该cache将无法再被pull_cache拉取。 |
pull_cache |
由Decoder节点发起,传入P侧allocate_cache时对应的cache_keys从Prefill节点拉取对应的kv。 |
copy_cache |
拷贝KV Cache。 当期望pull_cache和其他使用KV Cache的操作流水时,可以额外申请一块中转cache。当其他流程在操使用KV Cache时,可以先将下一次的kv pull到中转cache,待其他流程使用完KV Cache后,copy到指定的位置,从而将pull_cache的耗时隐藏。 公共前缀场景在新请求推理前可以将公共前缀拷贝到新的内存中和当前请求的kv合并推理 |
allocate_blocks_cache |
PA场景下,kv的申请接口。 |
pull_blocks |
PA场景下,kv的拉取接口。和pull_cache的差异是,pull_blocks是按照block_index拉取的对应位置的KV Cache |
copy_blocks |
PA场景下,kv的copy接口;和copy_cache的差异是,copy_blocks是按照block_index拷贝的对应位置的KV Cache。在使用场景上也存在差异,copy_blocks主要是针对当多个回答需要共用相同block,block没填满时,新增的token需要拷贝到新的block上继续迭代。 |
swap_blocks |
PA场景下,对应block_index上kv内存的换入换出,主要面向用户需要自行管理kv内存的场景。 |
transfer_cache_async |
由Prompt节点发起,将cache的数据传输到Decoder。 |
使用场景
主要用于分布式集群间的KV Cache传输和搬移。
功能示例
本示例分别介绍PA场景和非PA场景下接口的使用,主要涉及KV Cache的申请、释放、传输、搬移。
如下将根据业务角色给出伪代码示例,接口参数可参考《LLM DataDist接口参考(Python)》。
非PA场景:
在业务中,KV Cache涉及的流程是:
- P侧和D侧根据集群建链的示例完成llm_datadist的初始化和建链操作。
- 在P侧给每个请求申请对应大小的KV Cache内存,并转化为框架中所需要的KV Cache类型,不同框架需要提供根据地址创建对应类型的KV Cache的接口,模型推理一次后会计算产生该请求的KV Cache。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
# 从初始化完的llm_datadist中取到kv_cache_manager kv_cache_manager = llm_datadist.kv_cache_manager # 根据模型中KV Cache的shape以及总的个数创建CacheDesc cache_desc = CacheDesc(num_tensors=4, shape=[4, 4, 8], data_type=DataType.DT_FLOAT16) # 根据初始化llm_datadist时的cluster_id创建对应请求的cache_key,当P侧是多batch模型时,需要创建batch数量的cache_key,当batch未占满时,则需要插入特殊的cache_key(将req_id设置为UINT64_MAX)占位,如果空闲的batch_index在末尾,则可以省略。 cache_key_0 = cache_key(prompt_cluster_id=0, req_id=0, model_id=0) cache_key_padding = cache_key(prompt_cluster_id=0, req_id=PADDING_REQ_ID, model_id=0) cache_key_2 = cache_key(prompt_cluster_id=0, req_id=1, model_id=0) # 调用allocate_cache接口申请好对应请求的KV Cache内存 kv_cache = kv_cache_manager.allocate_cache(cache_desc, [cache_key_0, cache_key_padding, cache_key_2]) # 将申请好的KV Cache转换为框架中的KV Cache类型,此处需要在不同框架中提供根据KV Cache的地址创建对应类型的KV Cache的接口。此处以PyTorch中转换的接口为例 kv_tensor_addrs = kv_cache.per_device_tensor_addrs[0] kv_tensors = torchair.llm_datadist.create_npu_tensors(kv_cache.cache_desc.shape, torch.float16, kv_tensor_addrs) # 将转换后的tensor拆分为框架需要的kv配对方式,可以自定义组合kv mid =len(kv_tensors) //2 k_tensors = kv_tensors[: mid] v_tensors = kv_tensors[mid:] kv_cache_tensors =list(zip(k_tensors, v_tensors)) # 将转换的kv_tensors传给模型推理计算产生KV Cache,将模型输出传输给增量推理模型作为输入 # 等待增量拉取完对应请求的KV Cache...
- 在D侧申请一份用于模型执行的KV Cache内存,并转化为框架中所需要的KV Cache类型,不同框架需要提供根据地址创建对应类型的KV Cache的接口。
1 2 3 4 5 6
# 从初始化完的llm_datadist中取到kv_cache_manager kv_cache_manager = llm_datadist.kv_cache_manager # 根据模型中KV Cache的shape以及总的个数创建CacheDesc cache_desc = CacheDesc(num_tensors=4, shape=[4, 4, 8], data_type=DataType.DT_FLOAT16) # 调用allocate_cache接口申请好对应请求的KV Cache内存 kv_cache = kv_cache_manager.allocate_cache(cache_desc)
- 将KV Cache从Prompt传输到Decoder,有以下两种方式:
- 在Decoder调用pull_cache接口拉取对应请求的KV Cache到申请的内存中
1 2 3 4 5 6 7 8 9 10
# 等待P侧KV Cache计算完成 # 创建和P侧申请cache是相同的cache_key,用于拉取对应的KV Cache cache_key = cache_key(prompt_cluster_id=0, req_id=1, model_id=0) if需要中转cache: cache_desc_tmp = CacheDesc(num_tensors=4, shape=[1, 4, 8], data_type=DataType.DT_FLOAT16) kv_cache_tmp = kv_cache_manager.allocate_cache(cache_desc_tmp) kv_cache_manager.pull_cache(cache_key, kv_cache_tmp) kv_cache_manager.copy_cache(kv_cache, kv_cache_tmp, 1, 0) # 拉到batch index为1的位置上 else: kv_cache_manager.pull_cache(cache_key, kv_cache, 1, 64) # 拉到batch index为1的位置上
- 在Prompt调用transfer_cache_async接口将数据传输至Decoder
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# 实现LayerSynchronizerImpl,通过torch Event获取各层计算结束状态,本例中通过Event机制实现 class LayerSynchronizerImpl(LayerSynchronizer): def __init__(self, events): self._events = events def synchronize_layer(self, layer_index: int, timeout_in_millis: Optional[int]) -> bool: self._events[layer_index].wait() return True events = [torch.npu.Event() for _ in range(cache_desc.num_tensors // 2)] # 执行模型, 模型在各层计算完成后调用events[layer_index].record()记录完成状态 # 该函数由用户实现 user_model.Predict(kv_cache_tensors, events) # 模型下发完成后,调用transfer_cache_async传输数据,此处需要填写Decoder申请出的KV Cache各层tensor的内存地址 transfer_config = TransferConfig(DECODER_CLUSTER_ID, decoder_kv_cache_addrs) cache_task = kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(events), [transfer_config]) # 同步等待传输结果 cache_task.synchronize()
- 在Decoder调用pull_cache接口拉取对应请求的KV Cache到申请的内存中
- 将KV Cache转化为框架中所需要的KV Cache类型,不同框架需要提供根据地址创建对应类型的KV Cache的接口
1 2 3 4 5 6 7 8 9 10 11 12
# 将申请好的KV Cache转换为框架中的KV Cache类型,此处需要在不同框架中提供根据KV Cache的地址创建对应类型的KV Cache的接口。此处以PyTorch中转换的接口为例 # 转换操作和pull操作顺序不分先后 kv_tensor_addrs = kv_cache.per_device_tensor_addrs[0] kv_tensors = torchair.llm_datadist.create_npu_tensors(kv_cache.cache_desc.shape, torch.float16, kv_tensor_addrs) # 将转换后的tensor拆分为框架需要的kv配对方式,可以自定义组合kv mid =len(kv_tensors) //2 k_tensors = kv_tensors[: mid] v_tensors = kv_tensors[mid:] kv_cache_tensors =list(zip(k_tensors, v_tensors)) # 将转换的kv_tensors传给模型进行迭代推理 # 等待请求增量推理完成
- 根据业务中cache的使用时机自行释放对应请求的KV Cache内存。
1 2 3 4
# P侧当batch中所有请求的KV Cache都被拉走后,调用deallocate_cache才会真正释放内存。如果D侧未拉走KV Cache,则还需要调用remove_cache_key。 kv_cache_manager.remove_cache_key(cache_key_0) kv_cache_manager.remove_cache_key(cache_key_2) kv_cache_manager.deallocate_cache(kv_cache)
1 2
# D侧由于申请时不需要cache_key,所以释放时只需要调用deallocate接口。 kv_cache_manager.deallocate_cache(kv_cache)
- 业务退出时,P侧和D侧根据集群建链的示例进行断链和llm_datadist的finalize。
PA场景
在业务中,KV Cache涉及的流程是:
- P侧和D侧根据集群建链的示例完成llm_datadist的初始化和建链操作
- 在P侧和D侧模型的每层按照(num_block, block_mem_size)的固定大小调用allocate_blocks_cache申请好KV Cache的内存。PA场景下,不同请求对创建的num_block大小的KV Cache进行复用,上层框架自行管理,业务结束后释放申请的内存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
# P侧脚本 # 从初始化完的llm_datadist中取到kv_cache_manager kv_cache_manager = llm_datadist.kv_cache_manager # 根据模型中KV Cache的shape以及总的个数创建CacheDesc num_blocks =10 block_mem_size=128 cache_desc = CacheDesc(num_tensors=4, shape=[10, 128], data_type=DataType.DT_FLOAT16) # 根据初始化llm_datadist时的cluster_id创建对应请求的BlocksCacheKey cache_key = BlocksCacheKey(prompt_cluster_id=0, model_id=0) # 调用allocate_blocks_cache接口申请好KV Cache内存 kv_cache = kv_cache_manager.allocate_blocks_cache(cache_desc, cache_key) # 将申请好的KV Cache转换为框架中的KV Cache类型,此处需要在不同框架中提供根据KV Cache的地址创建对应类型的KV Cache的接口。此处以PyTorch中转换的接口为例 kv_tensor_addrs = kv_cache.per_device_tensor_addrs[0] kv_tensors = torchair.llm_datadist.create_npu_tensors(kv_cache.cache_desc.shape, torch.float16, kv_tensor_addrs) # 将转换后的tensor拆分为框架需要的kv配对方式,可以自定义组合kv mid =len(kv_tensors) //2 k_tensors = kv_tensors[: mid] v_tensors = kv_tensors[mid:] kv_cache_tensors =list(zip(k_tensors, v_tensors))
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# D侧脚本 # 从初始化完的llm_datadist中取到kv_cache_manager kv_cache_manager = llm_datadist.kv_cache_manager # 根据模型中KV Cache的shape以及总的个数创建CacheDesc num_blocks =10 block_mem_size=128 cache_desc = CacheDesc(num_tensors=4, shape=[10, 128], data_type=DataType.DT_FLOAT16) # 调用allocate_blocks_cache接口申请好KV Cache内存 kv_cache = kv_cache_manager.allocate_blocks_cache(cache_desc) # 将申请好的KV Cache转换为框架中的KV Cache类型,此处需要在不同框架中提供根据KV Cache的地址创建对应类型的KV Cache的接口。此处以torch中转换的接口为例 # 转换操作和pull操作顺序不分先后 kv_tensor_addrs = kv_cache.per_device_tensor_addrs[0] kv_tensors = torchair.llm_datadist.create_npu_tensors(kv_cache.cache_desc.shape, torch.float16, kv_tensor_addrs) # 将转换后的tensor拆分为框架需要的kv配对方式,可以自定义组合kv mid =len(kv_tensors) //2 k_tensors = kv_tensors[: mid] v_tensors = kv_tensors[mid:] kv_cache_tensors =list(zip(k_tensors, v_tensors))
- P侧有新请求进来后,会给每个请求分配好对应的block_index,这块是推理框架提供的功能,模型推理完之后,该请求对应的KV Cache就在对应的block_index所在的内存上,将模型输出和请求对应的block_table传输给D侧推理模型作为输入。
- D侧有新请求进来后,也会给每个请求分配好对应的block_index,然后调用pull_blocks接口,根据P侧的block_index和D侧的block_index的对应关系,将KV Cache传输到指定位置。此时有两种方式:
- 在Decoder调用pull_cache接口拉取对应请求的KV Cache到申请的内存中
1 2 3 4
# D侧根据P侧传过来的信息,添加新请求,并申请对应的block_table # D侧根据传过来请求的src_block_table和新申请出来的dst_block_table拉取kv到对应block cache_key = BlocksCacheKey(prompt_cluster_id=0, model_id=0) # P侧allocate_blocks_cache时的入参 kv_cache_manager.pull_blocks(cache_key, cache, [0, 1], [2, 3]) # 将P侧0, 1 block位置上的数据拉到D侧2, 3 block位置上
- 在Prompt调用transfer_cache_async接口将数据传输至Decoder
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# 实现LayerSynchronizerImpl,通过torch Event获取各层计算结束状态,本例中通过Event机制实现 class LayerSynchronizerImpl(LayerSynchronizer): def __init__(self, events): self._events = events def synchronize_layer(self, layer_index: int, timeout_in_millis: Optional[int]) -> bool: self._events[layer_index].wait() return True events = [torch.npu.Event() for _ in range(cache_desc.num_tensors // 2)] # 执行模型, 模型在各层计算完成后调用events[layer_index].record()记录完成状态 # 该函数由用户实现 user_model.Predict(kv_cache_tensors, events) # 模型下发完成后,调用transfer_cache_async传输数据,此处需要填写Decoder申请出的KV Cache各层tensor的内存地址 transfer_config = TransferConfig(DECODER_CLUSTER_ID, decoder_kv_cache_addrs) cache_task = kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(events), [transfer_config], [0, 1], [2, 3]) # 同步等待传输结果 cache_task.synchronize()
- 在Decoder调用pull_cache接口拉取对应请求的KV Cache到申请的内存中
- 如果业务中使用了swap_blocks,copy_blocks功能,则在使用位置用llm_datadist提供的功能替换。
- 业务结束后P侧和D侧调用deallocate_cache释放已申请的KV Cache内存。
1 2 3
# 等待D侧拉取完对应请求的KV Cache... # 根据业务中cache的使用时机自行释放对应请求的KV Cache。PA场景无需释放对应的cache_key kv_cache_manager.deallocate_cache(kv_cache)
- 业务退出时,P侧和D侧根据集群建链的示例进行断链和llm_datadist的finalize。
异常处理
- 遇到LLM_DEVICE_OUT_OF_MEMORY,表示device申请KV Cache内存失败。需要检查初始化时设置的ge.flowGraphMemMaxSize大小以及申请KV Cache的大小,查看是否有请求KV Cache拉取之后没有释放内存。
- 遇到LLM_KV_CACHE_NOT_EXIST,表示对端KV Cache不存在,需要检查对端进程是否异常或者对应KV Cache的请求有没有推理完成。该错误不影响其他请求流程,确认流程后可以重试。
- 遇到LLM_WAIT_PROCESS_TIMEOUT,表示pull kv超时,说明链路出现问题,需要重新断链建链再尝试。