Apache Spark の主要な抽象化の 1 つは SparkSQL DataFrame
で、これは R と Pandas にある DataFrame
構造に似ています。DataFrame
はテーブルと似ており、機能スタイル (マップ/リデュース/フィルター/その他) 操作と SQL 操作 (選択、プロジェクト、集計) をサポートしています。
DataFrames
は、強力で広く使用されていますが、抽出、変換、ロード (ETL) 操作に関しては制限があります。最も重要なのは、データをロードする前にスキーマを指定する必要があることです。SparkSQL は、データに対してパスを 2 つ作ることでこれを解決します。この 1 つ目はスキーマの推定を行い、2 つ目はデータをロードします。ただし、この推測は限定されており、実際の煩雑なデータには対応しません。例えば、同じフィールドが異なるレコードの異なるタイプである可能性があります。Apache Spark は、多くの場合、作業を中断して、元のフィールドテキストを使用して型を string
として報告します。これは正しくない可能性があり、スキーマの不一致を解決する方法を細かくコントロールする必要があるかもしれません。また、大規模なデータセットの場合、ソースデータに対する追加パスが非常に高価になる可能性があります。
これらの制限に対応するために、AWS Glue では DynamicFrame
を導入しています。DynamicFrame
は、DataFrame
と似ていますが、各レコードが自己記述できるため、最初はスキーマは必要ありません。代わりに、AWS Glue は必要に応じてオンザフライでスキーマを計算し、選択 (または共用) タイプを使用してスキーマの不一致を明示的にエンコードします。これらの不整合を解決して、固定スキーマを必要とするデータストアとデータセットを互換性のあるものにできます。
同様に、DynamicRecord
は DynamicFrame
内の論理レコードを表します。これは、Spark DataFrame
の行と似ていますが、自己記述型であり、固定スキーマに適合しないデータに使用できます。PySpark で AWS Glue を使用する場合、通常は独立した操作 DynamicRecords
は行いません。むしろ、データセットをその DynamicFrame
を使ってまとめて変換します。
スキーマの不一致を解決したら、DynamicFrames
を DataFrames
との間で変換することができます。
— construction —
__init__
__init__(jdf, glue_ctx, name)
-
jdf
- Java 仮想マシン (JVM) 内のデータフレームへの参照。 -
glue_ctx
– GlueContext クラス オブジェクト。 -
name
- オプションの名前文字列。デフォルトでは空。
fromDF
fromDF(dataframe, glue_ctx, name)
DataFrame
フィールドを DynamicRecord
に変換することにより、DataFrame
を DynamicFrame
に変換します。新しい DynamicFrame
を返します。
DynamicRecord
は DynamicFrame
内の論理レコードを表します。これは、自己記述型であり、固定スキーマに適合しないデータに使用できる点を除いて、Spark DataFrame
の行に似ています。
この関数は、DataFrame
で名前と重複する列がすでに解決されていることを前提としています。
-
dataframe
- 変換する Apache Spark SQLDataFrame
(必須)。 -
glue_ctx
- この変換のコンテキストを指定する GlueContext クラス オブジェクト (必須)。 -
name
- 結果DynamicFrame
の名前 (AWS Glue 3.0 以降はオプション)。
toDF
toDF(options)
DynamicRecords
を DataFrame
フィールドに変換することにより、DynamicFrame
を Apache Spark DataFrame
に変換します。新しい DataFrame
を返します。
DynamicRecord
は DynamicFrame
内の論理レコードを表します。これは、自己記述型であり、固定スキーマに適合しないデータに使用できる点を除いて、Spark DataFrame
の行に似ています。
-
options
- オプションのリスト。変換プロセスの追加オプションを指定できます。「options」パラメータで使用できる有効なオプションは次のとおりです。-
format
– データの形式 (json、csv、parquet など) を指定します。 -
separater or sep
– CSV ファイルの場合、区切り文字を指定します。 -
header
– CSV ファイルの場合、最初の行がヘッダーであるかどうかを示します (true/false)。 -
inferSchema
– Spark にスキーマを自動的に推測するように指示します (true/false)。
「toDF」メソッドで「options」パラメータを使用する例を次に示します。
from awsglue.context import GlueContext from awsglue.dynamicframe import DynamicFrame from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) csv_dyf = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={"paths": ["s3://my-bucket/path/to/csv/"]}, format="csv" ) csv_cf = csv_dyf.toDF(options={ "separator": ",", "header": "true", "ïnferSchema": "true" })
Project
とCast
アクションタイプを選択した場合、ターゲットのタイプを指定します。次に例を示します。>>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])
-
— information —
count
count( )
- 基盤となる DataFrame
の行数を返します。
スキーマ
schema( )
- この DynamicFrame
のスキーマを返します。使用できない場合は、基盤となる DataFrame
のスキーマを返します。
このスキーマを構成する DynamicFrame
タイプの詳細については、「PySpark 拡張子型」を参照してください。
printSchema
printSchema( )
- 基盤となる DataFrame
のスキーマを表示します。
show
show(num_rows)
- 基盤となる DataFrame
から、指定された行数を表示します。
repartition
repartition(numPartitions)
– numPartitions
個のパーティションを含む新しい DynamicFrame
を返します。
coalesce
coalesce(numPartitions)
– numPartitions
個のパーティションを含む新しい DynamicFrame
を返します。
— transforms —
apply_mapping
apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
DynamicFrame
に宣言型のマッピングを適用し、それらのマッピングが適用された新しい DynamicFrame
を指定したフィールドに返します。未指定のフィールドは新しい DynamicFrame
から除外されます。
-
mappings
— マッピングタプルのリスト (必須)。それぞれが (ソース列、ソースタイプ、ターゲット列、ターゲットタイプ) で構成されます。ソース列で、名前にドット「
.
」が含まれている場合、バックティック「``
」で囲む必要があります。例えば、this.old.name
(文字列) をthisNewName
にマッピングするには、次のタプルを使用します。("`this.old.name`", "string", "thisNewName", "string")
-
transformation_ctx
- 状態情報を識別するために使用される一意の文字列 (オプション)。 -
info
- この変換のエラー報告に関連付ける文字列 (オプション)。 -
stageThreshold
- この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。 -
totalThreshold
- この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。
例: apply_mapping を使用してフィールドの名前を変更し、フィールドタイプを変更する
次のコード例は、apply_mapping
メソッドを使用して、選択したフィールドの名前を変更し、フィールドタイプを変更する方法を示しています。
注記
この例で使用されているデータセットにアクセスするには、「コード例: データの結合と関係付け」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。
# Example: Use apply_mapping to reshape source data into
# the desired column names and types as a new DynamicFrame
from pyspark.context import SparkContext
from awsglue.context import GlueContext
# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Create a DynamicFrame and view its schema
persons = glueContext.create_dynamic_frame.from_catalog(
database="legislators", table_name="persons_json"
)
print("Schema for the persons DynamicFrame:")
persons.printSchema()
# Select and rename fields, change field type
print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:")
persons_mapped = persons.apply_mapping(
[
("family_name", "String", "last_name", "String"),
("name", "String", "first_name", "String"),
("birth_date", "String", "date_of_birth", "Date"),
]
)
persons_mapped.printSchema()
Schema for the persons DynamicFrame:
root
|-- family_name: string
|-- name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- sort_name: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- death_date: string
Schema for the persons_mapped DynamicFrame, created with apply_mapping:
root
|-- last_name: string
|-- first_name: string
|-- date_of_birth: date
drop_fields
drop_fields(paths, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
FlatMap クラス 変換を呼び出して、DynamicFrame
からフィールドを削除します。指定されたフィールドが削除された新しい DynamicFrame
を返します。
-
paths
- 文字列のリスト。それぞれに、ドロップするフィールドノードへのフルパスが含まれます。ドット表記を使用して、ネストされたフィールドを指定できます。例えば、フィールドfirst
がツリー内のフィールドname
の子である場合は、パスに"name.first"
を指定します。フィールドノードの名前にリテラル
.
が含まれている場合は、その名前をバックティック (`
) で囲む必要があります。 -
transformation_ctx
- 状態情報を識別するために使用される一意の文字列 (オプション)。 -
info
- この変換のエラー報告に関連付ける文字列 (オプション)。 -
stageThreshold
- この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。 -
totalThreshold
- この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。
例: drop_fields を使用して DynamicFrame
からフィールドを削除する
このコード例では、drop_fields
メソッドを使用して、選択したトップレベルフィールドとネストされたフィールドを DynamicFrame
から削除します。
データセットの例
この例では、コード内の EXAMPLE-FRIENDS-DATA
テーブルで表される次のデータセットを使用します。
{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []}
{"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]}
{"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]}
{"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}}
{"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}
コードの例
# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame.
# Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name.
# Replace EXAMPLE-FRIENDS-DATA with your table name.
from pyspark.context import SparkContext
from awsglue.context import GlueContext
# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Create a DynamicFrame from Glue Data Catalog
glue_source_database = "MY-EXAMPLE-DATABASE
"
glue_source_table = "EXAMPLE-FRIENDS-DATA
"
friends = glueContext.create_dynamic_frame.from_catalog(
database=glue_source_database, table_name=glue_source_table
)
print("Schema for friends DynamicFrame before calling drop_fields:")
friends.printSchema()
# Remove location.county, remove friends.age, remove age
friends = friends.drop_fields(paths=["age", "location.county", "friends.age"])
print("Schema for friends DynamicFrame after removing age, county, and friend age:")
friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields:
root
|-- name: string
|-- age: int
|-- location: struct
| |-- state: string
| |-- county: string
|-- friends: array
| |-- element: struct
| | |-- name: string
| | |-- age: int
Schema for friends DynamicFrame after removing age, county, and friend age:
root
|-- name: string
|-- location: struct
| |-- state: string
|-- friends: array
| |-- element: struct
| | |-- name: string
フィルター
filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
指定された述語関数 f
を満たす入力 DynamicFrame
内のすべての DynamicRecords
を含む、新しい DynamicFrame
を返します。
-
f
-DynamicFrame
に適用する述語関数。この関数はDynamicRecord
を引数として取り、DynamicRecord
がフィルター要件を満たす場合は True を返し、そうでない場合は False を返します (必須)。DynamicRecord
はDynamicFrame
内の論理レコードを表します。これは、自己記述型であり、固定スキーマに準拠しないデータに使用できる点を除いて、SparkDataFrame
の行に似ています。 -
transformation_ctx
- 状態情報を識別するために使用される一意の文字列 (オプション)。 -
info
- この変換のエラー報告に関連付ける文字列 (オプション)。 -
stageThreshold
- この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。 -
totalThreshold
- この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。
例: フィルターを使用して、フィールドのフィルタリングされた選択内容を取得する
この例では、filter
メソッドを使用して、別の DynamicFrame
のフィールドのフィルタリングされた選択内容を含む新しい DynamicFrame
を作成します。
map
メソッドと同様に、filter
は、元の DynamicFrame
の各レコードに適用される引数として関数を取ります。この関数はレコードを入力として受け取り、ブール値を返します。戻り値が true の場合、レコードは結果として生じる DynamicFrame
に含まれます。false の場合、レコードは除外されます。
注記
この例で使用されているデータセットにアクセスするには、「コード例: ResolveChoice、Lambda、および ApplyMapping を使用したデータ準備」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。
# Example: Use filter to create a new DynamicFrame
# with a filtered selection of records
from pyspark.context import SparkContext
from awsglue.context import GlueContext
# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Create DynamicFrame from Glue Data Catalog
medicare = glueContext.create_dynamic_frame.from_options(
"s3",
{
"paths": [
"s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"
]
},
"csv",
{"withHeader": True},
)
# Create filtered DynamicFrame with custom lambda
# to filter records by Provider State and Provider City
sac_or_mon = medicare.filter(
f=lambda x: x["Provider State"] in ["CA", "AL"]
and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"]
)
# Compare record counts
print("Unfiltered record count: ", medicare.count())
print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065
Filtered record count: 564
join
join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
別の DynamicFrame
と等価結合を実行し、結果の DynamicFrame
を返します。
-
paths1
- 結合するこのフレームのキーのリスト。 -
paths2
- 結合する別のフレームのキーのリスト。 -
frame2
- 結合する他のDynamicFrame
。 -
transformation_ctx
- 状態情報を識別するために使用される一意の文字列 (オプション)。 -
info
- この変換のエラー報告に関連付ける文字列 (オプション)。 -
stageThreshold
- この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。 -
totalThreshold
- この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。
例: 結合を使用して DynamicFrames
を結合する
この例では、join
メソッドを使用して 3 つの DynamicFrames
で結合を実行します。AWSGlue は、入力されたフィールドキーに基づいて結合を実行します。結果として生じる DynamicFrame
には、指定されたキーが一致する元の 2 つのフレームからの行が含まれます。
join
変換では、すべてのフィールドがそのまま保持されることに注意してください。これは、一致するように指定したフィールドが冗長で同じキーが含まれていても、結果として生じる DynamicFrame に表示されることを意味します。この例では、drop_fields
を使用して、結合後にこれらの冗長なキーを削除します。
注記
この例で使用されているデータセットにアクセスするには、「コード例: データの結合と関係付け」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。
# Example: Use join to combine data from three DynamicFrames
from pyspark.context import SparkContext
from awsglue.context import GlueContext
# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Load DynamicFrames from Glue Data Catalog
persons = glueContext.create_dynamic_frame.from_catalog(
database="legislators", table_name="persons_json"
)
memberships = glueContext.create_dynamic_frame.from_catalog(
database="legislators", table_name="memberships_json"
)
orgs = glueContext.create_dynamic_frame.from_catalog(
database="legislators", table_name="organizations_json"
)
print("Schema for the persons DynamicFrame:")
persons.printSchema()
print("Schema for the memberships DynamicFrame:")
memberships.printSchema()
print("Schema for the orgs DynamicFrame:")
orgs.printSchema()
# Join persons and memberships by ID
persons_memberships = persons.join(
paths1=["id"], paths2=["person_id"], frame2=memberships
)
# Rename and drop fields from orgs
# to prevent field name collisions with persons_memberships
orgs = (
orgs.drop_fields(["other_names", "identifiers"])
.rename_field("id", "org_id")
.rename_field("name", "org_name")
)
# Create final join of all three DynamicFrames
legislators_combined = orgs.join(
paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships
).drop_fields(["person_id", "org_id"])
# Inspect the schema for the joined data
print("Schema for the new legislators_combined DynamicFrame:")
legislators_combined.printSchema()
Schema for the persons DynamicFrame:
root
|-- family_name: string
|-- name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- sort_name: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- death_date: string
Schema for the memberships DynamicFrame:
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
Schema for the orgs DynamicFrame:
root
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- id: string
|-- classification: string
|-- name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- image: string
|-- seats: int
|-- type: string
Schema for the new legislators_combined DynamicFrame:
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- legislative_period_id: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- image: string
|-- given_name: string
|-- start_date: string
|-- family_name: string
|-- id: string
|-- death_date: string
|-- end_date: string
マップ
map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
指定したマッピング関数を元の DynamicFrame
のすべてのレコードに適用した結果の新しい DynamicFrame
を返します。
-
f
-DynamicFrame
内のすべてのレコードに適用されるマッピング関数。この関数は、DynamicRecord
を引数として取り、新しいDynamicRecord
を返す必要があります (必須)。DynamicRecord
はDynamicFrame
内の論理レコードを表します。これは、自己記述型であり、固定スキーマに準拠しないデータに使用できる点を除いて、Apache SparkDataFrame
の行に似ています。 transformation_ctx
- 状態情報を識別するために使用される一意の文字列 (オプション)。info
- 変換のエラーに関連付けられた文字列 (オプション)。stageThreshold
– エラーを出力する前に、変換で発生する可能性のあるエラーの最大数 (オプション)。デフォルト値は 0 です。totalThreshold
– エラーの出力を処理する前に、全体で発生する可能性のあるエラーの最大数 (オプション)。デフォルト値は 0 です。
例: マッピングを使用して、DynamicFrame
のすべてのレコードに関数を適用する
この例は、map
メソッドを使用して、関数を DynamicFrame
のすべてのレコードに適用する方法を示しています。具体的には、この例では、複数のアドレスフィールドを単一の struct
タイプに結合するため、MergeAddress
という名前の関数を各レコードに適用します。
注記
この例で使用されているデータセットにアクセスするには、「コード例: ResolveChoice、Lambda、および ApplyMapping を使用したデータ準備」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。
# Example: Use map to combine fields in all records
# of a DynamicFrame
from pyspark.context import SparkContext
from awsglue.context import GlueContext
# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Create a DynamicFrame and view its schema
medicare = glueContext.create_dynamic_frame.from_options(
"s3",
{"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]},
"csv",
{"withHeader": True})
print("Schema for medicare DynamicFrame:")
medicare.printSchema()
# Define a function to supply to the map transform
# that merges address fields into a single field
def MergeAddress(rec):
rec["Address"] = {}
rec["Address"]["Street"] = rec["Provider Street Address"]
rec["Address"]["City"] = rec["Provider City"]
rec["Address"]["State"] = rec["Provider State"]
rec["Address"]["Zip.Code"] = rec["Provider Zip Code"]
rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]]
del rec["Provider Street Address"]
del rec["Provider City"]
del rec["Provider State"]
del rec["Provider Zip Code"]
return rec
# Use map to apply MergeAddress to every record
mapped_medicare = medicare.map(f = MergeAddress)
print("Schema for mapped_medicare DynamicFrame:")
mapped_medicare.printSchema()
Schema for medicare DynamicFrame:
root
|-- DRG Definition: string
|-- Provider Id: string
|-- Provider Name: string
|-- Provider Street Address: string
|-- Provider City: string
|-- Provider State: string
|-- Provider Zip Code: string
|-- Hospital Referral Region Description: string
|-- Total Discharges: string
|-- Average Covered Charges: string
|-- Average Total Payments: string
|-- Average Medicare Payments: string
Schema for mapped_medicare DynamicFrame:
root
|-- Average Total Payments: string
|-- Average Covered Charges: string
|-- DRG Definition: string
|-- Average Medicare Payments: string
|-- Hospital Referral Region Description: string
|-- Address: struct
| |-- Zip.Code: string
| |-- City: string
| |-- Array: array
| | |-- element: string
| |-- State: string
| |-- Street: string
|-- Provider Id: string
|-- Total Discharges: string
|-- Provider Name: string
mergeDynamicFrame
mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "",
options = {}, info = "", stageThreshold = 0, totalThreshold = 0)
レコードを識別するために、この DynamicFrame
を指定されたプライマリキーに基づくステージング DynamicFrame
とマージします。重複レコード (同じプライマリキーを持つレコード) は重複除外されません。ステージングフレームに一致するレコードがない場合、すべてのレコード(重複を含む)がソースから保持されます。ステージングフレームに一致するレコードがある場合、ステージングフレームのレコードによって、AWS Glue のソースのレコードが上書きされます。
-
stage_dynamic_frame
– マージするステージングDynamicFrame
。 -
primary_keys
– ソースおよびステージング動的フレームからのレコードを照合する、プライマリキーフィールドのリスト。 -
transformation_ctx
– 現在の変換に関するメタデータを取得するために使用される一意の文字列 (オプション)。 -
options
– この変換に関する追加情報を提供する、JSON の名前と値のペアを示す文字列。この引数は現在使用されていません。 -
info
–String
。この変換でのエラーに関連付けられる任意の文字列。 -
stageThreshold
–Long
。指定された変換で処理がエラーアウトする必要があるエラーの数。 -
totalThreshold
–Long
。この変換までに発生したエラーのうち、処理でエラーを出力する必要があるエラーの合計数。
このメソッドは、この DynamicFrame
をステージング DynamicFrame
とマージして取得した新しい DynamicFrame
を返します。
以下の場合、返される DynamicFrame
にはレコード A が含まれます。
-
A
がソースフレームとステージングフレームの両方に存在する場合、ステージングフレームのA
が返されます。 -
A
がソーステーブルに存在し、A.primaryKeys
がstagingDynamicFrame
に存在しない場合、A
はステージングテーブルで更新されていません。
ソースフレームとステージングフレームが、同じスキーマを持つ必要はありません。
例: mergeDynamicFrame を使用して、プライマリキーに基づいて 2 つの DynamicFrames
をマージする
次のコード例は、mergeDynamicFrame
メソッドを使用して、プライマリキー id
に基づいて DynamicFrame
を「ステージング」DynamicFrame
とマージする方法を示しています。
データセットの例
この例では、split_rows_collection
と呼ばれる DynamicFrameCollection
からの 2 つの DynamicFrames
を使用しています。次のリストに示しているのは、split_rows_collection
のキーです。
dict_keys(['high', 'low'])
コードの例
# Example: Use mergeDynamicFrame to merge DynamicFrames
# based on a set of specified primary keys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import SelectFromCollection
# Inspect the original DynamicFrames
frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low")
print("Inspect the DynamicFrame that contains rows where ID < 10")
frame_low.toDF().show()
frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high")
print("Inspect the DynamicFrame that contains rows where ID > 10")
frame_high.toDF().show()
# Merge the DynamicFrames based on the "id" primary key
merged_high_low = frame_high.mergeDynamicFrame(
stage_dynamic_frame=frame_low, primary_keys=["id"]
)
# View the results where the ID is 1 or 20
print("Inspect the merged DynamicFrame that contains the combined rows")
merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 1| 0| fax| 202-225-3307|
| 1| 1| phone| 202-225-5731|
| 2| 0| fax| 202-225-3307|
| 2| 1| phone| 202-225-5731|
| 3| 0| fax| 202-225-3307|
| 3| 1| phone| 202-225-5731|
| 4| 0| fax| 202-225-3307|
| 4| 1| phone| 202-225-5731|
| 5| 0| fax| 202-225-3307|
| 5| 1| phone| 202-225-5731|
| 6| 0| fax| 202-225-3307|
| 6| 1| phone| 202-225-5731|
| 7| 0| fax| 202-225-3307|
| 7| 1| phone| 202-225-5731|
| 8| 0| fax| 202-225-3307|
| 8| 1| phone| 202-225-5731|
| 9| 0| fax| 202-225-3307|
| 9| 1| phone| 202-225-5731|
| 10| 0| fax| 202-225-6328|
| 10| 1| phone| 202-225-4576|
+---+-----+------------------------+-------------------------+
only showing top 20 rows
Inspect the DynamicFrame that contains rows where ID > 10
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 11| 0| fax| 202-225-6328|
| 11| 1| phone| 202-225-4576|
| 11| 2| twitter| RepTrentFranks|
| 12| 0| fax| 202-225-6328|
| 12| 1| phone| 202-225-4576|
| 12| 2| twitter| RepTrentFranks|
| 13| 0| fax| 202-225-6328|
| 13| 1| phone| 202-225-4576|
| 13| 2| twitter| RepTrentFranks|
| 14| 0| fax| 202-225-6328|
| 14| 1| phone| 202-225-4576|
| 14| 2| twitter| RepTrentFranks|
| 15| 0| fax| 202-225-6328|
| 15| 1| phone| 202-225-4576|
| 15| 2| twitter| RepTrentFranks|
| 16| 0| fax| 202-225-6328|
| 16| 1| phone| 202-225-4576|
| 16| 2| twitter| RepTrentFranks|
| 17| 0| fax| 202-225-6328|
| 17| 1| phone| 202-225-4576|
+---+-----+------------------------+-------------------------+
only showing top 20 rows
Inspect the merged DynamicFrame that contains the combined rows
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 1| 0| fax| 202-225-3307|
| 1| 1| phone| 202-225-5731|
| 20| 0| fax| 202-225-5604|
| 20| 1| phone| 202-225-6536|
| 20| 2| twitter| USRepLong|
+---+-----+------------------------+-------------------------+
関係付け
relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
DynamicFrame
をリレーショナルデータベースに適合する形式に変換します。DynamicFrame
の関係付けは、DynamoDB などの NoSQL 環境から MySQL などのリレーショナルデータベースにデータを移動する場合に特に便利です。
この変換は、ネストされた列をネスト解除し、配列の列をピボットすることでフレームのリストを生成します。フェーズのネスト解除時に生成された結合キーを使用して、ピボットされた配列の列をルートテーブルに結合できます。
root_table_name
- ルートテーブルの名前。staging_path
– このメソッドがピボットテーブルのパーティションを CSV 形式で保存する保存先のパス (オプション)。ピボットされたテーブルはこのパスから読み取ります。options
- オプションのパラメータのディクショナリ。-
transformation_ctx
- 状態情報を識別するために使用される一意の文字列 (オプション)。 -
info
- この変換のエラー報告に関連付ける文字列 (オプション)。 -
stageThreshold
- この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。 -
totalThreshold
- この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。
例: relationalize を使用して、DynamicFrame
のネストされたスキーマをフラット化する
このコード例では、relationalize
メソッドを使用して、ネストされたスキーマをリレーショナルデータベースに適合する形式にフラット化します。
データセットの例
この例では、次のスキーマを持つ legislators_combined
と呼ばれる DynamicFrame
を使用しています。legislators_combined
には、relationalize
変換によってフラット化される、links
、images
、contact_details
などのネストされた複数のフィールドがあります。
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- legislative_period_id: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- image: string
|-- given_name: string
|-- start_date: string
|-- family_name: string
|-- id: string
|-- death_date: string
|-- end_date: string
コードの例
# Example: Use relationalize to flatten
# a nested schema into a format that fits
# into a relational database.
# Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location.
from pyspark.context import SparkContext
from awsglue.context import GlueContext
# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Apply relationalize and inspect new tables
legislators_relationalized = legislators_combined.relationalize(
"l_root", "s3://DOC-EXAMPLE-BUCKET/tmpDir
"
)
legislators_relationalized.keys()
# Compare the schema of the contact_details
# nested field to the new relationalized table that
# represents it
legislators_combined.select_fields("contact_details").printSchema()
legislators_relationalized.select("l_root_contact_details").toDF().where(
"id = 10 or id = 75"
).orderBy(["id", "index"]).show()
次の出力では、contact_details
と呼ばれるネストされたフィールドのスキーマを、relationalize
変換によって作成されたテーブルと比較できます。テーブルのレコードが id
と呼ばれる外部キーと配列の位置を表す index
列を使用してメインテーブルにリンクされていることに注意してください。
dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers'])
root
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10| 0| fax| 202-225-4160|
| 10| 1| phone| 202-225-3436|
| 75| 0| fax| 202-225-6791|
| 75| 1| phone| 202-225-2861|
| 75| 2| twitter| RepSamFarr|
+---+-----+------------------------+-------------------------+
rename_field
rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
この DynamicFrame
のフィールドの名前を変更し、フィールドの名前が変更された新しい DynamicFrame
を返します。
-
oldName
- 名前を変更するノードへのフルパス。古い名前にドットが含まれている場合、
RenameField
はバックティック (`
) で囲まなければ機能しません。例えば、this.old.name
をthisNewName
に置き換えるには、rename_field を次のように呼び出します。newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
-
newName
- 完全パスとしての新しい名前。 -
transformation_ctx
- 状態情報を識別するために使用される一意の文字列 (オプション)。 -
info
- この変換のエラー報告に関連付ける文字列 (オプション)。 -
stageThreshold
- この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。 -
totalThreshold
- この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。
例: rename_field を使用して、DynamicFrame
のフィールドの名前を変更する
このコード例では、rename_field
メソッドを使用して DynamicFrame
のフィールドの名前を変更します。この例では、メソッドの連鎖を使用して、複数のフィールドの名前を同時に変更していることに注意してください。
注記
この例で使用されているデータセットにアクセスするには、「コード例: データの結合と関係付け」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。
コードの例
# Example: Use rename_field to rename fields
# in a DynamicFrame
from pyspark.context import SparkContext
from awsglue.context import GlueContext
# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Inspect the original orgs schema
orgs = glueContext.create_dynamic_frame.from_catalog(
database="legislators", table_name="organizations_json"
)
print("Original orgs schema: ")
orgs.printSchema()
# Rename fields and view the new schema
orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name")
print("New orgs schema with renamed fields: ")
orgs.printSchema()
Original orgs schema:
root
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- id: string
|-- classification: string
|-- name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- image: string
|-- seats: int
|-- type: string
New orgs schema with renamed fields:
root
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- classification: string
|-- org_id: string
|-- org_name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- image: string
|-- seats: int
|-- type: string
resolveChoice
resolveChoice(specs = None, choice = "" , database = None , table_name = None ,
transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id =
None)
この DynamicFrame
内で選択タイプを解決し、新しい DynamicFrame
を返します。
-
specs
– それぞれがタプルの形式である、解決すべき特定の曖昧要素のリスト:(field_path, action)
。resolveChoice
を使用するには 2 つの方法があります。最初の方法では、specs
引数により、特定のフィールドのシーケンスと解決方法を指定します。resolveChoice
のもう 1 つのモードでは、choice
引数を使用して、すべてのChoiceTypes
に対して単一の解決策を指定します。specs
の値は、(field_path, action)
ペアで構成されたタプルとして指定されます。field_path
値は特定のあいまいな要素を識別し、action
値は対応する解決を識別します。以下のアクションを指定できます。-
cast:
– すべての値について、指定した型へのキャストを試みます。例:type
cast:int
。 -
make_cols
– それぞれの異なるタイプを
という名前の列に変換します。データをフラット化することで潜在的なあいまいさを解消します。例えば、columnName
_type
columnA
がint
またはstring
の場合、解決策は、作成されたDynamicFrame
にcolumnA_int
およびcolumnA_string
という名前の 2 つの列を生成することです。 -
make_struct
–struct
を使用してデータを表現することで、潜在的なあいまいさを解決します。例えば、列のデータがint
またはstring
となる可能性がある場合、make_struct
アクションを使用すると、作成されたDynamicFrame
に構造体の列が生成されます。各構造体には、int
とstring
の両方が含まれています。 -
project:
– 可能なデータ型の 1 つにすべてのデータを投影することで、潜在的なあいまいさを解消します。例えば、列のデータがtype
int
またはstring
の場合、project:string
アクションを使用すると、すべてのint
値が文字列に変換されている、作成されたDynamicFrame
に列が生成されます。
field_path
で配列を識別する場合は、あいまいさを避けるために配列名の後に空の角括弧を置きます。例えば、使用しているデータが次のように構造化されているとします。"myList": [ { "price": 100.00 }, { "price": "$100.00" } ]
field_path
を"myList[].price"
に設定し、action
を"cast:double"
に設定すると、文字列バージョンではなく、数値バージョンの料金を選択できます。注記
specs
パラメータおよびchoice
パラメータのうち 1 つのみを使用できます。specs
パラメータがNone
ではない場合、choice
パラメータは空の文字列である必要があります。逆に、choice
が空の文字列ではない場合、specs
パラメータはNone
である必要があります。 -
choice
– すべてのChoiceTypes
について、単一の解決方法を指定します。このモードは、ランタイム前にChoiceTypes
の完全なリストが不明な場合に使用できます。この引数では、上記のspecs
用のアクションに加えて、以下のアクションもサポートされています。-
match_catalog
– 各ChoiceType
について、指定した Data Catalog テーブル内の対応する型へのキャストを試みます。
-
-
database
–match_catalog
アクションで使用する Data Catalog データベース。 -
table_name
–match_catalog
アクションで使用する Data Catalog テーブル。 -
transformation_ctx
- 状態情報を識別するために使用される一意の文字列 (オプション)。 -
info
- この変換のエラー報告に関連付ける文字列 (オプション)。 -
stageThreshold
- この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。 -
totalThreshold
– この変換までに発生したエラーのうち、プロセスでエラーを出力する必要があるエラーの数 (オプション)。デフォルトはゼロで、プロセスでエラーを出力しないことを示します。 -
catalog_id
– 現在アクセスされている Data Catalog のカタログ ID (Data Catalog のアカウント ID)。None
を設定した場合(デフォルト値) では、呼び出し元アカウントのカタログ ID が使用されます。
例: resolveChoice を使用して、複数の型を含む列を処理する
このコード例では、resolveChoice
メソッドを使用して、複数の型の値を含む DynamicFrame
列の処理方法を指定します。この例は、型の異なる列を処理する一般的な 2 つの方法を示しています。
列を単一のデータ型にキャストします。
すべての型を別々の列に保持します。
データセットの例
注記
この例で使用されているデータセットにアクセスするには、「コード例: ResolveChoice、Lambda、および ApplyMapping を使用したデータ準備」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。
この例では、次のスキーマを持つ medicare
と呼ばれる DynamicFrame
を使用しています。
root
|-- drg definition: string
|-- provider id: choice
| |-- long
| |-- string
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
コードの例
# Example: Use resolveChoice to handle
# a column that contains multiple types
from pyspark.context import SparkContext
from awsglue.context import GlueContext
# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Load the input data and inspect the "provider id" column
medicare = glueContext.create_dynamic_frame.from_catalog(
database="payments", table_name="medicare_hospital_provider_csv"
)
print("Inspect the provider id column:")
medicare.toDF().select("provider id").show()
# Cast provider id to type long
medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")])
print("Schema after casting provider id to type long:")
medicare_resolved_long.printSchema()
medicare_resolved_long.toDF().select("provider id").show()
# Create separate columns
# for each provider id type
medicare_resolved_cols = medicare.resolveChoice(choice="make_cols")
print("Schema after creating separate columns for each type:")
medicare_resolved_cols.printSchema()
medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column:
+-----------+
|provider id|
+-----------+
| [10001,]|
| [10005,]|
| [10006,]|
| [10011,]|
| [10016,]|
| [10023,]|
| [10029,]|
| [10033,]|
| [10039,]|
| [10040,]|
| [10046,]|
| [10055,]|
| [10056,]|
| [10078,]|
| [10083,]|
| [10085,]|
| [10090,]|
| [10092,]|
| [10100,]|
| [10103,]|
+-----------+
only showing top 20 rows
Schema after casting 'provider id' to type long:
root
|-- drg definition: string
|-- provider id: long
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
+-----------+
|provider id|
+-----------+
| 10001|
| 10005|
| 10006|
| 10011|
| 10016|
| 10023|
| 10029|
| 10033|
| 10039|
| 10040|
| 10046|
| 10055|
| 10056|
| 10078|
| 10083|
| 10085|
| 10090|
| 10092|
| 10100|
| 10103|
+-----------+
only showing top 20 rows
Schema after creating separate columns for each type:
root
|-- drg definition: string
|-- provider id_string: string
|-- provider id_long: long
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
+----------------+------------------+
|provider id_long|provider id_string|
+----------------+------------------+
| 10001| null|
| 10005| null|
| 10006| null|
| 10011| null|
| 10016| null|
| 10023| null|
| 10029| null|
| 10033| null|
| 10039| null|
| 10040| null|
| 10046| null|
| 10055| null|
| 10056| null|
| 10078| null|
| 10083| null|
| 10085| null|
| 10090| null|
| 10092| null|
| 10100| null|
| 10103| null|
+----------------+------------------+
only showing top 20 rows
select_fields
select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
選択したフィールドを含む新しい DynamicFrame
を返します。
-
paths
- 文字列のリスト。各文字列は、選択する最上位ノードへのパスです。 -
transformation_ctx
- 状態情報を識別するために使用される一意の文字列 (オプション)。 -
info
- この変換のエラー報告に関連付ける文字列 (オプション)。 -
stageThreshold
- この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。 -
totalThreshold
- この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。
例: select_fields を使用して、選択したフィールドで新しい DynamicFrame
を作成する
次のコード例は、select_fields
メソッドを使用して、既存の DynamicFrame
から選択されたフィールドのリストを使用し、新しい DynamicFrame
を作成する方法を示しています。
注記
この例で使用されているデータセットにアクセスするには、「コード例: データの結合と関係付け」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。
# Example: Use select_fields to select specific fields from a DynamicFrame
from pyspark.context import SparkContext
from awsglue.context import GlueContext
# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Create a DynamicFrame and view its schema
persons = glueContext.create_dynamic_frame.from_catalog(
database="legislators", table_name="persons_json"
)
print("Schema for the persons DynamicFrame:")
persons.printSchema()
# Create a new DynamicFrame with chosen fields
names = persons.select_fields(paths=["family_name", "given_name"])
print("Schema for the names DynamicFrame, created with select_fields:")
names.printSchema()
names.toDF().show()
Schema for the persons DynamicFrame:
root
|-- family_name: string
|-- name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- sort_name: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- death_date: string
Schema for the names DynamicFrame:
root
|-- family_name: string
|-- given_name: string
+-----------+----------+
|family_name|given_name|
+-----------+----------+
| Collins| Michael|
| Huizenga| Bill|
| Clawson| Curtis|
| Solomon| Gerald|
| Rigell| Edward|
| Crapo| Michael|
| Hutto| Earl|
| Ertel| Allen|
| Minish| Joseph|
| Andrews| Robert|
| Walden| Greg|
| Kazen| Abraham|
| Turner| Michael|
| Kolbe| James|
| Lowenthal| Alan|
| Capuano| Michael|
| Schrader| Kurt|
| Nadler| Jerrold|
| Graves| Tom|
| McMillan| John|
+-----------+----------+
only showing top 20 rows
simplify_ddb_json
simplify_ddb_json(): DynamicFrame
特に DynamoDB JSON 構造にある DynamicFrame
でネスト化された列を単純化し、新しい単純化された DynamicFrame
を返します。リストタイプに複数タイプまたは Map タイプがある場合、リストの要素は単純化されません。これは通常の unnest
変換とは異なって動作する変換の特定タイプであり、データが既に DynamoDB JSON 構造に格納されている必要があることに注意してください。詳細については、「DynamoDB JSON」を参照してください。
例えば、DynamoDB JSON 構造体のあるエクスポートを読み取るスキーマは次のようになります。
root
|-- Item: struct
| |-- parentMap: struct
| | |-- M: struct
| | | |-- childMap: struct
| | | | |-- M: struct
| | | | | |-- appName: struct
| | | | | | |-- S: string
| | | | | |-- packageName: struct
| | | | | | |-- S: string
| | | | | |-- updatedAt: struct
| | | | | | |-- N: string
| |-- strings: struct
| | |-- SS: array
| | | |-- element: string
| |-- numbers: struct
| | |-- NS: array
| | | |-- element: string
| |-- binaries: struct
| | |-- BS: array
| | | |-- element: string
| |-- isDDBJson: struct
| | |-- BOOL: boolean
| |-- nullValue: struct
| | |-- NULL: boolean
simplify_ddb_json()
変換は、以下のように変換します。
root
|-- parentMap: struct
| |-- childMap: struct
| | |-- appName: string
| | |-- packageName: string
| | |-- updatedAt: string
|-- strings: array
| |-- element: string
|-- numbers: array
| |-- element: string
|-- binaries: array
| |-- element: string
|-- isDDBJson: boolean
|-- nullValue: null
例: simplify_ddb_json を使用して DynamoDB JSON の単純化を呼び出す
このコード例では simplify_ddb_json
メソッドを使用し、AWS Glue DynamoDB エクスポートコネクタを使用し、DynamoDB JSON 単純化を呼び出してパーティションの数を表示します。
コードの例
from pyspark.context import SparkContext
from awsglue.context import GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)
dynamicFrame = glueContext.create_dynamic_frame.from_options(
connection_type = "dynamodb",
connection_options = {
'dynamodb.export': 'ddb',
'dynamodb.tableArn': '<table arn>',
'dynamodb.s3.bucket': '<bucket name>',
'dynamodb.s3.prefix': '<bucket prefix>',
'dynamodb.s3.bucketOwner': '<account_id of bucket>'
}
)
simplified = dynamicFrame.simplify_ddb_json()
print(simplified.getNumPartitions())
スピゴット
spigot(path, options={})
ジョブで実行された変換が確認しやすくなるように、サンプルレコードを指定した送信先に書き込みます。
-
path
– 書き込み先へのパス (必須)。 -
options
– オプションを指定するキーと値のペア (オプション)。"topk"
オプションは、最初のk
レコードを書き込むことを指定します。"prob"
オプションは、指定されたレコードを選択する確率(10 進数)を指定します。これを使用して、書き込むレコードを選択できます。 transformation_ctx
- 状態情報を識別するために使用される一意の文字列 (オプション)。
例: spigot を使用して DynamicFrame
のサンプルフィールドを Amazon S3 に書き込む
このコード例では、spigot
メソッドを使用して、select_fields
変換を適用した後に Amazon S3 バケットにサンプルレコードを書き込みます。
データセットの例
注記
この例で使用されているデータセットにアクセスするには、「コード例: データの結合と関係付け」を参照し、「ステップ 1: Amazon S3 バケット内のデータをクロールする」の手順に従います。
この例では、次のスキーマを持つ persons
と呼ばれる DynamicFrame
を使用しています。
root
|-- family_name: string
|-- name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- sort_name: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- death_date: string
コードの例
# Example: Use spigot to write sample records
# to a destination during a transformation
# from pyspark.context import SparkContext.
# Replace DOC-EXAMPLE-BUCKET with your own location.
from pyspark.context import SparkContext
from awsglue.context import GlueContext
# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Load table data into a DynamicFrame
persons = glueContext.create_dynamic_frame.from_catalog(
database="legislators", table_name="persons_json"
)
# Perform the select_fields on the DynamicFrame
persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"])
# Use spigot to write a sample of the transformed data
# (the first 10 records)
spigot_output = persons.spigot(
path="s3://DOC-EXAMPLE-BUCKET
", options={"topk": 10}
)
spigot
で Amazon S3 に書き込むデータの例を次に示します。サンプルコードで options={"topk": 10}
を指定したため、サンプルデータには最初の 10 個のレコードが含まれます。
{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"}
{"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"}
{"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"}
{"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"}
{"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"}
{"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"}
{"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"}
{"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"}
{"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"}
{"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}
split_fields
split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
2 つの DynamicFrames
を含む新しい DynamicFrameCollection
を返します。1 つ目の DynamicFrame
には分割されたすべてのノードが含まれ、2 つ目には残りのノードが含まれています。
-
paths
- 文字列のリスト。各文字列は新しいDynamicFrame
に分割するノードのフルパスです。 -
name1
- 分割されたDynamicFrame
の名前文字列。 -
name2
- 指定されたノードが分割された後に残るDynamicFrame
の名前文字列。 -
transformation_ctx
- 状態情報を識別するために使用される一意の文字列 (オプション)。 -
info
- この変換のエラー報告に関連付ける文字列 (オプション)。 -
stageThreshold
- この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。 -
totalThreshold
- この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。
例: split_fields を使用して、選択したフィールドを別の DynamicFrame
に分割する
このコード例では、split_fields
メソッドを使用して、指定したフィールドのリストを別の DynamicFrame
に分割します。
データセットの例
この例では、legislators_relationalized
という名前のコレクションからの l_root_contact_details
と呼ばれる DynamicFrame
が使用されています。
l_root_contact_details
には、次のスキーマとエントリが含まれています。
root
|-- id: long
|-- index: int
|-- contact_details.val.type: string
|-- contact_details.val.value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 1| 0| phone| 202-225-5265|
| 1| 1| twitter| kathyhochul|
| 2| 0| phone| 202-225-3252|
| 2| 1| twitter| repjackyrosen|
| 3| 0| fax| 202-225-1314|
| 3| 1| phone| 202-225-3772|
...
コードの例
# Example: Use split_fields to split selected
# fields into a separate DynamicFrame
from pyspark.context import SparkContext
from awsglue.context import GlueContext
# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Load the input DynamicFrame and inspect its schema
frame_to_split = legislators_relationalized.select("l_root_contact_details")
print("Inspect the input DynamicFrame schema:")
frame_to_split.printSchema()
# Split id and index fields into a separate DynamicFrame
split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right")
# Inspect the resulting DynamicFrames
print("Inspect the schemas of the DynamicFrames created with split_fields:")
split_fields_collection.select("left").printSchema()
split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema:
root
|-- id: long
|-- index: int
|-- contact_details.val.type: string
|-- contact_details.val.value: string
Inspect the schemas of the DynamicFrames created with split_fields:
root
|-- id: long
|-- index: int
root
|-- contact_details.val.type: string
|-- contact_details.val.value: string
split_rows
split_rows(comparison_dict, name1, name2, transformation_ctx="", info="",
stageThreshold=0, totalThreshold=0)
DynamicFrame
の 1 つ以上の行を、新しい DynamicFrame
に分割します。
このメソッドは、2 つの DynamicFrames
を含む新しい DynamicFrameCollection
を返します。1 つ目の DynamicFrame
には分割されたすべての行が含まれ、2 つ目には残りの行が含まれています。
-
comparison_dict
– 列へのパスを示すためのキーと、列の値の比較対象である値に対しコンパレータをマッピングする別のディクショナリを示すための値を持つ、ディクショナリ。例えば、{"age": {">": 10, "<": 20}}
は、age 列の値が 10 より大きく 20 より小さいすべての行を分割します。 -
name1
- 分割されたDynamicFrame
の名前文字列。 -
name2
- 指定されたノードが分割された後に残るDynamicFrame
の名前文字列。 -
transformation_ctx
- 状態情報を識別するために使用される一意の文字列 (オプション)。 -
info
- この変換のエラー報告に関連付ける文字列 (オプション)。 -
stageThreshold
- この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。 -
totalThreshold
- この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。
例: split_rows を使用して、DynamicFrame
内の行を分割する
このコード例では、split_rows
メソッドを使用して、id
フィールド値に基づいて DynamicFrame
の行を分割します。
データセットの例
この例では、legislators_relationalized
という名前のコレクションから選択された l_root_contact_details
と呼ばれる DynamicFrame
が使用されています。
l_root_contact_details
には、次のスキーマとエントリが含まれています。
root
|-- id: long
|-- index: int
|-- contact_details.val.type: string
|-- contact_details.val.value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 1| 0| phone| 202-225-5265|
| 1| 1| twitter| kathyhochul|
| 2| 0| phone| 202-225-3252|
| 2| 1| twitter| repjackyrosen|
| 3| 0| fax| 202-225-1314|
| 3| 1| phone| 202-225-3772|
| 3| 2| twitter| MikeRossUpdates|
| 4| 0| fax| 202-225-1314|
| 4| 1| phone| 202-225-3772|
| 4| 2| twitter| MikeRossUpdates|
| 5| 0| fax| 202-225-1314|
| 5| 1| phone| 202-225-3772|
| 5| 2| twitter| MikeRossUpdates|
| 6| 0| fax| 202-225-1314|
| 6| 1| phone| 202-225-3772|
| 6| 2| twitter| MikeRossUpdates|
| 7| 0| fax| 202-225-1314|
| 7| 1| phone| 202-225-3772|
| 7| 2| twitter| MikeRossUpdates|
| 8| 0| fax| 202-225-1314|
+---+-----+------------------------+-------------------------+
コードの例
# Example: Use split_rows to split up
# rows in a DynamicFrame based on value
from pyspark.context import SparkContext
from awsglue.context import GlueContext
# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Retrieve the DynamicFrame to split
frame_to_split = legislators_relationalized.select("l_root_contact_details")
# Split up rows by ID
split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low")
# Inspect the resulting DynamicFrames
print("Inspect the DynamicFrame that contains IDs < 10")
split_rows_collection.select("low").toDF().show()
print("Inspect the DynamicFrame that contains IDs > 10")
split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 1| 0| phone| 202-225-5265|
| 1| 1| twitter| kathyhochul|
| 2| 0| phone| 202-225-3252|
| 2| 1| twitter| repjackyrosen|
| 3| 0| fax| 202-225-1314|
| 3| 1| phone| 202-225-3772|
| 3| 2| twitter| MikeRossUpdates|
| 4| 0| fax| 202-225-1314|
| 4| 1| phone| 202-225-3772|
| 4| 2| twitter| MikeRossUpdates|
| 5| 0| fax| 202-225-1314|
| 5| 1| phone| 202-225-3772|
| 5| 2| twitter| MikeRossUpdates|
| 6| 0| fax| 202-225-1314|
| 6| 1| phone| 202-225-3772|
| 6| 2| twitter| MikeRossUpdates|
| 7| 0| fax| 202-225-1314|
| 7| 1| phone| 202-225-3772|
| 7| 2| twitter| MikeRossUpdates|
| 8| 0| fax| 202-225-1314|
+---+-----+------------------------+-------------------------+
only showing top 20 rows
Inspect the DynamicFrame that contains IDs > 10
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 11| 0| phone| 202-225-5476|
| 11| 1| twitter| RepDavidYoung|
| 12| 0| phone| 202-225-4035|
| 12| 1| twitter| RepStephMurphy|
| 13| 0| fax| 202-226-0774|
| 13| 1| phone| 202-225-6335|
| 14| 0| fax| 202-226-0774|
| 14| 1| phone| 202-225-6335|
| 15| 0| fax| 202-226-0774|
| 15| 1| phone| 202-225-6335|
| 16| 0| fax| 202-226-0774|
| 16| 1| phone| 202-225-6335|
| 17| 0| fax| 202-226-0774|
| 17| 1| phone| 202-225-6335|
| 18| 0| fax| 202-226-0774|
| 18| 1| phone| 202-225-6335|
| 19| 0| fax| 202-226-0774|
| 19| 1| phone| 202-225-6335|
| 20| 0| fax| 202-226-0774|
| 20| 1| phone| 202-225-6335|
+---+-----+------------------------+-------------------------+
only showing top 20 rows
アンボックス
unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)
DynamicFrame
の文字列フィールドをアンボックス (再フォーマット) し、アンボックスされた DynamicRecords
を含む新しい DynamicFrame
を返します。
DynamicRecord
は DynamicFrame
内の論理レコードを表します。これは、自己記述型であり、固定スキーマに準拠しないデータに使用できる点を除いて、Apache Spark DataFrame
の行に似ています。
-
path
- アンボックスする文字列ノードへのフルパス。 format
– 形式の仕様 (オプション)。Amazon S3 や、複数の形式をサポートする AWS Glue 接続の場合に使用します。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。-
transformation_ctx
- 状態情報を識別するために使用される一意の文字列 (オプション)。 -
info
- この変換のエラー報告に関連付ける文字列 (オプション)。 -
stageThreshold
- この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。 -
totalThreshold
- この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。 -
options
- 次の 1 つ以上。separator
– 区切り文字を含む文字列。escaper
– エスケープ文字を含む文字列。skipFirst
– 最初のインスタンスをスキップするかどうかを示すブール値。-
withSchema
— ノードのスキーマの JSON 表現を含む文字列。スキーマの JSON 表現の形式は、StructType.json()
の出力によって定義されます。 withHeader
– ヘッダーが含まれているかどうかを示すブール値。
例: unbox を使用して、文字列フィールドを構造体にアンボックスする
このコード例では、unbox
メソッドを使用して、DynamicFrame
の文字列フィールドを struct 型のフィールドにアンボックスまたは再フォーマットします。
データセットの例
この例では、次のスキーマとエントリを持つ mapped_with_string
と呼ばれる DynamicFrame
を使用しています。
AddressString
という名前のフィールドに注目してください。この例では、このフィールドを構造体にアンボックスしています。
root
|-- Average Total Payments: string
|-- AddressString: string
|-- Average Covered Charges: string
|-- DRG Definition: string
|-- Average Medicare Payments: string
|-- Hospital Referral Region Description: string
|-- Address: struct
| |-- Zip.Code: string
| |-- City: string
| |-- Array: array
| | |-- element: string
| |-- State: string
| |-- Street: string
|-- Provider Id: string
|-- Total Discharges: string
|-- Provider Name: string
+----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+
|Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name|
+----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+
| $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...|
| $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...|
| $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...|
| $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST|
...
コードの例
# Example: Use unbox to unbox a string field
# into a struct in a DynamicFrame
from pyspark.context import SparkContext
from awsglue.context import GlueContext
# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
unboxed = mapped_with_string.unbox("AddressString", "json")
unboxed.printSchema()
unboxed.toDF().show()
root
|-- Average Total Payments: string
|-- AddressString: struct
| |-- Street: string
| |-- City: string
| |-- State: string
| |-- Zip.Code: string
| |-- Array: array
| | |-- element: string
|-- Average Covered Charges: string
|-- DRG Definition: string
|-- Average Medicare Payments: string
|-- Hospital Referral Region Description: string
|-- Address: struct
| |-- Zip.Code: string
| |-- City: string
| |-- Array: array
| | |-- element: string
| |-- State: string
| |-- Street: string
|-- Provider Id: string
|-- Total Discharges: string
|-- Provider Name: string
+----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+
|Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name|
+----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+
| $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...|
| $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...|
| $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...|
| $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST|
| $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...|
| $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...|
| $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...|
| $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...|
| $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL|
| $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...|
| $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...|
| $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL|
| $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...|
| $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...|
| $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...|
| $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...|
| $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL|
| $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...|
| $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL|
| $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...|
+----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+
only showing top 20 rows
union
union(frame1, frame2, transformation_ctx = "",
info = "", stageThreshold = 0, totalThreshold = 0)
2 つの DynamicFrames を結合します。両方の入力 DynamicFrame からのすべてのレコードを含む DynamicFrames を返します。この変換は、2 つの DataFrames rame を同等のデータで結合した結果とは異なる結果を返す場合があります。Spark DataFrame ユニオンの動作が必要な場合は、toDF
の使用を検討してください。
-
frame1
— 最初に結合する DynamicFrame。 -
frame2
— 2 番目に結合する DynamicFrame。 -
transformation_ctx
– (オプション) 統計/ステータス情報を識別するために使用される一意の文字列。 -
info
– (オプション) 変換のエラーに関連付けられる文字列。 -
stageThreshold
— (オプション) 処理がエラーになるまでの変換中の最大エラー数。 -
totalThreshold
— (オプション) 処理がエラーになるまでの変換中の最大エラー数。
ネスト解除
unnest(transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
DynamicFrame
内のネストされたオブジェクトをネスト解除して、最上位レベルのオブジェクトにし、新しくネスト解除された DynamicFrame
を返します。
-
transformation_ctx
- 状態情報を識別するために使用される一意の文字列 (オプション)。 -
info
- この変換のエラー報告に関連付ける文字列 (オプション)。 -
stageThreshold
- この変換中に発生した、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。 -
totalThreshold
- この変換までに発生したエラーのうち、プロセスがエラーを出力するエラーの数 (オプション)。デフォルトはゼロで、エラーを出力しないことを示します。
例: unnest を使用して、ネストされたフィールドを最上位フィールドに変換する
このコード例では、unnest
メソッドを使用して、DynamicFrame
のネストされたすべてのフィールドを最上位フィールドにフラット化します。
データセットの例
この例では、次のスキーマを持つ mapped_medicare
と呼ばれる DynamicFrame
を使用しています。Address
フィールドは、ネストされたデータを含む唯一のフィールドであることに注意してください。
root
|-- Average Total Payments: string
|-- Average Covered Charges: string
|-- DRG Definition: string
|-- Average Medicare Payments: string
|-- Hospital Referral Region Description: string
|-- Address: struct
| |-- Zip.Code: string
| |-- City: string
| |-- Array: array
| | |-- element: string
| |-- State: string
| |-- Street: string
|-- Provider Id: string
|-- Total Discharges: string
|-- Provider Name: string
コードの例
# Example: Use unnest to unnest nested
# objects in a DynamicFrame
from pyspark.context import SparkContext
from awsglue.context import GlueContext
# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Unnest all nested fields
unnested = mapped_medicare.unnest()
unnested.printSchema()
root
|-- Average Total Payments: string
|-- Average Covered Charges: string
|-- DRG Definition: string
|-- Average Medicare Payments: string
|-- Hospital Referral Region Description: string
|-- Address.Zip.Code: string
|-- Address.City: string
|-- Address.Array: array
| |-- element: string
|-- Address.State: string
|-- Address.Street: string
|-- Provider Id: string
|-- Total Discharges: string
|-- Provider Name: string
unnest_ddb_json
特に DynamoDB JSON 構造体にある DynamicFrame
でネストされた列をネスト解除すると、新しくネスト解除された DynamicFrame
が返されます。構造体型の配列のある列は、ネスト解除されません。これは、通常の unnest
変換とは異なる特殊なタイプのネスト解除変換で、データが DynamoDB JSON 構造体に格納されている必要があることに注意してください。詳細については、「DynamoDB JSON」を参照してください。
unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
-
transformation_ctx
- 状態情報を識別するために使用される一意の文字列 (オプション)。 -
info
- この変換のエラー報告に関連付ける文字列 (オプション)。 -
stageThreshold
- この変換中にプロセスで発生するエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。 -
totalThreshold
- この変換までに発生したエラーのうち、プロセスでエラーとなるエラーの数 (オプション: デフォルトではゼロ、プロセスがエラーを出力しないことを示します)。
例えば、DynamoDB JSON 構造体のあるエクスポートを読み取るスキーマは次のようになります。
root
|-- Item: struct
| |-- ColA: struct
| | |-- S: string
| |-- ColB: struct
| | |-- S: string
| |-- ColC: struct
| | |-- N: string
| |-- ColD: struct
| | |-- L: array
| | | |-- element: null
unnest_ddb_json()
変換は、以下のように変換します。
root
|-- ColA: string
|-- ColB: string
|-- ColC: string
|-- ColD: array
| |-- element: null
次のコード例は、AWS Glue DynamoDB エクスポートコネクタを使用し、DynamoDB JSON unnest を呼び出し、パーティションの数を表示します。
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context= GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)
dynamicFrame = glue_context.create_dynamic_frame.from_options(
connection_type="dynamodb",
connection_options={
"dynamodb.export": "ddb",
"dynamodb.tableArn": "<test_source>",
"dynamodb.s3.bucket": "<bucket name>",
"dynamodb.s3.prefix": "<bucket prefix>",
"dynamodb.s3.bucketOwner": "<account_id>",
}
)
unnested = dynamicFrame.unnest_ddb_json()
print(unnested.getNumPartitions())
job.commit()
書き込み
write(connection_type, connection_options, format, format_options, accumulator_size)
この DynamicFrame
の GlueContext クラス から指定された接続タイプの DataSink (オブジェクト) を取得し、この DynamicFrame
のコンテンツの書式設定および書き込みに使用します。指定されたとおりに書式設定され、書き込まれる新しい DynamicFrame
を返します。
-
connection_type
- 使用する接続タイプ。有効な値には、s3
、mysql
、postgresql
、redshift
、sqlserver
、およびoracle
があります。 -
connection_options
- 使用する接続オプション (オプション)。s3
のconnection_type
では、Amazon S3 パスが定義されています。connection_options = {"path": "
s3://aws-glue-target/temp
"}JDBC 接続の場合、いくつかのプロパティを定義する必要があります。データベース名は URL の一部である必要があることに注意してください。オプションで接続オプションに含めることができます。
警告
スクリプトにパスワードを保存することはお勧めしません。AWS Secrets Manager または AWS Glue データカタログから取得する場合には、
boto3
を使用することを検討してください。connection_options = {"url": "
jdbc-url/database
", "user": "username
", "password":passwordVariable
,"dbtable": "table-name
", "redshiftTmpDir": "s3-tempdir-path
"} format
– 形式の仕様 (オプション)。これは、Amazon Simple Storage Service (Amazon S3)、または複数の形式をサポートする AWS Glue 接続で使用されます。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。format_options
– 指定された形式についてのオプション。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。accumulator_size
- 使用するバイト単位の累積サイズ (オプション)。
— errors —
assertErrorThreshold
assertErrorThreshold( )
- この DynamicFrame
を作成した変換エラーに対するアサーション。基盤になる DataFrame
から Exception
を返します。
errorsAsDynamicFrame
errorsAsDynamicFrame( )
- 内部にネストされたエラーレコードを持つ DynamicFrame
を返します。
例: errorsAsDynamicFrame を使用してエラーレコードを表示する
次のコード例は、errorsAsDynamicFrame
メソッドを使用して DynamicFrame
のエラーレコードを表示する方法を示しています。
データセットの例
この例では、JSON として Amazon S3 にアップロードできる次のデータセットを使用します。2 つ目のレコードの形式に誤りがあることに注意してください。通常、SparkSQL を使用すると、不正な形式のデータによってファイルの解析が中断されます。ただし、DynamicFrame
は、不正な形式の問題を認識し、不正な行を個別に処理できるエラーレコードに変換します。
{"id": 1, "name": "george", "surname": "washington", "height": 178}
{"id": 2, "name": "benjamin", "surname": "franklin",
{"id": 3, "name": "alexander", "surname": "hamilton", "height": 171}
{"id": 4, "name": "john", "surname": "jay", "height": 190}
コードの例
# Example: Use errorsAsDynamicFrame to view error records.
# Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location.
from pyspark.context import SparkContext
from awsglue.context import GlueContext
# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Create errors DynamicFrame, view schema
errors = glueContext.create_dynamic_frame.from_options(
"s3", {"paths": ["s3://DOC-EXAMPLE-S3-BUCKET/error_data.json
"]}, "json"
)
print("Schema of errors DynamicFrame:")
errors.printSchema()
# Show that errors only contains valid entries from the dataset
print("errors contains only valid records from the input dataset (2 of 4 records)")
errors.toDF().show()
# View errors
print("Errors count:", str(errors.errorsCount()))
print("Errors:")
errors.errorsAsDynamicFrame().toDF().show()
# View error fields and error data
error_record = errors.errorsAsDynamicFrame().toDF().head()
error_fields = error_record["error"]
print("Error fields: ")
print(error_fields.asDict().keys())
print("\nError record data:")
for key in error_fields.asDict().keys():
print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame:
root
|-- id: int
|-- name: string
|-- surname: string
|-- height: int
errors contains only valid records from the input dataset (2 of 4 records)
+---+------+----------+------+
| id| name| surname|height|
+---+------+----------+------+
| 1|george|washington| 178|
| 4| john| jay| 190|
+---+------+----------+------+
Errors count: 1
Errors:
+--------------------+
| error|
+--------------------+
|[[ File "/tmp/20...|
+--------------------+
Error fields:
dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord'])
Error record data:
callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='')
msg : error in jackson reader
stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name
at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740)
at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57)
at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57)
at scala.collection.Iterator$$anon$9.next(Iterator.scala:162)
at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599)
at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120)
at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116)
at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209)
at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202)
at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116)
at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109)
at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247)
at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
input :
bytesread : 252
source :
dynamicRecord : Row(id=2, name='benjamin', surname='franklin')
errorsCount
errorsCount( )
- DynamicFrame
内のエラーの総数を返します。
stageErrorsCount
stageErrorsCount
- この DynamicFrame
を生成するプロセスで発生したエラーの数を返します。