昇腾社区首页
中文
注册

多实例部署

功能介绍

当某个节点需要较多的计算资源时,可以考虑采用部署多份的方式来加速计算。

使用约束

多实例部署之后,由于实例间计算时序不一致,可能需要开启数据对齐特性保证数据配对。

使用方法

开启多实例请参考指定DataFlow节点部署位置

具体到使用场景,可以分以下两种方式:

表1 并行方式

并行方式

使用场景

优势

劣势

请求间多实例并行

存在多请求并发,需要提高吞吐率

实现简单,代码不需要特殊修改

无法降低单请求时延。

拆分请求多实例加速

需要降低单请求时延

单请求时延低

代码需要适配修改,多实例节点之前的节点需要拆分数据并指定分发策略,多实例节点需要指定数据汇聚策略,多实例后需要对数据进行合并。

  • 并行方式1:请求间多实例并行:
    图1 请求间多实例并行

    图中是由Model X->Model A->Model B串起来的DataFlow模型,由于Model A计算耗时长,所以对Model A拆成两个实例加速,数据流(A、B、C、D)经过Model X后,根据transaction ID轮询并分发到Model A的两个实例(数据A和C分流到实例1,数据B和D分流到实例2),之后再由Model B实现汇聚。

  • 并行方式2:拆分请求多实例加速:
    这种场景需要增加数据拆分和数据合并的逻辑,通过输出时指定BalanceConfig进行均衡分发,控制输出分发和汇聚(此功能当前不支持py_flow注解执行方式)。
    图2 拆分请求多实例加速

原始模型为由Model X->Model A->Model B串起来的DataFlow模型,由于Model A计算耗时长,为了降低端到端时延,对Model A进行多实例加速。详细步骤如下。

  1. 请求数据A经过Model X进入Model数据拆分模块,被拆分为4份数据(A1、A2、A3、A4)。该操作需要用户根据自己的业务逻辑进行实施,拆分后的数据需要自行携带相关数据标识用于后续节点进行识别处理(例如数据增加DataId/DataName等信息)。
  2. A1和A3被发送到Model A的实例1,A2和A4被发送给Model A的实例2。
  3. 经过处理后的A1、A2、A3、A4被发送到Model数据合并模块,合并为数据A。
  4. 数据A发送到Model B。

数据分发示例如下

基于用户拆分好的数据A1、A2、A3、A4,如下给出如何进行数据分发。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 构图阶段
flow_node.set_balance_scatter()

# UDF输出方式一:批量输出,适用于用户一次性把数据拆分完成的场景。
cfg = ff.BalanceConfig(1, 4, ff.AffinityPolicy.NO_AFFINITY) # 各参数含义为:row_num=1,col_num=4(共4份数据),affinity_policy=ff.AffinityPolicy.NO_AFFINITY(不亲和)

cfg.set_data_pos([(0, 0), (0, 1), (0, 2), (0, 3)]) # 数据A1对应的pos为{0,0},A2对应的pos为{0,1},A3对应的pos为{0,2},A4对应的pos为{0,3}
context.set_multi_outputs(0, [msg0, msg1, msg2, msg3], cfg)
# UDF输出方式二:单个输出,适用于用户将数据一个个拆分出来的,先拆分出来的数据可以先处理。
cfg = ff.BalanceConfig(1, 4, ff.AffinityPolicy.NO_AFFINITY)
# 根据拆分出的msg0进行分发
cfg.set_data_pos([(0, 0)])
context.set_output(0, msg0, cfg)
# 根据拆分出的msg1进行分发
cfg.set_data_pos([(0, 1)])
context.set_output(0, msg1, cfg)
# 根据拆分出的msg2进行分发
cfg.set_data_pos([(0, 2)])
context.set_output(0, msg2, cfg)
# 根据拆分出的msg3进行分发
cfg.set_data_pos([(0, 3)])
context.set_output(0, msg3, cfg)

数据合并示例如下。

1
2
3
4
5
6
7
# 构图阶段
flow_node.set_balance_gather()
# UDF输出BalanceConfig设置
cfg = ff.BalanceConfig(1, 4, ff.AffinityPolicy.ROW_AFFINITY)# 各参数含义为:row_num=1,col_num=4(共4份数据),affinity_policy=ff.AffinityPolicy.ROW_AFFINITY(按行亲和)
# 每个实例将行相同的数据分发到相同的后续目标节点(如果数据合并节点是单节点,可以不用按照这种指定BalanceConfig的方式发送),数据合并模块对收到的数据进行合并
cfg.set_data_pos([(0, 0)])
context.set_output(0, msg0, cfg)