cache管理
功能介绍
本章节按照PA场景和非PA场景介绍对应的功能,每个场景涉及D2D,D2H,H2D的传输方式。每种传输方式同时会有使用用户注册内存和llm_datadist管理内存的方式。
相比仅支持D2D传输的接口,主要额外新增以下接口功能。
- CacheDesc新增placement入参。表示cache所在的设备。
- cache_manager.register_cache:将用户的连续cache内存地址注册给llm_datadist。
- cache_manager.register_blocks_cache:将用户的block的cache内存地址注册给llm_datadist。
使用场景
主要用于分布式集群间的KV Cache传输和搬移。
功能示例
本示例主要分PA场景和非PA场景介绍下接口的使用,主要涉及KV Cache的申请、释放、传输、搬移,会根据业务角色给出伪代码示例,具体接口功能和接口参数可参考《LLM DataDist接口参考(Python)》。
PA场景主要以用户注册内存的流程给出示例,非PA场景主要以llm_datadist管理内存的流程给出示例。
示例以P侧作为数据发送端,D侧作为数据接收端。实际业务场景可自行调整。
PA场景
推荐使用用户注册内存方式,因为PA场景管理了一块num_blocks大小的内存,这块内存的使用由框架管理,通过用户注册内存的方式更方便。也可以使用llm_datadist管理cache的接口,示例参考非PA场景功能示例,更换接口为
- cachemanager.allocate_cache更换为cachemanager.allocate_blocks_cache
- cachemanager.pull_cache更换为cachemanager.pull_blocks
如果业务中使用了swap_blocks,copy_blocks功能,则在使用位置用llm_datadist提供的功能替换。
以D2D传输示例,D2H和H2D对应的更改收发端的placement参数:
- P侧参考注册用户内存中的示例进行llm_datadist初始化,在注册用户内存时修改CacheDesc。
1 2 3 4 5 6 7
# 将torch的kv tensor内存注册给llm_datadist cache_manager = llm_datadist.cache_manager cache_desc = CacheDesc(num_tensors=1, shape=[2, 1024*1024], data_type=DataType.DT_FLOAT, placement=Placement.DEVICE) tensor = torch.ones(2, 1024*1024, dtype=torch.float).npu() addr =int(tensor.data_ptr()) # block_cache_key作为可选参数,在数据接收端注册的时候可以不用配置。根据实际业务自行选择register_blocks_cache或者register_cache cache = cache_manager.register_blocks_cache(cache_desc, [addr], BlocksCacheKey(1, 0))
- D侧参考注册用户内存中的示例进行llm_datadist初始化,在注册用户内存时修改CacheDesc。
1 2 3 4 5 6 7
# 将torch的kv tensor内存注册给llm_datadist cache_manager = llm_datadist.cache_manager cache_desc = CacheDesc(num_tensors=1, shape=[2, 1024*1024], data_type=DataType.DT_FLOAT, placement=Placement.DEVICE) tensor = torch.ones(2, 1024*1024, dtype=torch.float).npu() addr =int(tensor.data_ptr()) # block_cache_key作为可选参数,在数据接收端注册的时候可以不用配置。根据实际业务自行选择register_blocks_cache或者register_cache cache = cache_manager.register_blocks_cache(cache_desc, [addr])
- P侧和D侧参考注册用户内存中查询内存状态示例,等待内存注册完成。
- P侧执行业务推理,等待D侧拉取kv完成。D侧等待P侧KV Cache计算完成,调用pull接口拉KV Cache,执行推理业务。
1 2 3 4 5
# D侧脚本 # 等待P侧KV Cache计算完成,调用pull_blocks接口拉取cache cache_manager = llm_datadist.cache_manager cache_manager.pull_blocks(BlocksCacheKey(1, 0), cache, src_blocks=[0, 1], dst_blocks=[0, 1]) # ...执行推理业务
- 参考集群建链示例进行断链以及llm_datadist资源释放。
非PA场景
推荐使用llm_datadist管理内存的方式。因为非PA场景每个请求都需要申请一块cache内存,通过llm_datadist预申请内存池管理的方式可以节省申请内存的耗时。也可以使用注册用户内存方式,示例参考PA场景示例。更换接口为
- cachemanager.register_blocks_cache更换为cachemanager.register_cache
- cachemanager.pull_blocks更换为cachemanager.pull_cache
D2D传输示例,D2H和H2D对应的更改收发端的placement参数:
- 参考建链流程的非PA场景示例,初始化llm_datadist,link,等待内存注册完成。
- P侧和D侧根据框架中的KV Cache大小,从llm_datadist中申请对应的KV Cache内存,并转换为框架KV Cache类型。P侧使用转换后的KV Cache执行业务,等待KV Cache被pull。D侧等待P侧KV Cache计算完成调用pull拉取kv,进行推理业务。
1 2 3 4 5 6 7 8 9 10 11 12 13
# P侧脚本 cache_manager = llm_datadist.cache_manager cache_desc = CacheDesc(num_tensors=4, shape=[2, 1024*1024], data_type=DataType.DT_FLOAT16, placement=Placement.DEVICE) cache_key_0 = CacheKey(prompt_cluster_id=1, req_id=0, model_id=0) cache_key_1 = CacheKey(prompt_cluster_id=1, req_id=1, model_id=0) cache = cache_manager.allocate_cache(cache_desc, [cache_key_0, cache_key_1]) # 将申请好的KV Cache转换为框架中的KV Cache类型,此处需要在不同框架中提供根据KV Cache的地址创建对应类型的KV Cache的接口。此处以torch中转换的接口为例 tensor_addrs = cache.tensor_addrs tensors = torchair.llm_datadist.create_npu_tensors(cache.cache_desc.shape, torch.float16, tensor_addrs) # 将转换后的tensor拆分为框架需要的kv配对方式,可以自定义组合kv # ...执行业务推理 # 等待D侧完成pull
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# D侧脚本 cache_manager = llm_datadist.cache_manager cache_desc = CacheDesc(num_tensors=1, shape=[2, 1024*1024], data_type=DataType.DT_FLOAT16, placement=Placement.DEVICE) cache = cache_manager.allocate_cache(cache_desc) # 等待P侧cache计算完成 # 调用pull_cache拉取cache,拉取的cache_key和P侧对应 cache_key_0 = CacheKey(prompt_cluster_id=1, req_id=0, model_id=0) cache_manager.pull_cache(cache_key_0, cache, batch_index=0) # 将申请好的KV Cache转换为框架中的KV Cache类型,此处需要在不同框架中提供根据KV Cache的地址创建对应类型的KV Cache的接口。此处以torch中转换的接口为例 tensor_addrs = cache.tensor_addrs tensors = torchair.llm_datadist.create_npu_tensors(cache.cache_desc.shape, torch.float16, tensor_addrs) # 将转换后的tensor拆分为框架需要的kv配对方式,可以自定义组合kv # ...执行推理业务
- 根据业务中cache的使用时机自行释放请求申请的cache和cache_key。
1 2 3 4 5
# P侧脚本 # 当batch中所有请求的KV Cache都被拉走后,调用deallocate_cache才会真正释放内存。如果D侧未拉走KV Cache,则还需要调用remove_cache_key。 cache_manager.remove_cache_key(cache_key_0) cache_manager.remove_cache_key(cache_key_1) cache_manager.deallocate_cache(cache)
1 2 3
# D侧脚本 # 根据业务中cache的使用时机自行释放cache。申请时未设置cacke_key,所以不需要释放 cache_manager.deallocate_cache(cache)
- 参考建链流程进行断链以及llm_datadist资源释放。
非PA场景还支持连续往非连续数据发送,常用于将连续的KV Cache内存发送到非连续host的内存上方便管理。当然也支持D2D场景
连续往非连续发送需要保证连续内存除以block_mem_size向上取整后要等于dst_block的个数。
- P侧参考建链流程的非PA场景示例,D侧链流程的PA场景示例,初始化llm_datadist,link,等待内存注册完成。
- P侧根据框架中的KV Cache大小,从llm_datadist中申请对应的KV Cache内存,并转换为框架KV Cache类型。D侧参考注册用户内存中的示例在注册用户内存时修改CacheDesc,等待内存注册完成。
1 2 3 4 5 6 7 8 9
# P侧脚本 cache_manager = llm_datadist.cache_manager cache_desc = CacheDesc(num_tensors=4, shape=[2, 1024*1024], data_type=DataType.DT_FLOAT16, placement=Placement.DEVICE) cache_key = CacheKey(prompt_cluster_id=1, req_id=0, model_id=0) cache = cache_manager.allocate_cache(cache_desc, [cache_key]) # 将申请好的KV Cache转换为框架中的KV Cache类型,此处需要在不同框架中提供根据KV Cache的地址创建对应类型的KV Cache的接口。此处以torch中转换的接口为例 tensor_addrs = cache.tensor_addrs tensors = torchair.llm_datadist.create_npu_tensors(cache.cache_desc.shape, torch.float16, tensor_addrs) # 将转换后的tensor拆分为框架需要的kv配对方式,可以自定义组合kv
1 2 3 4 5 6 7 8 9
# D侧脚本 # 将torch的kv tensor内存注册给llm_datadist cache_manager = llm_datadist.cache_manager cache_desc = CacheDesc(num_tensors=1, shape=[2, 1024*1024], data_type=DataType.DT_FLOAT, placement=Placement.HOST) tensor = torch.ones(2, 1024*1024, dtype=torch.float).npu() addr =int(tensor.data_ptr()) # block_cache_key作为可选参数,在数据接收端注册的时候可以不用配置。根据实际业务自行选择register_blocks_cache或者register_cache cache = cache_manager.register_blocks_cache(cache_desc, [addr]) # 按照注册用户内存章节示例执行到查询内存状态,等待查询完成
- P侧执行业务推理,等待KV Cache被D侧拉走。D侧等待KV Cache计算完成,调用pull拉取kv,执行推理业务。
1 2 3 4 5 6
# D侧脚本 # 调用pull_blocks接口拉取cache cache_manager = llm_datadist.cache_manager cache_key_0 = CacheKey(prompt_cluster_id=1, req_id=0, model_id=0) cache_manager.pull_blocks(cache_key_0, cache, src_blocks=[], dst_blocks=[0]) # ...执行推理业务
- P侧根据业务中cache的使用时机自行释放请求申请的cache和cache_key。D侧由于是注册的用户内存,不需要释放。
1 2 3 4 5
# P侧脚本 # 当batch中所有请求的KV Cache都被拉走后,调用deallocate_cache才会真正释放内存。如果D侧未拉走KV Cache,则还需要调用remove_cache_key。 cache_manager.remove_cache_key(cache_key_0) cache_manager.remove_cache_key(cache_key_1) cache_manager.deallocate_cache(cache)
- 参考建链流程示例进行断链以及llm_datadist资源释放。