DynamicFrame 類 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

DynamicFrame 類

Apache Spark 其中一個主要抽象為 SparkSQL DataFrame,其與 R 和 pandas 中找到的 DataFrame 結構類似。DataFrame 類似於資料表並支援功能樣式 (對應/減少/篩選/等) 操作和 SQL 操作 (選擇、專案、彙總)。

DataFrames 功能強大,受到廣泛採用,但其在擷取、轉換和載入 (ETL) 操作上受到限制。最重要的是,其需要指定結構描述,才能載入任何資料。SparkSQL 可解決此問題,其進行兩次資料傳送,第一個推斷結構描述,第二個則載入資料。不過,此推斷相當有限,無法滿足龐大資料的實際需求。例如,相同的欄位在不同的記錄內可能為不同的類型。Apache Spark 通常讓出並使用原始欄位文字回報類型為 string。這可能不正確,而且您可能需要更精確控制如何解決結構描述的差異。此外,對於大型資料集,額外傳送來源資料的代價可能使人卻步地高昂。

為了解決這些限制, AWS Glue 引入了DynamicFrame. DynamicFrame 類似 DataFrame,但每筆記錄均為自我描述,且開始時不需結構描述。而是在需要 on-the-fly時AWS Glue計算結構描述,並使用選擇 (或聯集) 類型明確地對結構描述不一致性進行編碼。您可以解決這些不一致,讓您的資料集相容於需要固定結構描述的資料存放區。

同樣地,DynamicRecord 代表 DynamicFrame 內的邏輯記錄。其類似 Spark DataFrame 中的資料列,除了它是自我描述的,以及可用於不符合固定結構描述的資料。搭配使用 AWS Glue 時 PySpark,通常不會獨立操作DynamicRecords。相反地,您可以透過其 DynamicFrame 一起轉換資料集。

您可以在解決任何結構描述不一致後反覆轉換 DynamicFramesDataFrames

 — construction —

__init__

__init__(jdf, glue_ctx, name)
  • jdf – Java 虛擬機器 (JVM) 中資料框架的參考。

  • glue_ctxGlueContext 類 物件。

  • name – 選用名稱字串,預設是空的。

fromDF

fromDF(dataframe, glue_ctx, name)

DataFrame 欄位轉換為 DynamicFrame 欄位,藉此將 DataFrame 轉換為 DynamicRecord。傳回新的 DynamicFrame

DynamicRecord 代表 DynamicFrame 中的邏輯記錄。它類似 Spark DataFrame 中的一列,除了它是自我描述的,以及可用於不符合固定結構描述的資料。

此函數會預期 DataFrame 中具有重複名稱的資料欄已受到解析。

  • dataframe – 要轉換的 Apache Spark SQL DataFrame (必要)。

  • glue_ctx – 指定轉換內容的 GlueContext 類 物件 (必要)。

  • name— 結果的名稱DynamicFrame(自 AWS Glue 3.0 以來是可選的)。

toDF

toDF(options)

DynamicRecords 轉換為 DataFrame 欄位,藉此將 DynamicFrame 轉換為 Apache Spark DataFrame。傳回新的 DataFrame

DynamicRecord 代表 DynamicFrame 中的邏輯記錄。它類似 Spark DataFrame 中的一列,除了它是自我描述的,以及可用於不符合固定結構描述的資料。

  • options – 選項清單。如果您選擇 ProjectCast 動作類型,請指定目標類型。範例如下。

    >>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])

 — information —

count

count( ) – 傳回基礎 DataFrame 中的資料列數量。

結構描述

schema( ) – 傳回此 DynamicFrame 的結構描述,或者,假如不可用,則傳回基礎 DataFrame 的結構描述。

如需有關組成此結構描述的 DynamicFrame 類型的詳細資訊,請參閱 PySpark 延伸模組類型

printSchema

printSchema( ) – 列印基礎 DataFrame 的結構描述。

顯示

show(num_rows) – 列印基礎 DataFrame 的指定資料列數量。

repartition

repartition(numPartitions) – 傳回包含 numPartitions 個分割區的新 DynamicFrame

coalesce

coalesce(numPartitions) – 傳回包含 numPartitions 個分割區的新 DynamicFrame

 — transforms —

apply_mapping

apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

套用宣告映射至 DynamicFrame,並傳回將這些映射套用至您指定欄位的新 DynamicFrame。未指定的欄位將從新的 DynamicFrame 中省略。

  • mappings –映射元組的清單 (必要)。每個清單包括:(來源欄、來源類型、目標欄、目標類型)。

    如果來源資料欄的名稱中有一個小點 ".",則您必須在其前後加上反引號 "``"。例如,若要將 this.old.name (字串) 對應至 thisNewName,會使用以下元組:

    ("`this.old.name`", "string", "thisNewName", "string")
  • transformation_ctx – 用於識別狀態資訊的唯一字串 (選用)。

  • info – 與此轉換回報錯誤關聯的字串 (選用)。

  • stageThreshold – 此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

  • totalThreshold –直到及包含此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

範例:使用 apply_map 來重新命名欄位並變更欄位類型

以下程式碼顯示使用 apply_mapping 方法重新命名所選欄位和更改欄位類型的方法。

注意

若要存取此範例中使用的資料集,請參閱 程式碼範例:加入和關聯化資料 並依照 步驟 1:在 Amazon S3 儲存貯體中網路爬取資料 中的說明進行。

# Example: Use apply_mapping to reshape source data into # the desired column names and types as a new DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Select and rename fields, change field type print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:") persons_mapped = persons.apply_mapping( [ ("family_name", "String", "last_name", "String"), ("name", "String", "first_name", "String"), ("birth_date", "String", "date_of_birth", "Date"), ] ) persons_mapped.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the persons_mapped DynamicFrame, created with apply_mapping: root |-- last_name: string |-- first_name: string |-- date_of_birth: date

drop_fields

drop_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

