流式输入和输出
功能介绍
在DataFlow中,流式输入是指:UDF执行一次可以获取多次输入。比如流式输出是指:UDF执行一次可以输出多次。详细介绍和示例请参见使用方法。
在用户使用dataflow.pyflow或者dataflow.method构造UDF时,若需要将多次输入数据组batch时,建议使用流式输入;若需要将输出数据拆分成多份分发时,建议使用流式输出。
流式输入通过参数stream_input='Queue'表示,此时入参类型为FlowMsgQueue,用户可以自行从队列中取数据;流式输出通过Python中yield生成函数表达,函数输出次数与yield的次数一致。
使用约束
流式输入场景下,DataFlow框架不支持数据对齐和异常事务处理。
使用方法
流式输入使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import dataflow as df # 使用stream_input参数表达该函数为流式输入,入参类型为Queue,func执行一次会从输入队列a取出两次数据,从输入队列b取出一次数据 @df.pyflow(stream_input='Queue') def func(a, b): data1 = a.get() data2 = a.get() data3 = b.get() return data1 + data2 + data3 @df.pyflow class Foo(): # 使用stream_input参数表达该函数为流式输入,入参类型为Queue,func执行一次会从输入队列a取出两次数据,从输入队列b取出一次数据 @df.method(stream_input='Queue') def func(self, a, b): data1 = a.get() data2 = a.get() data3 = b.get() return data1 + data2 + data3 |
流式输出使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
import dataflow as df # 使用yield表达函数为流式输出,该示例中func被调度执行1次,会输出5次结果 @df.pyflow def func(a): for i in range(5): yield a + i @df.pyflow class Foo(): # 使用yield表达函数为流式输出,该示例中func被调度执行1次,会输出5次结果 @df.method() def func(self, a): for i in range(5): yield a + i |
父主题: 专题