選取您的 Cookie 偏好設定

我們使用提供自身網站和服務所需的基本 Cookie 和類似工具。我們使用效能 Cookie 收集匿名統計資料,以便了解客戶如何使用我們的網站並進行改進。基本 Cookie 無法停用,但可以按一下「自訂」或「拒絕」以拒絕效能 Cookie。

如果您同意,AWS 與經核准的第三方也會使用 Cookie 提供實用的網站功能、記住您的偏好設定,並顯示相關內容,包括相關廣告。若要接受或拒絕所有非必要 Cookie,請按一下「接受」或「拒絕」。若要進行更詳細的選擇,請按一下「自訂」。

AWS Glue 串流概念

焦點模式
AWS Glue 串流概念 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

以下各節提供有關「 AWS Glue 串流」概念的資訊。

AWS Glue 串流工作的剖析

AWS Glue 串流任務以 Spark 串流範例運作,並利用 Spark 架構中的結構化串流。串流任務會以特定時間間隔持續輪詢串流資料來源,以擷取記錄作為微批次。以下各節將檢查 AWS Glue 串流工作的不同部分。

該屏幕截圖顯示了 Amazon CloudWatch 監控日誌, AWS Glue 例如上面提供的示例,並查看所需執行者的數量(橙色線)並縮放執行程序(藍線)以匹配該日誌,而無需手動調整。

forEachBatch

此方forEachBatch法是 AWS Glue 串流工作執行的進入點。 AWS Glue 串流作業會使用此forEachBatch方法輪詢資料的功能,類似於在串流工作生命週期期間保持作用中的迭代器,並定期輪詢串流來源以取得新資料,並以微型批次處理最新資料。

glueContext.forEachBatch( frame=dataFrame_AmazonKinesis_node1696872487972, batch_function=processBatch, options={ "windowSize": "100 seconds", "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/", }, )

設定 forEachBatchframe 屬性以指定串流來源。在此範例中,您在建立工作期間在空白畫布中建立的來源節點會填入工作 DataFrame 的預設值。將 batch_function 屬性設定為您決定針對每個微批次作業呼叫的 function。您必須定義函數來處理傳入資料的批次轉換。

來源

processBatch函數的第一個步驟中,程式會驗證您定義為 frame 屬性的記錄計數。 DataFrame forEachBatch程式會將擷取時間戳記附加至非空白。 DataFramedata_frame.count()>0 子句會判斷最新的微批次是否並非空白,並且已準備好進行進一步處理。

def processBatch(data_frame, batchId): if data_frame.count() >0: AmazonKinesis_node1696872487972 = DynamicFrame.fromDF( glueContext.add_ingestion_time_columns(data_frame, "hour"), glueContext, "from_data_frame", )

映射

該程式的下一部分是套用映射。火花上的Mapping.apply方法 DataFrame 可讓您定義圍繞資料元素的轉換規則。一般來說,您可以重新命名和變更資料類型,或在來源資料欄上套用自訂函數,並將這些函數映射至目標欄。

#Script generated for node ChangeSchema ChangeSchema_node16986872679326 = ApplyMapping.apply( frame = AmazonKinesis_node1696872487972, mappings = [ ("eventtime", "string", "eventtime", "string"), ("manufacturer", "string", "manufacturer", "string"), ("minutevolume", "long", "minutevolume", "int"), ("o2stats", "long", "OxygenSaturation", "int"), ("pressurecontrol", "long", "pressurecontrol", "int"), ("serialnumber", "string", "serialnumber", "string"), ("ventilatorid", "long", "ventilatorid", "long"), ("ingest_year", "string", "ingest_year", "string"), ("ingest_month", "string", "ingest_month", "string"), ("ingest_day", "string", "ingest_day", "string"), ("ingest_hour", "string", "ingest_hour", "string"), ], transformation_ctx="ChangeSchema_node16986872679326", ) )

接收

在這個部分,來自串流來源的傳入資料集會儲存在目標位置。在此範例中,我們會將資料寫入 Amazon S3 位置。AmazonS3_node_path 屬性詳細資料會根據您在建立任務期間所使用的設定從畫布預先填入。您可以根據您的使用案例來設定 updateBehavior,並決定不更新資料目錄表格,或在後續執行時建立資料目錄並更新資料目錄結構描述,或是建立目錄表格而不在後續執行時更新結構描述定義。

partitionKeys 屬性定義了存儲分割區選項。預設行為是根據在來源區段中提供的 ingestion_time_columns 來分割資料。compression 屬性可讓您設定要在目標寫入期間套用的壓縮演算法。您可以選擇設置 SnappyLZO,或GZIP作為壓縮技術。enableUpdateCatalog 屬性可控制是否需要更新 AWS Glue 目錄表格。此屬性可用的選項為 TrueFalse

#Script generated for node Amazon S3 AmazonS3_node1696872743449 = glueContext.getSink( path = AmazonS3_node1696872743449_path, connection_type = "s3", updateBehavior = "UPDATE_IN_DATABASE", partitionKeys = ["ingest_year", "ingest_month", "ingest_day", "ingest_hour"], compression = "snappy", enableUpdateCatalog = True, transformation_ctx = "AmazonS3_node1696872743449", )

AWS Glue 目錄水槽

此工作區段控制 AWS Glue 目錄表格更新行為。根據 AWS Glue 目錄資料庫名稱以及與您正在設catalogDatabase計的 AWS Glue 工作相關聯的表格名稱設定和catalogTableName性質。您可以透過 setFormat 屬性定義目標資料的檔案格式。在這個例子中,我們會以 Parquet 格式儲存資料。

參考本教學課程設定並執行 AWS Glue 串流任務後,產生的串流資料 Amazon Kinesis Data Streams 將以實際格式存放在 Amazon S3 位置,並使用快速壓縮。成功執行串流任務後,您將可透過 Amazon Athena查詢資料。

AmazonS3_node1696872743449 = setCatalogInfo( catalogDatabase = "demo", catalogTableName = "demo_stream_transform_result" ) AmazonS3_node1696872743449.setFormat("glueparquet") AmazonS3_node1696872743449.writeFormat("ChangeSchema_node16986872679326") )
隱私權網站條款Cookie 偏好設定
© 2025, Amazon Web Services, Inc.或其附屬公司。保留所有權利。