呼叫 FlatMap 類別 轉換,從 DynamicFrame 移除欄位。傳回捨棄了指定欄位的新 DynamicFrame

  • paths – 字串清單。各包含您想捨棄的欄位節點的完整路徑。您可以使用點標記法來指定巢狀欄位。例如,如果欄位 first 是樹狀結構中的子欄位 name,您可以指定 "name.first" 為路徑。

    如果欄位節點的名稱中有常值 .,您必須以反引號將名稱括起 (`)。

  • transformation_ctx – 用於識別狀態資訊的唯一字串 (選用)。

  • info – 與此轉換回報錯誤關聯的字串 (選用)。

  • stageThreshold – 此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

  • totalThreshold –直到及包含此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

範例:使用 drop_fields 從 DynamicFrame 中移除欄位

此程式碼範例使用 drop_fields 方法從 DynamicFrame 中移除選取的頂層和巢狀欄位。

範例資料集

此範例使用下列資料集,該資料集由程式碼中的 EXAMPLE-FRIENDS-DATA 表格表示:

{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []} {"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]} {"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]} {"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}} {"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}

範例程式碼

# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame. # Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name. # Replace EXAMPLE-FRIENDS-DATA with your table name. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame from Glue Data Catalog glue_source_database = "MY-EXAMPLE-DATABASE" glue_source_table = "EXAMPLE-FRIENDS-DATA" friends = glueContext.create_dynamic_frame.from_catalog( database=glue_source_database, table_name=glue_source_table ) print("Schema for friends DynamicFrame before calling drop_fields:") friends.printSchema() # Remove location.county, remove friends.age, remove age friends = friends.drop_fields(paths=["age", "location.county", "friends.age"]) print("Schema for friends DynamicFrame after removing age, county, and friend age:") friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields: root |-- name: string |-- age: int |-- location: struct | |-- state: string | |-- county: string |-- friends: array | |-- element: struct | | |-- name: string | | |-- age: int Schema for friends DynamicFrame after removing age, county, and friend age: root |-- name: string |-- location: struct | |-- state: string |-- friends: array | |-- element: struct | | |-- name: string

篩選條件

filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

傳回新的 DynamicFrame,其中包含所有 DynamicRecords,其滿足輸入 DynamicFrame 且指定的述詞函數 f

  • f – 要套用至 DynamicFrame 的述詞函數。此函數必須以 DynamicRecord 做為引數並傳回 True,如果 DynamicRecord 符合篩選條件要求,否則將傳回 False (必要)。

    DynamicRecord 代表 DynamicFrame 中的邏輯記錄。它類似 Spark DataFrame 中的一列,除了它是自我描述的,以及可用於不符合固定結構描述的資料。

  • transformation_ctx – 用於識別狀態資訊的唯一字串 (選用)。

  • info – 與此轉換回報錯誤關聯的字串 (選用)。

  • stageThreshold – 此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

  • totalThreshold –直到及包含此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

範例:使用篩選條件取得已篩選的欄位選取

此範例使用filter方法來建立新的DynamicFrame,其中包括對另一個 DynamicFrame 的欄位的已篩選選取。

map 方法一樣,filter 需要一個函數作為引數,該引數應用於原始 DynamicFrame 中的每個記錄。該函數需要一個記錄作為輸入,並傳回一個布林值。如果傳回值為 true,記錄會包含在所產生的 DynamicFrame 中。如果傳回值為 false,記錄會被排除在外。

注意

若要存取此範例中使用的資料集,請參閱 程式碼範例:使用 ResolveChoice Lambda 和 ApplyMapping 並依照 步驟 1:在 Amazon S3 儲存貯體中網路爬取資料 中的說明進行。

# Example: Use filter to create a new DynamicFrame # with a filtered selection of records from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create DynamicFrame from Glue Data Catalog medicare = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv" ] }, "csv", {"withHeader": True}, ) # Create filtered DynamicFrame with custom lambda # to filter records by Provider State and Provider City sac_or_mon = medicare.filter( f=lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"] ) # Compare record counts print("Unfiltered record count: ", medicare.count()) print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065 Filtered record count: 564

join

join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

執行與其他 DynamicFrame 的對等性加入,並傳回產生的 DynamicFrame

  • paths1 – 要加入的此框架中的金鑰清單。

  • paths2 – 要加入的其他框架中的金鑰清單。

  • frame2 – 要加入的其他 DynamicFrame

  • transformation_ctx – 用於識別狀態資訊的唯一字串 (選用)。

  • info – 與此轉換回報錯誤關聯的字串 (選用)。

  • stageThreshold – 此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

  • totalThreshold –直到及包含此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

範例:使用聯結合併 DynamicFrames

這個例子使用該join方法在三個上執行聯接DynamicFrames。 AWS Glue 會根據您提供的欄位金鑰執行聯結。產生的 DynamicFrame 包含兩個原始影格的列,其中指定之索引鍵相符。

請注意,join 轉換會保持所有欄位不變。這意味著您指定要匹配的字段會顯示在結果中 DynamicFrame,即使它們是多餘的並且包含相同的鍵也是如此。在此範例中,我們使用 drop_fields 在聯結後移除這些多餘的索引鍵。

注意

若要存取此範例中使用的資料集,請參閱 程式碼範例:加入和關聯化資料 並依照 步驟 1:在 Amazon S3 儲存貯體中網路爬取資料 中的說明進行。

# Example: Use join to combine data from three DynamicFrames from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load DynamicFrames from Glue Data Catalog persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json" ) orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() print("Schema for the memberships DynamicFrame:") memberships.printSchema() print("Schema for the orgs DynamicFrame:") orgs.printSchema() # Join persons and memberships by ID persons_memberships = persons.join( paths1=["id"], paths2=["person_id"], frame2=memberships ) # Rename and drop fields from orgs # to prevent field name collisions with persons_memberships orgs = ( orgs.drop_fields(["other_names", "identifiers"]) .rename_field("id", "org_id") .rename_field("name", "org_name") ) # Create final join of all three DynamicFrames legislators_combined = orgs.join( paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships ).drop_fields(["person_id", "org_id"]) # Inspect the schema for the joined data print("Schema for the new legislators_combined DynamicFrame:") legislators_combined.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the memberships DynamicFrame: root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string Schema for the orgs DynamicFrame: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string Schema for the new legislators_combined DynamicFrame: root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

map

map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

傳回套用指定映射函數至原始 DynamicFrame 中所有記錄而產生的新 DynamicFrame

  • f – 套用到 DynamicFrame 中所有記錄的映射函數。此函數必須以 DynamicRecord 做為引數,並傳回新的 DynamicRecord (必要)。

    DynamicRecord 代表 DynamicFrame 中的邏輯記錄。它類似 Apache Spark DataFrame 中的一列,除了它是自我描述的,以及可用於不符合固定結構描述的資料。

  • transformation_ctx – 用於識別狀態資訊的唯一字串 (選用)。

  • info – 與轉換中的錯誤相關的字串 (選用)。

  • stageThreshold – 在錯誤輸出之前,轉換作業中可發生錯誤的次數上限 (選用)。預設為零。

  • totalThreshold – 在處理錯誤輸出之前,整體作業可發生錯誤的次數上限 (選用)。預設為零。

範例:使用 map 將函數套用至 DynamicFrame 中的每個記錄

此範例示範如何使用 map 方法將函數套用至 DynamicFrame 的每個記錄。具體來說,此範例套用名為 MergeAddress 函數至每個記錄,以便將多個地址欄位合併為一個 struct 類型。

注意

若要存取此範例中使用的資料集,請參閱 程式碼範例:使用 ResolveChoice Lambda 和 ApplyMapping 並依照 步驟 1:在 Amazon S3 儲存貯體中網路爬取資料 中的說明進行。

# Example: Use map to combine fields in all records # of a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema medicare = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]}, "csv", {"withHeader": True}) print("Schema for medicare DynamicFrame:") medicare.printSchema() # Define a function to supply to the map transform # that merges address fields into a single field def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec # Use map to apply MergeAddress to every record mapped_medicare = medicare.map(f = MergeAddress) print("Schema for mapped_medicare DynamicFrame:") mapped_medicare.printSchema()
Schema for medicare DynamicFrame: root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string Schema for mapped_medicare DynamicFrame: root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

mergeDynamicFrame

mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)

根據指定的主索引鍵來合併此 DynamicFrame 與暫存 DynamicFrame 以識別記錄。重複的記錄 (具有相同主索引鍵的記錄) 不會被刪除重複資料。如果暫存影格中沒有相符的記錄,則會保留來源中的所有記錄 (包括重複項)。如果暫存影格具有相符的記錄,則暫存影格中的記錄會覆寫 AWS Glue 中來源的記錄。

  • stage_dynamic_frame – 要合併的暫存 DynamicFrame

  • primary_keys - 要從來源和暫存動態影格比對記錄的主索引鍵欄位清單。

  • transformation_ctx - 用來擷取目前轉換之中繼資料的唯一字串 (選用)。

  • options - JSON 名稱值組的字串,可提供此轉換的額外資料。目前未使用此引數。

  • infoString。與轉換中的錯誤相關的任何字串。

  • stageThresholdLong。在給定轉換中的錯誤數量,其處理需要輸出錯誤。

  • totalThresholdLong。在此轉換之前 (包括在此轉換中) 的錯誤總數,其處理需要輸出錯誤。

此方法會傳回透過將此 DynamicFrame 與暫存 DynamicFrame 合併而取得的新 DynamicFrame

在下列情況下,傳回的 DynamicFrame 包含記錄 A:

  • 如果 A 同時存在於來源影格和暫存影格,則會傳回暫存影格中的 A

  • 如果 A 位於來源資料表中而 A.primaryKeys 不在 stagingDynamicFrame 中,則 A 不會在暫存資料表中更新。

來源影格和暫存影格不需要具有相同的結構描述。

示例:用 mergeDynamicFrame 於DynamicFrames根據主鍵合併兩個

下列程式碼範例示範如何使用 mergeDynamicFrame 方法,根據主索引鍵 idDynamicFrame 與「暫存」DynamicFrame 合併。

範例資料集

該範例使用稱為 split_rows_collection 的來自 DynamicFrameCollection 的兩個 DynamicFrames。以下是 split_rows_collection 中的索引鍵清單。

dict_keys(['high', 'low'])

範例程式碼

# Example: Use mergeDynamicFrame to merge DynamicFrames # based on a set of specified primary keys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import SelectFromCollection # Inspect the original DynamicFrames frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low") print("Inspect the DynamicFrame that contains rows where ID < 10") frame_low.toDF().show() frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high") print("Inspect the DynamicFrame that contains rows where ID > 10") frame_high.toDF().show() # Merge the DynamicFrames based on the "id" primary key merged_high_low = frame_high.mergeDynamicFrame( stage_dynamic_frame=frame_low, primary_keys=["id"] ) # View the results where the ID is 1 or 20 print("Inspect the merged DynamicFrame that contains the combined rows") merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 2| 0| fax| 202-225-3307| | 2| 1| phone| 202-225-5731| | 3| 0| fax| 202-225-3307| | 3| 1| phone| 202-225-5731| | 4| 0| fax| 202-225-3307| | 4| 1| phone| 202-225-5731| | 5| 0| fax| 202-225-3307| | 5| 1| phone| 202-225-5731| | 6| 0| fax| 202-225-3307| | 6| 1| phone| 202-225-5731| | 7| 0| fax| 202-225-3307| | 7| 1| phone| 202-225-5731| | 8| 0| fax| 202-225-3307| | 8| 1| phone| 202-225-5731| | 9| 0| fax| 202-225-3307| | 9| 1| phone| 202-225-5731| | 10| 0| fax| 202-225-6328| | 10| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains rows where ID > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| fax| 202-225-6328| | 11| 1| phone| 202-225-4576| | 11| 2| twitter| RepTrentFranks| | 12| 0| fax| 202-225-6328| | 12| 1| phone| 202-225-4576| | 12| 2| twitter| RepTrentFranks| | 13| 0| fax| 202-225-6328| | 13| 1| phone| 202-225-4576| | 13| 2| twitter| RepTrentFranks| | 14| 0| fax| 202-225-6328| | 14| 1| phone| 202-225-4576| | 14| 2| twitter| RepTrentFranks| | 15| 0| fax| 202-225-6328| | 15| 1| phone| 202-225-4576| | 15| 2| twitter| RepTrentFranks| | 16| 0| fax| 202-225-6328| | 16| 1| phone| 202-225-4576| | 16| 2| twitter| RepTrentFranks| | 17| 0| fax| 202-225-6328| | 17| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the merged DynamicFrame that contains the combined rows +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 20| 0| fax| 202-225-5604| | 20| 1| phone| 202-225-6536| | 20| 2| twitter| USRepLong| +---+-----+------------------------+-------------------------+

關聯化

relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame 轉換為適合關聯式資料庫的表單。當您想要將資料從 DynamoDB 等 NoSQL 環境移動到 MySQL 等關聯式資料庫時,關聯化 DynamicFrame 特別有用。

透過對巢狀化欄解除巢狀化並將陣列欄直轉橫,可產生框架清單。使用在解除巢狀化階段中所產生的聯結鍵,將直轉橫的陣列欄聯結至根資料表。

  • root_table_name – 根資料表的名稱。

  • staging_path – 該方法用來以 CSV 格式存放直轉橫資料表分區的路徑 (選用)。直轉橫資料表從這個路徑讀回。

  • options – 選用參數的字典。

  • transformation_ctx – 用於識別狀態資訊的唯一字串 (選用)。

  • info – 與此轉換回報錯誤關聯的字串 (選用)。

  • stageThreshold – 此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

  • totalThreshold –直到及包含此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

範例:使用 relationalize 來壓平合併 DynamicFrame 中的巢狀化結構描述

此程式碼範例使用 relationalize 方法,將巢狀化結構描述壓平合併為適合關聯式資料庫的表單。

範例資料集

此範例會將稱為 legislators_combinedDynamicFrame 與下列結構描述搭配使用。legislators_combined 具有多個巢狀化欄位,例如 linksimagescontact_details,這些欄位將由 relationalize 轉換進行壓平合併。

root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

範例程式碼

# Example: Use relationalize to flatten # a nested schema into a format that fits # into a relational database. # Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Apply relationalize and inspect new tables legislators_relationalized = legislators_combined.relationalize( "l_root", "s3://DOC-EXAMPLE-BUCKET/tmpDir" ) legislators_relationalized.keys() # Compare the schema of the contact_details # nested field to the new relationalized table that # represents it legislators_combined.select_fields("contact_details").printSchema() legislators_relationalized.select("l_root_contact_details").toDF().where( "id = 10 or id = 75" ).orderBy(["id", "index"]).show()

下列輸出可讓您將稱為 contact_details 的巢狀化欄位結構描述與 relationalize 轉換所建立的資料表進行比較。請注意,資料表記錄使用稱為 id 的外部索引鍵和代表陣列位置的 index 資料欄連結回主資料表。

dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers']) root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| 202-225-4160| | 10| 1| phone| 202-225-3436| | 75| 0| fax| 202-225-6791| | 75| 1| phone| 202-225-2861| | 75| 2| twitter| RepSamFarr| +---+-----+------------------------+-------------------------+

rename_field

rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

重新命名此 DynamicFrame 中的欄位,並傳回欄位重新命名的新 DynamicFrame

  • oldName – 要重新命名之節點的完整路徑。

    如果舊名稱內有小點,RenameField 無法正常運作,除非在前後加上反引號 (`)。例如,若要將 this.old.name 換成 thisNewName,可以用下列方式呼叫 rename_field。

    newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
  • newName – 新的名稱,做為完整路徑。

  • transformation_ctx – 用於識別狀態資訊的唯一字串 (選用)。

  • info – 與此轉換回報錯誤關聯的字串 (選用)。

  • stageThreshold – 此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

  • totalThreshold –直到及包含此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

