NN模型执行时调用的HCCL集合通信接口是双边通信,即需要两边同时发起建链,而在P-D分离方案中,简化建链操作,由Client单侧发起建链。由于动态扩缩的部分大多数是Decode侧,因此将P定义为Server端,D定义为Client端,建链过程实现由D向P发起建链的流程。
主要提供的是link_clusters和unlink_clusters两个接口,都是由D侧进行调用,建链行为是点对点之间的。
此处代码示例为1P1D之间建链的伪代码流程,接口参数可参考LLM-DataDist接口列表。
1 2 3 4 5 6 7 8 9 10 |
# P侧脚本 from llm_datadist import LLMDataDist, LLMRole, LLMStatusCode, LLMClusterInfo # llm datadist初始化 llm_datadist = LLMDataDist(LLMRole.Prompt, cluster_id=0) llm_config = LLMConfig() llm_config.listen_ip_info ="192.168.1.1:26000"# local_ip + port llm_config.device_id =0 # 此处的device id需要和local_ip匹配 llm_options = llm_config.generate_options() llm_datadist.init(llm_options) |
1 2 3 4 5 6 7 8 9 |
# D侧脚本 from llm_datadist import LLMDataDist, LLMRole, LLMStatusCode, LLMClusterInfo # llm datadist初始化 llm_datadist = LLMDataDist(LLMRole.DECODER, cluster_id=0) llm_config = LLMConfig() llm_config.device_id =0 llm_options = llm_config.generate_options() llm_datadist.init(llm_options) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
# 生成cluster info信息用于建链 cluster = LLMClusterInfo() cluster.remote_cluster_id = 1 # 此处的remote_cluster_id需要和P侧创建的LLMDataDist对应 cluster.append_local_ip_info("192.168.2.1", 26000) # local_ip_info的ip是本机需要建链的device ip地址 cluster.append_remote_ip_info("192.168.1.1", 26000) # remote_ip_info的ip是想和对端建链的device ip地址 # 调用link_clusters进行建链 # ret是接口的返回值,rets表示每个cluster建链的结果。 ret, rets = llm_datadist.link_clusters([cluster], timeout=5000) # 判断建链结果 if ret != LLMStatusCode.LLM_SUCCESS: raiseException("link failed.") for cluster_i in range(len(rets)): link_ret = rets[cluster_i] if link_ret != LLMStatusCode.LLM_SUCCESS: print(f"{cluster_i} link failed.") |
1 2 3 4 5 |
try: llm_datadist.check_link_status(remote_cluster_id=1) except LLMException as ex: print(f"check_link_status exception:{ex.status_code}") raise ex |
1 2 3 4 5 6 7 |
# P侧脚本 # 调用llm_datadist申请KV Cache # 执行业务推理 # ... # 业务退出 llm_datadist.finalize() |
1 2 3 4 5 6 7 8 9 |
# D侧脚本 # pull_cache、模型推理 # ... # 业务退出,调用unlink_clusters进行断链 ret, rets = llm_datadist.unlink_clusters([cluster], timeout=5000) if ret != LLMStatusCode.LLM_SUCCESS: raiseRuntimeError(f'[unlink_cluster] failed, ret={ret}') llm_datadist.finalize() |
当新增节点或者已下线节点再上线时,需要执行一遍上述使用流程。当下线节点时,正常D侧需要主动调用unlink_clusters接口,异常场景需要P侧调用unlink_clusters。
通过节点上线调用link_clusters,节点下线调用unlink_clusters来灵活的进行分布式集群的动态扩缩容。
当D侧出现异常导致无法调用unlink_clusters时,需要由P侧调用unlink_clusters进行资源清理,否则无法再次进行建链。
unlink_clusters接口提供了强制断链的能力,该能力适用于链路故障时,普通断链操作会耗时比较久。使用强制断链接口(设置force=True),需要两侧都发起调用,只会清理本端的链接。
1 2 3 4 |
# 强制断链 ret, rets = llm_datadist.unlink_clusters([cluster], timeout=5000, force=True) if ret != LLMStatusCode.LLM_SUCCESS: raise RuntimeError(f'[unlink_cluster] failed, ret={ret}') |
更多异常处理请参考LLMStatusCode。