transfer_cache_async
函数功能
异步分层传输KV Cache。
函数原型
transfer_cache_async(src_cache: KvCache, layer_synchronizer: LayerSynchronizer, transfer_configs: Union[List[TransferConfig], Tuple[TransferConfig]], src_block_indices: Optional[Union[List[int], Tuple[int]]] = None, dst_block_indices: Optional[Union[List[int], Tuple[int]]] = None, dst_block_memory_size: Optional[int] = None) -> CacheTask
参数说明
参数名称 |
数据类型 |
取值说明 |
---|---|---|
src_cache |
源Cache。 |
|
layer_synchronizer |
LayerSynchronizer的实现类对象 |
|
transfer_configs |
Union[List[TransferConfig], Tuple[TransferConfig]] |
传输配置列表或元组 |
src_block_indices |
Optional[Union[List[int], Tuple[int]]] |
源Cache的block indices,当源Cache为PA场景时设置 |
dst_block_indices |
Optional[Union[List[int], Tuple[int]]] |
目的Cache的block indices,当目的Cache为PA场景时设置 |
dst_block_memory_size |
Optional[int] |
目的Cache每个block占用的内存大小,当目的Cache为PA场景时设置。如果源Cache也为PA场景,则可省略该参数,此时会自动将其设置为源Cache每个block占用的内存大小。 该参数设置为0时等同于省略该参数。 |
调用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | from llm_datadist import *
...
class LayerSynchronizerImpl(LayerSynchronizer):
def synchronize_layer(self, layer_index: int, timeout_in_millis: Optional[int]) -> bool:
# need control time for transfer layer here.
return True
num_layers = 40
dst_cluster_id = 2
# need register decoder kv addr here.
decoder_addrs = ...
assert(len(decoder_addrs) = 2*num_layers)
transfer_config = TransferConfig(dst_cluster_id, decoder_addrs, range(0, num_layers), 0)
cache_task = kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(), [transfer_config])
cache_task.synchronize()
cache_task.get_results()
|
约束说明
- 当前仅支持src_cache与dst_cache都为连续cache的场景以及src_cache与dst_cache都为PA的场景。
- 使用同一条链路时,此接口和pull_cache、pull_blocks接口不支持并发。
- 本接口不支持并发调用。
- 单进程多卡模式下,不支持调用该接口。
父主题: KvCacheManager