範例:使用 rename_field 重新命名 DynamicFrame 中的欄位

此程式碼範例會使用 rename_field 方法重新命名 DynamicFrame 中的欄位。請注意,此範例使用方法鏈結同時重新命名多個欄位。

注意

若要存取此範例中使用的資料集,請參閱 程式碼範例:加入和關聯化資料 並依照 步驟 1:在 Amazon S3 儲存貯體中網路爬取資料 中的說明進行。

範例程式碼

# Example: Use rename_field to rename fields # in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Inspect the original orgs schema orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Original orgs schema: ") orgs.printSchema() # Rename fields and view the new schema orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name") print("New orgs schema with renamed fields: ") orgs.printSchema()
Original orgs schema: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string New orgs schema with renamed fields: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- classification: string |-- org_id: string |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string

resolveChoice

resolveChoice(specs = None, choice = "" , database = None , table_name = None , transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id = None)

在此 DynamicFrame 中解析所選類型,並傳回新的 DynamicFrame

  • specs – 要解析的特定模棱兩可項目的清單,形式皆為 tuple:(field_path, action)

    有兩種方式可以使用 resolveChoice。第一種是使用 specs 引數指定一系列的特定的欄以及解析它們的方式。resolveChoice 的其他模式是使用 choice 引數為所有 ChoiceTypes 指定單一解析度。

    specs 的值指定為由 (field_path, action) 對組成的元組。field_path 值代表模棱兩可的特定元素,action 值則代表對應的解析動作。可行的動作如下:

    • cast:type - 嘗試將所有值轉換至指定類型。例如:cast:int

    • make_cols - 將每個不同的類型轉換為具有 columnName_type 名稱的欄。透過將資料壓平合併來解析可能的模棱兩可項目。例如,如果 columnA 可能是 intstring,則在得出的 DynamicFrame 中,解析動作會產生名為 columnA_intcolumnA_string 的兩個欄。

    • make_struct – 藉由使用 struct 表示資料,來解決可能的模棱兩可項目。舉例來說,如果欄中的資料可能是 intstring,則 make_struct 動作會在產生的 DynamicFrame 中產生結構欄。每個結構都包含 intstring

    • project:type - 藉由將所有資料預測為一種可能的資料類型,來解決可能的模棱兩可項目。舉例來說,如果欄中的資料可能是 intstring,則使用 project:string 動作會在結果的 DynamicFrame 中產生欄,其中所有的 int 值皆轉換為字串。

    field_path 識別到陣列,在陣列的名稱後放置空白的方括號以避免模棱兩可的狀況。例如,假設您使用如下結構化的資料:

    "myList": [ { "price": 100.00 }, { "price": "$100.00" } ]

    您可以選取數值而不是價格字串版本,方法是將 field_path 設定為 "myList[].price",且將 action 設定為 "cast:double"

    注意

    您只能使用 specschoice 參數的其中一項。如果 specs 參數不是 None,則 choice 參數必須為空字串。相反地,如果 choice 不是空字串,則 specs 參數必須為 None

  • choice – 為所有 ChoiceTypes 指定單一解析度。您可以在 ChoiceTypes 的完整清單在執行時間之前是未知的情況下使用此模式。除了以上列出的 specs 動作,此引數也支援下列動作:

    • match_catalog – 嘗試將每個 ChoiceType 投射至指定 Data Catalog 資料表中的對應類型。

  • database – 搭配 match_catalog 動作使用的 Data Catalog 資料庫。

  • table_name – 搭配 match_catalog 動作使用的 Data Catalog 資料表。

  • transformation_ctx – 用於識別狀態資訊的唯一字串 (選用)。

  • info – 與此轉換回報錯誤關聯的字串 (選用)。

  • stageThreshold – 此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

  • totalThreshold – 直到及包含此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設為零,表示流程不會錯誤輸出。

  • catalog_id – 要存取之 Data Catalog 的目錄 ID ( Data Catalog 的帳戶 ID)。當設定為 None (預設值) 時,它會使用呼叫帳戶的目錄 ID。

