本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
切換串流工作階段類型
使用 AWS Glue 互動式工作階段組態魔術 %streaming
,用於定義您正在執行的任務,並初始化串流互動式工作階段。
為互動式開發抽樣輸入串流
我們衍生的一個工具,有助於增強 中的互動體驗 AWS Glue 互動式工作階段是在 中新增新方法GlueContext
,以取得 static DynamicFrame 中串流的快照。 GlueContext
可讓您檢查、互動和實作工作流程。
使用 GlueContext
類執行個體,您將能夠找到方法 getSampleStreamingDynamicFrame
。此方法所需的引數為:
-
dataFrame
:Spark Streaming DataFrame -
options
:請參閱下列可用的選項
可用選項包括:
-
windowSize:這也稱為微批次持續時間。此參數將判定觸發前一個批次後串流查詢將等待的時長。此參數值必須小於
pollingTimeInMs
。 -
pollingTimeIn Ms:方法執行的總時間長度。它將至少觸發一個微批次,以從輸入串流中獲取樣本記錄。
-
recordPollingLimit:此參數可協助您限制要從串流輪詢的記錄總數。
-
(選用) 您也可以使用
writeStreamFunction
將此自訂函數應用至每個記錄抽樣函數。請參閱下列 Scala 和 Python 中的範例。
-
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here} val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}""" val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction) dynFrame.show()
注意
當抽樣 DynFrame
為空白時,它可能是由以下幾個原因造成的:
-
串流來源設定為「最新」,而且在抽樣週期期間沒有擷取任何新資料。
-
輪詢時間不足,無法處理它擷取的記錄。除非整個批次處理完畢,否則資料不會顯示。
在互動式工作階段中執行串流應用程式
In (入) AWS Glue 互動式工作階段,您可以執行 AWS Glue 串流應用程式,就像在 中建立串流應用程式的方式一樣 AWS Glue 主控台。由於互動式工作階段是以工作階段為基礎,因此在執行時間遇到異常情形不會導致工作階段停止。反覆開發批次函數現在帶來了額外好處。例如:
def batch_function(data_frame, batch_id):
log.info(data_frame.count())
invalid_method_call()
glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})
在上述範例中,我們包含了方法的無效使用,與一般 不同 AWS Glue 將結束整個應用程式的任務、使用者的編碼內容和定義會完全保留,而且工作階段仍然可以運作。無需引導新叢集並重新執行之前的所有轉換。這樣,您就可以專注於快速逐一查看批次函數實作,以獲得理想的結果。
需要注意的是,互動式工作階段會以封鎖的方式評估每個陳述句,以便讓工作階段一次只執行一個陳述句。由於串流查詢是連續的並且永不結束,因此具有作用中串流查詢的工作階段將無法處理任何後續陳述句,除非它們被中斷。您可以直接從 Jupyter 筆記本發出中斷命令,我們的核心會為您處理取消操作。
以下列正在等待執行的陳述句序列為範例:
Statement 1:
val number = df.count()
#Spark Action with deterministic result
Result: 5
Statement 2:
streamingQuery.start().awaitTermination()
#Spark Streaming Query that will be executing continously
Result: Constantly updated with each microbatch
Statement 3:
val number2 = df.count()
#This will not be executed as previous statement will be running indefinitely