昇腾社区首页
中文
注册

流式输入和输出

功能介绍

在DataFlow中,流式输入是指:UDF执行一次可以获取多次输入。比如流式输出是指:UDF执行一次可以输出多次。详细介绍和示例请参见使用方法

在用户使用dataflow.pyflow或者dataflow.method构造UDF时,若需要将多次输入数据组batch时,建议使用流式输入;若需要将输出数据拆分成多份分发时,建议使用流式输出。

流式输入通过参数stream_input='Queue'表示,此时入参类型为FlowMsgQueue,用户可以自行从队列中取数据;流式输出通过Python中yield生成函数表达,函数输出次数与yield的次数一致。

使用约束

流式输入场景下,DataFlow框架不支持数据对齐和异常事务处理。

使用方法

流式输入使用示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import dataflow as df

# 使用stream_input参数表达该函数为流式输入,入参类型为Queue,func执行一次会从输入队列a取出两次数据,从输入队列b取出一次数据
@df.pyflow(stream_input='Queue')
def func(a, b):
    data1 = a.get()
    data2 = a.get()
    data3 = b.get()
    return data1 + data2 + data3

@df.pyflow
class Foo():
    # 使用stream_input参数表达该函数为流式输入,入参类型为Queue,func执行一次会从输入队列a取出两次数据,从输入队列b取出一次数据
    @df.method(stream_input='Queue')
    def func(self, a, b):
        data1 = a.get()
        data2 = a.get()
        data3 = b.get()
        return data1 + data2 + data3
流式输出使用示例:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import dataflow as df

# 使用yield表达函数为流式输出,该示例中func被调度执行1次,会输出5次结果
@df.pyflow
def func(a):
    for i in range(5):
        yield a + i


@df.pyflow
class Foo():
    # 使用yield表达函数为流式输出,该示例中func被调度执行1次,会输出5次结果
    @df.method()
    def func(self, a):
        for i in range(5):
            yield a + i