本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
以下各節提供有關「 AWS Glue 串流」概念的資訊。
AWS Glue 串流工作的剖析
AWS Glue 串流任務以 Spark 串流範例運作,並利用 Spark 架構中的結構化串流。串流任務會以特定時間間隔持續輪詢串流資料來源,以擷取記錄作為微批次。以下各節將檢查 AWS Glue 串流工作的不同部分。
forEachBatch
此方forEachBatch
法是 AWS Glue 串流工作執行的進入點。 AWS Glue 串流作業會使用此forEachBatch
方法輪詢資料的功能,類似於在串流工作生命週期期間保持作用中的迭代器,並定期輪詢串流來源以取得新資料,並以微型批次處理最新資料。
glueContext.forEachBatch(
frame=dataFrame_AmazonKinesis_node1696872487972,
batch_function=processBatch,
options={
"windowSize": "100 seconds",
"checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/",
},
)
設定 forEachBatch
的 frame
屬性以指定串流來源。在此範例中,您在建立工作期間在空白畫布中建立的來源節點會填入工作 DataFrame 的預設值。將 batch_function
屬性設定為您決定針對每個微批次作業呼叫的 function
。您必須定義函數來處理傳入資料的批次轉換。
來源
在processBatch
函數的第一個步驟中,程式會驗證您定義為 frame 屬性的記錄計數。 DataFrame forEachBatch
程式會將擷取時間戳記附加至非空白。 DataFramedata_frame.count()>0
子句會判斷最新的微批次是否並非空白,並且已準備好進行進一步處理。
def processBatch(data_frame, batchId):
if data_frame.count() >0:
AmazonKinesis_node1696872487972 = DynamicFrame.fromDF(
glueContext.add_ingestion_time_columns(data_frame, "hour"),
glueContext,
"from_data_frame",
)
映射
該程式的下一部分是套用映射。火花上的Mapping.apply
方法 DataFrame 可讓您定義圍繞資料元素的轉換規則。一般來說,您可以重新命名和變更資料類型,或在來源資料欄上套用自訂函數,並將這些函數映射至目標欄。
#Script generated for node ChangeSchema
ChangeSchema_node16986872679326 = ApplyMapping.apply(
frame = AmazonKinesis_node1696872487972,
mappings = [
("eventtime", "string", "eventtime", "string"),
("manufacturer", "string", "manufacturer", "string"),
("minutevolume", "long", "minutevolume", "int"),
("o2stats", "long", "OxygenSaturation", "int"),
("pressurecontrol", "long", "pressurecontrol", "int"),
("serialnumber", "string", "serialnumber", "string"),
("ventilatorid", "long", "ventilatorid", "long"),
("ingest_year", "string", "ingest_year", "string"),
("ingest_month", "string", "ingest_month", "string"),
("ingest_day", "string", "ingest_day", "string"),
("ingest_hour", "string", "ingest_hour", "string"),
],
transformation_ctx="ChangeSchema_node16986872679326",
)
)
接收
在這個部分,來自串流來源的傳入資料集會儲存在目標位置。在此範例中,我們會將資料寫入 Amazon S3 位置。AmazonS3_node_path
屬性詳細資料會根據您在建立任務期間所使用的設定從畫布預先填入。您可以根據您的使用案例來設定 updateBehavior
,並決定不更新資料目錄表格,或在後續執行時建立資料目錄並更新資料目錄結構描述,或是建立目錄表格而不在後續執行時更新結構描述定義。
partitionKeys
屬性定義了存儲分割區選項。預設行為是根據在來源區段中提供的 ingestion_time_columns
來分割資料。compression
屬性可讓您設定要在目標寫入期間套用的壓縮演算法。您可以選擇設置 SnappyLZO,或GZIP作為壓縮技術。enableUpdateCatalog
屬性可控制是否需要更新 AWS Glue 目錄表格。此屬性可用的選項為 True
或 False
。
#Script generated for node Amazon S3
AmazonS3_node1696872743449 = glueContext.getSink(
path = AmazonS3_node1696872743449_path,
connection_type = "s3",
updateBehavior = "UPDATE_IN_DATABASE",
partitionKeys = ["ingest_year", "ingest_month", "ingest_day", "ingest_hour"],
compression = "snappy",
enableUpdateCatalog = True,
transformation_ctx = "AmazonS3_node1696872743449",
)
AWS Glue 目錄水槽
此工作區段控制 AWS Glue 目錄表格更新行為。根據 AWS Glue 目錄資料庫名稱以及與您正在設catalogDatabase
計的 AWS Glue 工作相關聯的表格名稱設定和catalogTableName
性質。您可以透過 setFormat
屬性定義目標資料的檔案格式。在這個例子中,我們會以 Parquet 格式儲存資料。
參考本教學課程設定並執行 AWS Glue 串流任務後,產生的串流資料 Amazon Kinesis Data Streams 將以實際格式存放在 Amazon S3 位置,並使用快速壓縮。成功執行串流任務後,您將可透過 Amazon Athena查詢資料。
AmazonS3_node1696872743449 = setCatalogInfo(
catalogDatabase = "demo", catalogTableName = "demo_stream_transform_result"
)
AmazonS3_node1696872743449.setFormat("glueparquet")
AmazonS3_node1696872743449.writeFormat("ChangeSchema_node16986872679326")
)