範例:使用 resolveChoice 來處理包含多種類型的資料欄

此程式碼範例會使用 resolveChoice 方法來指定如何處理包含多種類型值的 DynamicFrame 資料欄。該範例演示了處理具有不同類型欄的兩種常見方法:

  • 將資料欄轉換為單一資料類型。

  • 將所有類型保留在單獨的欄中。

範例資料集

注意

若要存取此範例中使用的資料集,請參閱 程式碼範例:使用 ResolveChoice Lambda 和 ApplyMapping 並依照 步驟 1:在 Amazon S3 儲存貯體中網路爬取資料 中的說明進行。

此範例將稱為 medicareDynamicFrame 與下列結構描述搭配使用:

root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

範例程式碼

# Example: Use resolveChoice to handle # a column that contains multiple types from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input data and inspect the "provider id" column medicare = glueContext.create_dynamic_frame.from_catalog( database="payments", table_name="medicare_hospital_provider_csv" ) print("Inspect the provider id column:") medicare.toDF().select("provider id").show() # Cast provider id to type long medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")]) print("Schema after casting provider id to type long:") medicare_resolved_long.printSchema() medicare_resolved_long.toDF().select("provider id").show() # Create separate columns # for each provider id type medicare_resolved_cols = medicare.resolveChoice(choice="make_cols") print("Schema after creating separate columns for each type:") medicare_resolved_cols.printSchema() medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column: +-----------+ |provider id| +-----------+ | [10001,]| | [10005,]| | [10006,]| | [10011,]| | [10016,]| | [10023,]| | [10029,]| | [10033,]| | [10039,]| | [10040,]| | [10046,]| | [10055,]| | [10056,]| | [10078,]| | [10083,]| | [10085,]| | [10090,]| | [10092,]| | [10100,]| | [10103,]| +-----------+ only showing top 20 rows Schema after casting 'provider id' to type long: root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +-----------+ |provider id| +-----------+ | 10001| | 10005| | 10006| | 10011| | 10016| | 10023| | 10029| | 10033| | 10039| | 10040| | 10046| | 10055| | 10056| | 10078| | 10083| | 10085| | 10090| | 10092| | 10100| | 10103| +-----------+ only showing top 20 rows Schema after creating separate columns for each type: root |-- drg definition: string |-- provider id_string: string |-- provider id_long: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +----------------+------------------+ |provider id_long|provider id_string| +----------------+------------------+ | 10001| null| | 10005| null| | 10006| null| | 10011| null| | 10016| null| | 10023| null| | 10029| null| | 10033| null| | 10039| null| | 10040| null| | 10046| null| | 10055| null| | 10056| null| | 10078| null| | 10083| null| | 10085| null| | 10090| null| | 10092| null| | 10100| null| | 10103| null| +----------------+------------------+ only showing top 20 rows

