链路管理
功能介绍
NN模型执行时调用的HCCL集合通信接口是双边通信,即需要两边同时发起建链,而在P-D分离方案中,简化建链操作,由Client单侧发起建链。由于动态扩缩的部分大多数是Decode侧,因此将P定义为Server端,D定义为Client端,建链过程实现由D向P发起建链的流程。
主要提供的是link_clusters和unlink_clusters两个接口,都是由D侧进行调用,建链行为是点对点之间的。
- link_clusters用于节点之间的建链
- unlink_clusters用于节点之间的断链
使用场景
- 建链操作是PD之间进行KV Cache传输的前提,所以想使能KV Cache传输功能,需要先进行建链。
- 集群可靠性场景下,当P或者D集群节点出现异常时,在不影响整个集群可用性前提下,通过断链下线对应故障节点。
- 通过建链和断链动态调整PD集群配比。根据闲忙,动态的增加或减少对应的机器节点。增加节点需要建链,减少节点需要断链。
功能示例
此处代码示例为1P1D之间建链的伪代码流程,接口参数可参考LLM-DataDist接口列表。
- 拉起P和D侧脚本,脚本中调用LLM-DataDist的初始化接口。P侧需要设置侦听的device ip和port。
1 2 3 4 5 6 7 8 9 10
# P侧脚本 from llm_datadist import LLMDataDist, LLMRole, LLMStatusCode, LLMClusterInfo # llm datadist初始化 llm_datadist = LLMDataDist(LLMRole.Prompt, cluster_id=0) llm_config = LLMConfig() llm_config.listen_ip_info ="192.168.1.1:26000"# local_ip + port llm_config.device_id =0 # 此处的device id需要和local_ip匹配 llm_options = llm_config.generate_options() llm_datadist.init(llm_options)
1 2 3 4 5 6 7 8 9
# D侧脚本 from llm_datadist import LLMDataDist, LLMRole, LLMStatusCode, LLMClusterInfo # llm datadist初始化 llm_datadist = LLMDataDist(LLMRole.DECODER, cluster_id=0) llm_config = LLMConfig() llm_config.device_id =0 llm_options = llm_config.generate_options() llm_datadist.init(llm_options)
- 在D侧脚本中调用link_clusters发起建链操作,当业务退出时在D侧调用unlink_clusters进行断链。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
# 生成cluster info信息用于建链 cluster = LLMClusterInfo() cluster.remote_cluster_id = 1 # 此处的remote_cluster_id需要和P侧创建的LLMDataDist对应 cluster.append_local_ip_info("192.168.2.1", 26000) # local_ip_info的ip是本机需要建链的device ip地址 cluster.append_remote_ip_info("192.168.1.1", 26000) # remote_ip_info的ip是想和对端建链的device ip地址 # 调用link_clusters进行建链 # ret是接口的返回值,rets表示每个cluster建链的结果。 ret, rets = llm_datadist.link_clusters([cluster], timeout=5000) # 判断建链结果 if ret != LLMStatusCode.LLM_SUCCESS: raiseException("link failed.") for cluster_i in range(len(rets)): link_ret = rets[cluster_i] if link_ret != LLMStatusCode.LLM_SUCCESS: print(f"{cluster_i} link failed.")
- 在D侧可以调用check_link_status快速检查链路传输数据是否正常。
1 2 3 4 5
try: llm_datadist.check_link_status(remote_cluster_id=1) except LLMException as ex: print(f"check_link_status exception:{ex.status_code}") raise ex
- 业务结束D侧进行断链,P和D都调用llm_datadist的finalize释放资源。
1 2 3 4 5 6 7
# P侧脚本 # 调用llm_datadist申请KV Cache # 执行业务推理 # ... # 业务退出 llm_datadist.finalize()
1 2 3 4 5 6 7 8 9
# D侧脚本 # pull_cache、模型推理 # ... # 业务退出,调用unlink_clusters进行断链 ret, rets = llm_datadist.unlink_clusters([cluster], timeout=5000) if ret != LLMStatusCode.LLM_SUCCESS: raiseRuntimeError(f'[unlink_cluster] failed, ret={ret}') llm_datadist.finalize()
当新增节点或者已下线节点再上线时,需要执行一遍上述使用流程。当下线节点时,正常D侧需要主动调用unlink_clusters接口,异常场景需要P侧调用unlink_clusters。
通过节点上线调用link_clusters,节点下线调用unlink_clusters来灵活的进行分布式集群的动态扩缩容。
异常处理
当D侧出现异常导致无法调用unlink_clusters时,需要由P侧调用unlink_clusters进行资源清理,否则无法再次进行建链。
unlink_clusters接口提供了强制断链的能力,该能力适用于链路故障时,普通断链操作会耗时比较久。使用强制断链接口(设置force=True),需要两侧都发起调用,只会清理本端的链接。
1 2 3 4 |
# 强制断链 ret, rets = llm_datadist.unlink_clusters([cluster], timeout=5000, force=True) if ret != LLMStatusCode.LLM_SUCCESS: raise RuntimeError(f'[unlink_clusters] failed, ret={ret}') |
父主题: 功能介绍