链路管理

功能介绍

NN模型执行时调用的HCCL集合通信接口是双边通信,即需要两边同时发起建链,而在P-D分离方案中,简化建链操作,由Client单侧发起建链。由于动态扩缩的部分大多数是Decode侧,因此将P定义为Server端,D定义为Client端,建链过程实现由D向P发起建链的流程。

主要提供的是link_clusters和unlink_clusters两个接口,都是由D侧进行调用,建链行为是点对点之间的。

使用场景

功能示例

此处代码示例为1P1D之间建链的伪代码流程,接口参数可参考LLM-DataDist接口列表

  1. 拉起P和D侧脚本,脚本中调用LLM-DataDist的初始化接口。P侧需要设置侦听的device ip和port。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    # P侧脚本
    from llm_datadist import LLMDataDist, LLMRole, LLMStatusCode, LLMClusterInfo
    
     # llm datadist初始化
     llm_datadist = LLMDataDist(LLMRole.Prompt, cluster_id=0)
     llm_config = LLMConfig()
     llm_config.listen_ip_info ="192.168.1.1:26000"# local_ip + port
     llm_config.device_id =0  # 此处的device id需要和local_ip匹配
     llm_options = llm_config.generate_options()
     llm_datadist.init(llm_options)
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    # D侧脚本
    from llm_datadist import LLMDataDist, LLMRole, LLMStatusCode, LLMClusterInfo
    
     # llm datadist初始化
     llm_datadist = LLMDataDist(LLMRole.DECODER, cluster_id=0)
     llm_config = LLMConfig()
     llm_config.device_id =0 
     llm_options = llm_config.generate_options()
     llm_datadist.init(llm_options)
    
  2. 在D侧脚本中调用link_clusters发起建链操作,当业务退出时在D侧调用unlink_clusters进行断链。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
     # 生成cluster info信息用于建链
     cluster = LLMClusterInfo()
     cluster.remote_cluster_id = 1  # 此处的remote_cluster_id需要和P侧创建的LLMDataDist对应
     cluster.append_local_ip_info("192.168.2.1", 26000) # local_ip_info的ip是本机需要建链的device ip地址
     cluster.append_remote_ip_info("192.168.1.1", 26000) # remote_ip_info的ip是想和对端建链的device ip地址
    
     # 调用link_clusters进行建链
     # ret是接口的返回值,rets表示每个cluster建链的结果。
     ret, rets = llm_datadist.link_clusters([cluster], timeout=5000)
     # 判断建链结果
     if ret != LLMStatusCode.LLM_SUCCESS:
         raiseException("link failed.")
     for cluster_i in range(len(rets)):
         link_ret = rets[cluster_i]
         if link_ret != LLMStatusCode.LLM_SUCCESS:
             print(f"{cluster_i} link failed.")
    
  3. 在D侧可以调用check_link_status快速检查链路传输数据是否正常。
    1
    2
    3
    4
    5
    try:
        llm_datadist.check_link_status(remote_cluster_id=1)
    except LLMException as ex:
        print(f"check_link_status exception:{ex.status_code}")
        raise ex
    
  4. 业务结束D侧进行断链,P和D都调用llm_datadist的finalize释放资源。
    1
    2
    3
    4
    5
    6
    7
     # P侧脚本
     # 调用llm_datadist申请KV Cache
     # 执行业务推理
     # ...
    
     # 业务退出
     llm_datadist.finalize()
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
     # D侧脚本
     # pull_cache、模型推理
     # ...
    
     # 业务退出,调用unlink_clusters进行断链
     ret, rets = llm_datadist.unlink_clusters([cluster], timeout=5000)
     if ret != LLMStatusCode.LLM_SUCCESS:
         raiseRuntimeError(f'[unlink_cluster] failed, ret={ret}')
     llm_datadist.finalize()
    

当新增节点或者已下线节点再上线时,需要执行一遍上述使用流程。当下线节点时,正常D侧需要主动调用unlink_clusters接口异常场景需要P侧调用unlink_clusters。

通过节点上线调用link_clusters,节点下线调用unlink_clusters来灵活的进行分布式集群的动态扩缩容。

异常处理

当D侧出现异常导致无法调用unlink_clusters时,需要由P侧调用unlink_clusters进行资源清理,否则无法再次进行建链。

unlink_clusters接口提供了强制断链的能力,该能力适用于链路故障时,普通断链操作会耗时比较久。使用强制断链接口(设置force=True),需要两侧都发起调用,只会清理本端的链接。

1
2
3
4
# 强制断链
 ret, rets = llm_datadist.unlink_clusters([cluster], timeout=5000, force=True)
 if ret != LLMStatusCode.LLM_SUCCESS:
     raise RuntimeError(f'[unlink_cluster] failed, ret={ret}')

更多异常处理请参考LLMStatusCode