产品 |
是否支持 |
---|---|
√ |
|
x |
|
x |
|
x |
|
x |
异步分层传输KV Cache。
1 2 3 4 5 6 | transfer_cache_async(src_cache: KvCache, layer_synchronizer: LayerSynchronizer, transfer_configs: Union[List[TransferConfig], Tuple[TransferConfig]], src_block_indices: Optional[Union[List[int], Tuple[int]]] = None, dst_block_indices: Optional[Union[List[int], Tuple[int]]] = None, dst_block_memory_size: Optional[int] = None) -> CacheTask |
参数名称 |
数据类型 |
取值说明 |
---|---|---|
src_cache |
源Cache。 |
|
layer_synchronizer |
LayerSynchronizer的实现类对象 |
|
transfer_configs |
Union[List[TransferConfig], Tuple[TransferConfig]] |
传输配置列表或元组 |
src_block_indices |
Optional[Union[List[int], Tuple[int]]] |
源Cache的block indices,当源Cache为PA场景时设置 |
dst_block_indices |
Optional[Union[List[int], Tuple[int]]] |
目的Cache的block indices,当目的Cache为PA场景时设置 |
dst_block_memory_size |
Optional[int] |
目的Cache每个block占用的内存大小,当目的Cache为PA场景时设置。如果源Cache也为PA场景,则可省略该参数,此时会自动将其设置为源Cache每个block占用的内存大小。 该参数设置为0时等同于省略该参数。 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | from llm_datadist import * ... class LayerSynchronizerImpl(LayerSynchronizer): def synchronize_layer(self, layer_index: int, timeout_in_millis: Optional[int]) -> bool: # need control time for transfer layer here. return True num_layers = 40 dst_cluster_id = 2 # need register decoder kv addr here. decoder_addrs = ... assert(len(decoder_addrs) = 2*num_layers) transfer_config = TransferConfig(dst_cluster_id, decoder_addrs, range(0, num_layers), 0) cache_task = kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(), [transfer_config]) cache_task.synchronize() cache_task.get_results() |