select_fields

select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

傳回包含所選欄位的新 DynamicFrame

  • paths – 字串清單。每個字串清單均為您想要選擇的最上層節點的路徑。

  • transformation_ctx – 用於識別狀態資訊的唯一字串 (選用)。

  • info – 與此轉換回報錯誤關聯的字串 (選用)。

  • stageThreshold – 此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

  • totalThreshold –直到及包含此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

範例:使用 select_fields 來用所選欄位建立新的 DynamicFrame

以下程式碼範例顯示如何使用 select_fields 方法建立新的 DynamicFrame,其具有從現有 DynamicFrame 中選取的欄位清單。

注意

若要存取此範例中使用的資料集,請參閱 程式碼範例:加入和關聯化資料 並依照 步驟 1:在 Amazon S3 儲存貯體中網路爬取資料 中的說明進行。

# Example: Use select_fields to select specific fields from a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Create a new DynamicFrame with chosen fields names = persons.select_fields(paths=["family_name", "given_name"]) print("Schema for the names DynamicFrame, created with select_fields:") names.printSchema() names.toDF().show()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the names DynamicFrame: root |-- family_name: string |-- given_name: string +-----------+----------+ |family_name|given_name| +-----------+----------+ | Collins| Michael| | Huizenga| Bill| | Clawson| Curtis| | Solomon| Gerald| | Rigell| Edward| | Crapo| Michael| | Hutto| Earl| | Ertel| Allen| | Minish| Joseph| | Andrews| Robert| | Walden| Greg| | Kazen| Abraham| | Turner| Michael| | Kolbe| James| | Lowenthal| Alan| | Capuano| Michael| | Schrader| Kurt| | Nadler| Jerrold| | Graves| Tom| | McMillan| John| +-----------+----------+ only showing top 20 rows

