在 AWS Glue 交互式会话中使用流式传输操作
切换串流会话类型
使用 AWS Glue 交互式会话配置魔法命令 %streaming
以定义您正在运行的任务并初始化串流交互式会话。
用于交互式开发的采样输入流式传输
皆在帮助提升 AWS Glue 交互式会话中交互式体验的一种派生工具是在 GlueContext
下添加一种新方法,以获取静态 DynamicFrame 中流式传输的快照。GlueContext
允许您检查、交互和实施工作流。
使用 GlueContext
类实例,您将能够找到方法 getSampleStreamingDynamicFrame
。此方法要求的参数为:
-
dataFrame
:Spark Streaming DataFrame -
options
:查看以下可用选项
可用选项包括:
-
windowSize:这也称为微批处理持续时间。此参数将确定在触发前一批处理后串流查询的等待时间。此参数值必须小于
pollingTimeInMs
。 -
pollingTimeInMs:方法将运行的总时间长度。它将触发至少一个微批处理,以从输入流式传输中获取样本记录。
-
recordPollingLimit:此参数帮助您限制从流式传输中轮询的记录的总数。
-
(可选)您也可以使用
writeStreamFunction
将此自定义函数应用于每个记录采样函数。有关 Scala 和 Python 中的示例,请参阅以下内容。
注意
在采样的 DynFrame
为空时,可能是由以下几个原因造成的:
-
流式传输源设置为“Latest(最新)”,并且在采样周期内没有摄入新数据。
-
轮询时间不足以处理其摄入的记录。除非整个批处理处理完毕,否则数据不会显示。
在交互式会话中运行串流应用程序
在 AWS Glue 交互式会话中,您可以运行 AWS Glue 串流应用程序,就像您在 AWS Glue 控制台中创建串流应用程序一样。由于交互式会话基于会话,因此在运行时遇到异常不会导致会话停止。目前,我们具有以迭代方式开发批处理函数的额外优势。例如:
def batch_function(data_frame, batch_id): log.info(data_frame.count()) invalid_method_call() glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})
我们在以上例子中包括了方法的无效用法,与退出整个应用程序的常规 AWS Glue 任务不同,用户的编码上下文和定义都完全保留,并且会话仍然在运行。无需引导启动新集群和重新运行所有之前的转换。这使您可以专注于快速迭代批处理函数实施以获得理想的结果。
需要注意的是,交互式会话以阻塞方式评估每个语句,以便会话一次仅能执行一条语句。由于流式传输查询将始终连续且永不结束,具有有效流式传输查询的会话将无法处理任何后续语句,除非这些会话中断。您可以直接从 Jupyter Notebook 发出中断命令,我们的内核将为您处理取消。
以下列正在等待执行的语句序列为例:
Statement 1: val number = df.count() #Spark Action with deterministic result Result: 5 Statement 2: streamingQuery.start().awaitTermination() #Spark Streaming Query that will be executing continously Result: Constantly updated with each microbatch Statement 3: val number2 = df.count() #This will not be executed as previous statement will be running indefinitely