選取您的 Cookie 偏好設定

我們使用提供自身網站和服務所需的基本 Cookie 和類似工具。我們使用效能 Cookie 收集匿名統計資料,以便了解客戶如何使用我們的網站並進行改進。基本 Cookie 無法停用,但可以按一下「自訂」或「拒絕」以拒絕效能 Cookie。

如果您同意,AWS 與經核准的第三方也會使用 Cookie 提供實用的網站功能、記住您的偏好設定,並顯示相關內容,包括相關廣告。若要接受或拒絕所有非必要 Cookie,請按一下「接受」或「拒絕」。若要進行更詳細的選擇,請按一下「自訂」。

在 中使用串流操作 AWS Glue 互動式工作階段

焦點模式
在 中使用串流操作 AWS Glue 互動式工作階段 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

切換串流工作階段類型

使用 AWS Glue 互動式工作階段組態魔術 %streaming,用於定義您正在執行的任務,並初始化串流互動式工作階段。

為互動式開發抽樣輸入串流

我們衍生的一個工具,有助於增強 中的互動體驗 AWS Glue 互動式工作階段是在 中新增新方法GlueContext,以取得 static DynamicFrame 中串流的快照。 GlueContext可讓您檢查、互動和實作工作流程。

使用 GlueContext 類執行個體,您將能夠找到方法 getSampleStreamingDynamicFrame。此方法所需的引數為:

  • dataFrame:Spark Streaming DataFrame

  • options:請參閱下列可用的選項

可用選項包括:

  • windowSize:這也稱為微批次持續時間。此參數將判定觸發前一個批次後串流查詢將等待的時長。此參數值必須小於 pollingTimeInMs

  • pollingTimeIn Ms:方法執行的總時間長度。它將至少觸發一個微批次,以從輸入串流中獲取樣本記錄。

  • recordPollingLimit:此參數可協助您限制要從串流輪詢的記錄總數。

  • (選用) 您也可以使用 writeStreamFunction 將此自訂函數應用至每個記錄抽樣函數。請參閱下列 Scala 和 Python 中的範例。

Scala
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here} val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}""" val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction) dynFrame.show()
Python
def sample_batch_function(batch_df, batch_id): //Optional but you can replace your own forEachBatch function here options = { "pollingTimeInMs": "10000", "windowSize": "5 seconds", } glue_context.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, options, sample_batch_function)
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here} val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}""" val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction) dynFrame.show()
注意

當抽樣 DynFrame 為空白時,它可能是由以下幾個原因造成的:

  • 串流來源設定為「最新」,而且在抽樣週期期間沒有擷取任何新資料。

  • 輪詢時間不足,無法處理它擷取的記錄。除非整個批次處理完畢,否則資料不會顯示。

在互動式工作階段中執行串流應用程式

In (入) AWS Glue 互動式工作階段,您可以執行 AWS Glue 串流應用程式,就像在 中建立串流應用程式的方式一樣 AWS Glue 主控台。由於互動式工作階段是以工作階段為基礎,因此在執行時間遇到異常情形不會導致工作階段停止。反覆開發批次函數現在帶來了額外好處。例如:

def batch_function(data_frame, batch_id): log.info(data_frame.count()) invalid_method_call() glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})

在上述範例中,我們包含了方法的無效使用,與一般 不同 AWS Glue 將結束整個應用程式的任務、使用者的編碼內容和定義會完全保留,而且工作階段仍然可以運作。無需引導新叢集並重新執行之前的所有轉換。這樣,您就可以專注於快速逐一查看批次函數實作,以獲得理想的結果。

需要注意的是,互動式工作階段會以封鎖的方式評估每個陳述句,以便讓工作階段一次只執行一個陳述句。由於串流查詢是連續的並且永不結束,因此具有作用中串流查詢的工作階段將無法處理任何後續陳述句,除非它們被中斷。您可以直接從 Jupyter 筆記本發出中斷命令,我們的核心會為您處理取消操作。

以下列正在等待執行的陳述句序列為範例:

Statement 1: val number = df.count() #Spark Action with deterministic result Result: 5 Statement 2: streamingQuery.start().awaitTermination() #Spark Streaming Query that will be executing continously Result: Constantly updated with each microbatch Statement 3: val number2 = df.count() #This will not be executed as previous statement will be running indefinitely
隱私權網站條款Cookie 偏好設定
© 2025, Amazon Web Services, Inc.或其附屬公司。保留所有權利。