簡化日常生活

simplify_ddb_json(): DynamicFrame

簡化中特別位於 DynamoDB JSON 結構中的巢狀資料行,並傳回新的簡化。DynamicFrame DynamicFrame如果 List 類型中有多種類型或 Map 類型,則 List 中的元素將不會簡化。請注意,這是一種特定類型的轉換,其行為與一般unnest轉換不同,且需要資料已經位於 DynamoDB JSON 結構中。如需詳細資訊,請參閱 DynamoDB JSON

例如,讀取 DynamoDB JSON 結構的匯出結構描述與以下類似:

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

simplify_ddb_json() 轉換會將此轉換為:

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

範例:使用簡化方式來叫 DynamoDB JSON 簡化

此程式碼範例會使用此simplify_ddb_json方法來使用 AWS Glue DynamoDB 匯出連接器、叫用 DynamoDB JSON 簡化作業,以及列印分割區數目。

範例程式碼

from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "dynamodb", connection_options = { 'dynamodb.export': 'ddb', 'dynamodb.tableArn': '<table arn>', 'dynamodb.s3.bucket': '<bucket name>', 'dynamodb.s3.prefix': '<bucket prefix>', 'dynamodb.s3.bucketOwner': '<account_id of bucket>' } ) simplified = dynamicFrame.simplify_ddb_json() print(simplified.getNumPartitions())

spigot

spigot(path, options={})

將範例記錄寫入指定的目的地,以協助您驗證任務執行的轉換。

  • path - 要寫入的目的地路徑 (必要)。

  • options – 指定選項的索引鍵/值對 (選用)。"topk" 選項指定應寫入第一個 k 記錄。"prob" 選項指定選擇任何給定記錄的概率 (小數)。您可以使用其來選擇要寫入的記錄。

  • transformation_ctx – 用於識別狀態資訊的唯一字串 (選用)。

範例:使用 spigot 將範例欄位從 DynamicFrame 寫入到 Amazon S3

此程式碼範例會在套用 select_fields 轉換後,使用 spigot 方法將範例記錄寫入 Amazon S3 儲存貯體。

範例資料集

注意

若要存取此範例中使用的資料集,請參閱 程式碼範例:加入和關聯化資料 並依照 步驟 1:在 Amazon S3 儲存貯體中網路爬取資料 中的說明進行。

此範例將稱為 personsDynamicFrame 與下列結構描述搭配使用:

root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

範例程式碼

# Example: Use spigot to write sample records # to a destination during a transformation # from pyspark.context import SparkContext. # Replace DOC-EXAMPLE-BUCKET with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load table data into a DynamicFrame persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) # Perform the select_fields on the DynamicFrame persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"]) # Use spigot to write a sample of the transformed data # (the first 10 records) spigot_output = persons.spigot( path="s3://DOC-EXAMPLE-BUCKET", options={"topk": 10} )

以下是 spigot 寫入 Amazon S3 的資料範例。由於範例程式碼指定了 options={"topk": 10},範例資料會包含前 10 筆記錄。

{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"} {"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"} {"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"} {"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"} {"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"} {"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"} {"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"} {"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"} {"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"} {"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}

split_fields

split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

傳回新的 DynamicFrameCollection,其包含兩個 DynamicFrames。第一個 DynamicFrame 包含分割的所有節點,第二個包含其餘節點。

  • paths – 字串清單,其各自為想要分割到新 DynamicFrame 的節點的完整路徑。

  • name1 – 分割的 DynamicFrame 的名稱字串。

  • name2 – 分割指定節點後剩餘的 DynamicFrame 的名稱字串。

  • transformation_ctx – 用於識別狀態資訊的唯一字串 (選用)。

  • info – 與此轉換回報錯誤關聯的字串 (選用)。

  • stageThreshold – 此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

  • totalThreshold –直到及包含此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

範例:使用 split_fields 將選取的欄位分割為單獨的 DynamicFrame

此程式碼範例會使用 split_fields 方法,將指定欄位的清單分割為單獨的 DynamicFrame

範例資料集

該範例使用稱為 l_root_contact_detailsDynamicFrame,其來自名為 legislators_relationalized 的集合。

l_root_contact_details 具有以下結構描述和項目。

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| ...

範例程式碼

# Example: Use split_fields to split selected # fields into a separate DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input DynamicFrame and inspect its schema frame_to_split = legislators_relationalized.select("l_root_contact_details") print("Inspect the input DynamicFrame schema:") frame_to_split.printSchema() # Split id and index fields into a separate DynamicFrame split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right") # Inspect the resulting DynamicFrames print("Inspect the schemas of the DynamicFrames created with split_fields:") split_fields_collection.select("left").printSchema() split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema: root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string Inspect the schemas of the DynamicFrames created with split_fields: root |-- id: long |-- index: int root |-- contact_details.val.type: string |-- contact_details.val.value: string

split_rows

split_rows(comparison_dict, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame 中一個或多個欄分割成新的 DynamicFrame

該方法傳回新的 DynamicFrameCollection,其包含兩個 DynamicFrames。第一個 DynamicFrame 包含分割的所有列,第二個包含其餘節列。

  • comparison_dict – 一個字典,其中索引鍵為欄位的路徑,而對於與欄位數值相比較的數值而言,此數值為另一種字典映射比較運算子。例如,{"age": {">": 10, "<": 20}} 分割所有資料列,其年齡欄中的值大於 10 且小於 20。

  • name1 – 分割的 DynamicFrame 的名稱字串。

  • name2 – 分割指定節點後剩餘的 DynamicFrame 的名稱字串。

  • transformation_ctx – 用於識別狀態資訊的唯一字串 (選用)。

  • info – 與此轉換回報錯誤關聯的字串 (選用)。

  • stageThreshold – 此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

  • totalThreshold –直到及包含此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

範例:使用 split_rows 來分割 DynamicFrame 中的列

此程式碼範例使用 split_rows 方法,根據 id 欄位值來分割 DynamicFrame 中的列。

範例資料集

該範例使用稱為 l_root_contact_detailsDynamicFrame,其選自名為 legislators_relationalized 的集合。

l_root_contact_details 具有以下結構描述和項目。

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+

範例程式碼

# Example: Use split_rows to split up # rows in a DynamicFrame based on value from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Retrieve the DynamicFrame to split frame_to_split = legislators_relationalized.select("l_root_contact_details") # Split up rows by ID split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low") # Inspect the resulting DynamicFrames print("Inspect the DynamicFrame that contains IDs < 10") split_rows_collection.select("low").toDF().show() print("Inspect the DynamicFrame that contains IDs > 10") split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains IDs > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| phone| 202-225-5476| | 11| 1| twitter| RepDavidYoung| | 12| 0| phone| 202-225-4035| | 12| 1| twitter| RepStephMurphy| | 13| 0| fax| 202-226-0774| | 13| 1| phone| 202-225-6335| | 14| 0| fax| 202-226-0774| | 14| 1| phone| 202-225-6335| | 15| 0| fax| 202-226-0774| | 15| 1| phone| 202-225-6335| | 16| 0| fax| 202-226-0774| | 16| 1| phone| 202-225-6335| | 17| 0| fax| 202-226-0774| | 17| 1| phone| 202-225-6335| | 18| 0| fax| 202-226-0774| | 18| 1| phone| 202-225-6335| | 19| 0| fax| 202-226-0774| | 19| 1| phone| 202-225-6335| | 20| 0| fax| 202-226-0774| | 20| 1| phone| 202-225-6335| +---+-----+------------------------+-------------------------+ only showing top 20 rows

unbox

unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)

DynamicFrame 中的字串欄位拆箱 (重新格式化),並傳回包含拆箱的 DynamicRecords 的新 DynamicFrame

DynamicRecord 代表 DynamicFrame 中的邏輯記錄。它類似 Apache Spark DataFrame 中的一列,除了它是自我描述的,以及可用於不符合固定結構描述的資料。

  • path – 要拆箱之字串節點的完整路徑。

  • format – 格式化規格 (選用)。您可將其用於 Amazon S3 或支援多種格式的 AWS Glue 連線。如需了解受支援的格式,請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項

  • transformation_ctx – 用於識別狀態資訊的唯一字串 (選用)。

  • info – 與此轉換回報錯誤關聯的字串 (選用)。

  • stageThreshold – 此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

  • totalThreshold –直到及包含此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

  • options – 下列一或多個:

    • separator – 包含分隔符號字元的字串。

    • escaper – 包含逸出字元的字串。

    • skipFirst – 布林值,指出是否略過第一個執行個體。

    • withSchema:包含節點結構描述的 JSON 表示法的字串。結構描述的 JSON 表示法的格式由 StructType.json() 的輸出定義。

    • withHeader – 布林值,指出是否包含標頭。

範例:使用 unbox 將字串欄位拆箱到結構中

此程式碼範例使用 unbox 方法,將 DynamicFrame 中的字串欄位拆箱或重新格式化為結構類型的欄位。

範例資料集

此範例搭配使用稱為 mapped_with_stringDynamicFrame 與下列結構描述和項目。

請注意名為 AddressString 的欄位。這是範例拆箱為結構的欄位。

root |-- Average Total Payments: string |-- AddressString: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| ...

範例程式碼

# Example: Use unbox to unbox a string field # into a struct in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) unboxed = mapped_with_string.unbox("AddressString", "json") unboxed.printSchema() unboxed.toDF().show()
root |-- Average Total Payments: string |-- AddressString: struct | |-- Street: string | |-- City: string | |-- State: string | |-- Zip.Code: string | |-- Array: array | | |-- element: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| | $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...| | $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...| | $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...| | $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...| | $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL| | $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...| | $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...| | $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL| | $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...| | $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...| | $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...| | $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...| | $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL| | $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...| | $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL| | $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ only showing top 20 rows

聯集

union(frame1, frame2, transformation_ctx = "", info = "", stageThreshold = 0, totalThreshold = 0)

聯盟二 DynamicFrames。返回 DynamicFrame 包含來自兩個輸入的所有記錄 DynamicFrames。這種轉換可能會從兩個 DataFrames 與等效數據的聯合返回不同的結果。如果您需要 Spark DataFrame 聯合行為,請考慮使用toDF.

  • frame1— 首先 DynamicFrame 是工會。

  • frame2— 第二 DynamicFrame 次聯合。

  • transformation_ctx – (選用) 用於識別統計資料/狀態資訊的唯一字串

  • info – (選用) 與轉換中的錯誤相關的任何字串

  • stageThreshold – (選用) 在處理輸出錯誤之前,轉換中的最大錯誤數

  • totalThreshold – (選用) 在處理輸出錯誤之前的最大錯誤數。

解巢狀

unnest(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

DynamicFrame 中的巢狀化物件進行解除巢狀化,將其變為頂層元素,並傳回新的未巢狀化 DynamicFrame

  • transformation_ctx – 用於識別狀態資訊的唯一字串 (選用)。

  • info – 與此轉換回報錯誤關聯的字串 (選用)。

  • stageThreshold – 此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

  • totalThreshold –直到及包含此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用)。預設值為零,表示此流程不會發生錯誤。

範例:使用 unnest 將巢狀化欄位轉換為頂層欄位

此程式碼範例使用 unnest 方法,將 DynamicFrame 中的所有巢狀化欄位壓平合併為頂層欄位。

範例資料集

此範例搭配使用稱為 mapped_medicareDynamicFrame 與下列結構描述。請注意,Address 欄位是唯一包含巢狀化資料的欄位。

root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

範例程式碼

# Example: Use unnest to unnest nested # objects in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Unnest all nested fields unnested = mapped_medicare.unnest() unnested.printSchema()
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address.Zip.Code: string |-- Address.City: string |-- Address.Array: array | |-- element: string |-- Address.State: string |-- Address.Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

unnest_ddb_json

解除專屬於 DynamoDB JSON 結構中 DynamicFrame 內的巢狀欄的巢狀化,並傳回新的解巢狀 DynamicFrame。結構類型陣列的欄將不是解巢狀狀態。請注意,這是一種特定類型的解除巢狀化轉換,其行為與常規 unnest 轉換不同,且資料必須已經位於 DynamoDB JSON 結構中。如需詳細資訊,請參閱 DynamoDB JSON

unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
  • transformation_ctx – 用於識別狀態資訊的唯一字串 (選用)。

  • info – 與此轉換回報錯誤關聯的字串 (選用)。

  • stageThreshold – 此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用:預設為 0,表示流程不會錯誤輸出)。

  • totalThreshold – 直到及包含此轉換期間流程應錯誤輸出之前遇到的錯誤次數 (選用:預設為 0,表示流程不會錯誤輸出)。

例如,讀取 DynamoDB JSON 結構的匯出結構描述與以下類似:

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

unnest_ddb_json() 轉換會將此轉換為:

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

下列程式碼範例示範如何使用 AWS Glue DynamoDB 匯出連接器、取消巢狀呼叫 DynamoDB JSON,以及如何列印分割區的數目:

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()

write

write(connection_type, connection_options, format, format_options, accumulator_size)

從此 DynamicFrameGlueContext 類 取得指定連線類型的 DataSink(object),並用其來格式化及寫入此 DynamicFrame 的內容。傳回依指定格式化和寫入的新 DynamicFrame

  • connection_type – 使用的連線類型。有效值包括 s3mysqlpostgresqlredshiftsqlserveroracle

  • connection_options – 使用的連線選項 (選用)。如果是 connection_types3,會定義 Amazon S3 路徑。

    connection_options = {"path": "s3://aws-glue-target/temp"}

    如果是 JDBC 連線,必須定義幾項屬性。請注意,資料庫名稱必須是 URL 的一部分。它可以選擇性包含在連線選項中。

    警告

    不建議在指令碼中存放密碼。考慮使boto3用從 AWS Secrets Manager 或 AWS Glue 資料型錄擷取它們。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}
  • format – 格式化規格 (選用)。這用於 Amazon Simple Storage Service (Amazon S3) 或支援多種格式的 AWS Glue 連線。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。

  • format_options – 指定格式的格式選項。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。

  • accumulator_size:要使用的 accumulable 大小,以位元組為單位 (選用)。

 — errors —

assertErrorThreshold

assertErrorThreshold( ) – 建立此 DynamicFrame 的轉換中的錯誤宣告。從基礎 DataFrame 傳回 Exception

errorsAsDynamic框架

errorsAsDynamicFrame( ) – 傳回 DynamicFrame,其內部有巢狀的錯誤記錄。

範例:使用 errorsAsDynamic Frame 檢視錯誤記錄

以下程式碼範例顯示如何使用 errorsAsDynamicFrame 方法來檢視 DynamicFrame 的錯誤記錄。

範例資料集

此範例使用下列資料集,您可以將其作為 JSON 上傳到 Amazon S3。請注意,第二條記錄的格式錯誤。當您使用 SparkSQL 時,格式錯誤的資料通常會中斷檔案剖析。但是,DynamicFrame 會辨識出格式錯誤問題,並將格式錯誤的行轉換為可以單獨處理的錯誤記錄。

{"id": 1, "name": "george", "surname": "washington", "height": 178} {"id": 2, "name": "benjamin", "surname": "franklin", {"id": 3, "name": "alexander", "surname": "hamilton", "height": 171} {"id": 4, "name": "john", "surname": "jay", "height": 190}

範例程式碼

# Example: Use errorsAsDynamicFrame to view error records. # Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create errors DynamicFrame, view schema errors = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://DOC-EXAMPLE-S3-BUCKET/error_data.json"]}, "json" ) print("Schema of errors DynamicFrame:") errors.printSchema() # Show that errors only contains valid entries from the dataset print("errors contains only valid records from the input dataset (2 of 4 records)") errors.toDF().show() # View errors print("Errors count:", str(errors.errorsCount())) print("Errors:") errors.errorsAsDynamicFrame().toDF().show() # View error fields and error data error_record = errors.errorsAsDynamicFrame().toDF().head() error_fields = error_record["error"] print("Error fields: ") print(error_fields.asDict().keys()) print("\nError record data:") for key in error_fields.asDict().keys(): print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame: root |-- id: int |-- name: string |-- surname: string |-- height: int errors contains only valid records from the input dataset (2 of 4 records) +---+------+----------+------+ | id| name| surname|height| +---+------+----------+------+ | 1|george|washington| 178| | 4| john| jay| 190| +---+------+----------+------+ Errors count: 1 Errors: +--------------------+ | error| +--------------------+ |[[ File "/tmp/20...| +--------------------+ Error fields: dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord']) Error record data: callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='') msg : error in jackson reader stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at scala.collection.Iterator$$anon$9.next(Iterator.scala:162) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202) at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116) at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109) at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247) at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) input : bytesread : 252 source : dynamicRecord : Row(id=2, name='benjamin', surname='franklin')

errorsCount

errorsCount( ) – 傳回 DynamicFrame 中的錯誤總數。

stageErrorsCount

stageErrorsCount – 傳回產生此 DynamicFrame 過程中發生的錯誤數量。