Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
DynamicFrame Klasse
Eine der wichtigsten Abstraktionen in Apache Spark ist der Spark SQLDataFrame
, der dem DataFrame
Konstrukt in R und Pandas ähnelt. A DataFrame
ähnelt einer Tabelle und unterstützt Operationen und Operationen im Funktionsstil (Zuordnen/Reduzieren/Filtern usw.) und Operationen (Auswählen, Projektieren, Aggregieren). SQL
DataFrames
sind leistungsstark und weit verbreitet, haben jedoch Einschränkungen in Bezug auf Extraktions-, Transformations- und Load () -Operationen. ETL Vor allem benötigen sie ein Schema, das angegeben werden muss, bevor Daten geladen werden. Spark SQL löst dieses Problem, indem es die Daten zweimal durchläuft — den ersten, um das Schema abzuleiten, und den zweiten, um die Daten zu laden. Diese Ableitung ist jedoch begrenzt und geht nicht die Gegebenheiten unübersichtlicher Daten an. Beispielsweise kann dasselbe Feld in verschiedenen Datensätzen unterschiedliche Typen aufweisen. Apache Spark gibt oft auf und meldet den Typ mithilfe des ursprünglichen Feldtexts als string
. Dies ist möglicherweise nicht korrekt, und Sie wünschen sich eine bessere Kontrolle darüber, wie Schemaabweichungen gelöst werden. Und ein zusätzlicher Durchgang für die Quelldaten kann für große Datensätze unerschwinglich teuer sein.
Um diese Einschränkungen zu beheben, führt AWS Glue die einDynamicFrame
. Ein DynamicFrame
ähnelt einem DataFrame
mit der Ausnahme, dass jeder Datensatz selbstbeschreibend ist, sodass zunächst kein Schema erforderlich ist. Stattdessen AWS Glue berechnet on-the-fly bei Bedarf ein Schema und codiert Schemainkonsistenzen explizit mithilfe eines Auswahltyps (oder Unionstyps). Sie können diese Inkonsistenzen lösen, um Ihre Datensätze mit Datenspeichern kompatibel zu machen, die ein festes Schema erfordern.
Gleichermaßen stellt ein DynamicRecord
einen logischen Datensatz innerhalb einem DynamicFrame
dar. Dies ist mit einer Zeile in einem Spark DataFrame
vergleichbar, mit der Ausnahme, dass er selbstbeschreibend ist und für Daten verwendet werden kann, die keinem festen Schema entsprechen. Wenn Sie AWS Glue mit verwenden PySpark, manipulieren Sie normalerweise nicht unabhängigDynamicRecords
. Stattdessen transformieren Sie den Datensatz gemeinsam über seinen DynamicFrame
.
Sie können DynamicFrames
zu und aus DataFrames
konvertieren, sobald Sie alle Schemainkonsistenzen gelöst haben.
– Konstruktion –
__init__
__init__(jdf, glue_ctx, name)
-
jdf
— Ein Verweis auf den Datenrahmen in der Java Virtual Machine (JVM). -
glue_ctx
– Ein GlueContext Klasse-Objekt. -
name
– Eine optionale Namenszeichenfolge, standardmäßig leer.
fromDF
fromDF(dataframe, glue_ctx, name)
Wandelt einen DataFrame
in einen DynamicFrame
um, indem DataFrame
-Felder in DynamicRecord
-Felder konvertiert werden. Gibt den neuen DynamicFrame
zurück.
Ein DynamicRecord
stellt einen logischen Datensatz in einem DynamicFrame
dar. Er ist mit einer Zeile in einem Spark DataFrame
vergleichbar, mit der Ausnahme, dass er selbstbeschreibend ist und für Daten verwendet werden kann, die keinem festen Schema entsprechen.
Diese Funktion geht davon aus, dass Spalten mit doppeltem Namen in Ihrem DataFrame
bereits umbenannt wurden.
-
dataframe
— Der SQLDataFrame
zu konvertierende Apache Spark (erforderlich). -
glue_ctx
– Das GlueContext Klasse-Objekt, das den Kontext für diese Transformation angibt (erforderlich). -
name
— Der Name des ErgebnissesDynamicFrame
(optional seit AWS Glue 3.0).
toDF
toDF(options)
Wandelt einen DynamicFrame
in einen Apache Spark DataFrame
um, indem DynamicRecords
in DataFrame
-Felder konvertiert werden. Gibt den neuen DataFrame
zurück.
Ein DynamicRecord
stellt einen logischen Datensatz in einem DynamicFrame
dar. Er ist mit einer Zeile in einem Spark DataFrame
vergleichbar, mit der Ausnahme, dass er selbstbeschreibend ist und für Daten verwendet werden kann, die keinem festen Schema entsprechen.
-
options
– Eine Liste der Optionen. Ermöglicht es Ihnen, zusätzliche Optionen für den Konvertierungsprozess anzugeben. Einige gültige Optionen, die Sie mit dem Parameter `options` verwenden können:-
format
— gibt das Format der Daten an, z. B. json, csv, parquet). -
separater or sep
— gibt für CSV Dateien das Trennzeichen an. -
header
— gibt bei CSV Dateien an, ob es sich bei der ersten Zeile um eine Kopfzeile handelt (true/false). -
inferSchema
— weist Spark an, das Schema automatisch abzuleiten (true/false).
Hier ist ein Beispiel für die Verwendung des Parameters `options` mit der `ToDF`-Methode:
from awsglue.context import GlueContext from awsglue.dynamicframe import DynamicFrame from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) csv_dyf = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={"paths": ["s3://my-bucket/path/to/csv/"]}, format="csv" ) csv_cf = csv_dyf.toDF(options={ "separator": ",", "header": "true", "ïnferSchema": "true" })
Geben Sie den Zieltyp an, wenn Sie die Aktionstypen
Project
undCast
auswählen. Beispiele sind unter anderem:>>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])
-
– Informationen –
count
count( )
– Gibt die Anzahl von Zeilen in dem zugrunde liegenden DataFrame
zurück.
schema
schema( )
– Gibt das Schema dieses DynamicFrame
zurück oder wenn dieser nicht verfügbar ist, das Schema des zugrunde liegenden DataFrame
.
Weitere Informationen über die DynamicFrame
-Typen, aus denen dieses Schema besteht, finden Sie unter PySpark-Erweiterungstypen.
printSchema
printSchema( )
– Druckt das Schema des zugrunde liegenden DataFrame
.
show
show(num_rows)
– Druckt eine bestimmte Anzahl von Zeilen aus dem zugrunde liegenden DataFrame
.
repartition
repartition(numPartitions)
– Gibt einen neuen DynamicFrame
mit numPartitions
-Partitionen zurück.
COALESCE
coalesce(numPartitions)
– Gibt einen neuen DynamicFrame
mit numPartitions
-Partitionen zurück.
– Transformationen –
apply_mapping
apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Wendet ein deklaratives Mapping für einen DynamicFrame
an und gibt einen neuen DynamicFrame
mit diesen angewendeten Mappings auf die von Ihnen angegebenen Felder zurück. Nicht spezifizierte Felder werden in den neuen DynamicFrame
weggelassen.
-
mappings
– Eine Liste von Mapping-Tupeln (erforderlich). Jede besteht aus (Quellspalte, Quelltyp, Zielspalte, Zieltyp).Wenn die Quellspalte einen Punkt “
.
„ im Namen hat, müssen Sie Backticks “``
„ darum herum platzieren. Um beispielsweisethis.old.name
(Zeichenfolge) aufthisNewName
abzubilden, würden Sie das folgende Tupel verwenden:("`this.old.name`", "string", "thisNewName", "string")
-
transformation_ctx
– Eine eindeutige Zeichenfolge zur Identifikation von Statusinformationen (optional). -
info
– Eine Zeichenfolge, die den Fehlermeldungen für diese Transformation zugeordnet werden soll (optional). -
stageThreshold
– Die Anzahl der aufgetretenen Fehler während dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte. -
totalThreshold
– Die Anzahl der aufgetretenen Fehler bis einschließlich dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte.
Beispiel: Verwenden Sie apply_mapping, um Felder umzubenennen und Feldtypen zu ändern
Im folgenden Beispielcode wird gezeigt, wie Sie die apply_mapping
-Methode verwenden, um ausgewählte Felder umzubenennen und Feldtypen zu ändern.
Anmerkung
Informationen zum Zugriff auf den Datensatz, der in diesem Beispiel verwendet wird, finden Sie unter Codebeispiel: Verknüpfen und Inbeziehungsetzen von Daten und folgen Sie den Anweisungen in Schritt 1: Crawlen der Daten im Amazon S3 Bucket.
# Example: Use apply_mapping to reshape source data into # the desired column names and types as a new DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Select and rename fields, change field type print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:") persons_mapped = persons.apply_mapping( [ ("family_name", "String", "last_name", "String"), ("name", "String", "first_name", "String"), ("birth_date", "String", "date_of_birth", "Date"), ] ) persons_mapped.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the persons_mapped DynamicFrame, created with apply_mapping: root |-- last_name: string |-- first_name: string |-- date_of_birth: date
drop_fields
drop_fields(paths, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Ruft die FlatMap Class-Transformation auf, um Felder aus einem DynamicFrame
zu entfernen. Gibt einen neuen DynamicFrame
zurück, in dem die angegebenen Felder gelöscht wurden.
-
paths
– Eine Liste von Zeichenfolgen. Jeder enthält den vollständigen Pfad zu einem Feldknoten, den Sie löschen möchten. Sie können die Punktnotation verwenden, um verschachtelte Felder anzugeben. Wenn zum Beispiel Feldfirst
ein untergeordnetes Feld des Feldesname
im Baum ist, spezifizieren Sie"name.first"
für den Pfad.Wenn der Name eines Feldknotens ein Literal
.
enthält, müssen Sie den Namen in Backticks (`
) einschließen. -
transformation_ctx
– Eine eindeutige Zeichenfolge zur Identifikation von Statusinformationen (optional). -
info
– Eine Zeichenfolge, die den Fehlermeldungen für diese Transformation zugeordnet werden soll (optional). -
stageThreshold
– Die Anzahl der aufgetretenen Fehler während dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte. -
totalThreshold
– Die Anzahl der aufgetretenen Fehler bis einschließlich dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte.
Beispiel: Verwenden Sie drop_fields, um Felder aus einem DynamicFrame
zu entfernen
Dieses Codebeispiel verwendet die drop_fields
-Methode zum Entfernen ausgewählter Felder der obersten Ebene und verschachtelter Felder aus einem DynamicFrame
.
Beispieldatensatz
Das Beispiel verwendet den folgenden Datensatz, der durch die EXAMPLE-FRIENDS-DATA
-Tabelle im Code dargestellt wird:
{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []} {"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]} {"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]} {"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}} {"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}
Beispiel-Code
# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame. # Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name. # Replace EXAMPLE-FRIENDS-DATA with your table name. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame from Glue Data Catalog glue_source_database = "
MY-EXAMPLE-DATABASE
" glue_source_table = "EXAMPLE-FRIENDS-DATA
" friends = glueContext.create_dynamic_frame.from_catalog( database=glue_source_database, table_name=glue_source_table ) print("Schema for friends DynamicFrame before calling drop_fields:") friends.printSchema() # Remove location.county, remove friends.age, remove age friends = friends.drop_fields(paths=["age", "location.county", "friends.age"]) print("Schema for friends DynamicFrame after removing age, county, and friend age:") friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields: root |-- name: string |-- age: int |-- location: struct | |-- state: string | |-- county: string |-- friends: array | |-- element: struct | | |-- name: string | | |-- age: int Schema for friends DynamicFrame after removing age, county, and friend age: root |-- name: string |-- location: struct | |-- state: string |-- friends: array | |-- element: struct | | |-- name: string
Filter
filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Gibt einen neuen DynamicFrame
zurück, der alle DynamicRecords
innerhalb des Eingabe-DynamicFrame
enthält, die die angegebene Prädikat-Funktion f
erfüllen.
-
f
– Die Prädikat-Funktion, die auf denDynamicFrame
angewendet werden soll. Die Funktion muss einenDynamicRecord
als Argument enthalten und "True" zurückgeben, wenn derDynamicRecord
die Filteranforderungen erfüllt, oder andernfalls "False" (erforderlich).Ein
DynamicRecord
stellt einen logischen Datensatz in einemDynamicFrame
dar. Er ist mit einer Zeile in einem SparkDataFrame
vergleichbar, mit der Ausnahme, dass er selbstbeschreibend ist und für Daten verwendet werden kann, die keinem festen Schema entsprechen. -
transformation_ctx
– Eine eindeutige Zeichenfolge zur Identifikation von Statusinformationen (optional). -
info
– Eine Zeichenfolge, die den Fehlermeldungen für diese Transformation zugeordnet werden soll (optional). -
stageThreshold
– Die Anzahl der aufgetretenen Fehler während dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte. -
totalThreshold
– Die Anzahl der aufgetretenen Fehler bis einschließlich dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte.
Beispiel: Verwenden Sie Filter, um eine gefilterte Auswahl von Feldern zu erhalten
In diesem Beispiel wird verwendet die filter
-Methode zum Erstellen eines neuen DynamicFrame
verwendet, das eine gefilterte Auswahl eines anderen DynamicFrame
-Feldes beinhaltet.
Wie die map
-Methode nimmt filter
eine Funktion als Argument, das auf jeden Datensatz im Original-DynamicFrame
angewendet wird. Die Funktion nimmt einen Datensatz als Eingabe und gibt einen booleschen Wert zurück. Wenn der Rückgabewert true ist, wird der Datensatz in das Ergebnis von DynamicFrame
aufgenommen. Wenn es false ist, wird der Datensatz weggelassen.
Anmerkung
Informationen zum Zugriff auf den Datensatz, der in diesem Beispiel verwendet wird, finden Sie unter Codebeispiel: Datenvorbereitung mit ResolveChoice, Lambda und ApplyMapping und folgen Sie den Anweisungen in Schritt 1: Crawlen der Daten im Amazon S3 Bucket.
# Example: Use filter to create a new DynamicFrame # with a filtered selection of records from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create DynamicFrame from Glue Data Catalog medicare = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv" ] }, "csv", {"withHeader": True}, ) # Create filtered DynamicFrame with custom lambda # to filter records by Provider State and Provider City sac_or_mon = medicare.filter( f=lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"] ) # Compare record counts print("Unfiltered record count: ", medicare.count()) print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065 Filtered record count: 564
join
join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Führt einen Equality Join mit einem anderen DynamicFrame
durch und gibt den sich ergebenden DynamicFrame
zurück.
-
paths1
– Eine Liste der Schlüssel in diesem Frame für einen Join. -
paths2
– Eine Liste der Schlüssel in dem anderen Frame für einen Join. -
frame2
– Der andereDynamicFrame
für einen Join. -
transformation_ctx
– Eine eindeutige Zeichenfolge zur Identifikation von Statusinformationen (optional). -
info
– Eine Zeichenfolge, die den Fehlermeldungen für diese Transformation zugeordnet werden soll (optional). -
stageThreshold
– Die Anzahl der aufgetretenen Fehler während dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte. -
totalThreshold
– Die Anzahl der aufgetretenen Fehler bis einschließlich dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte.
Beispiel: Verwenden von join zum Kombinieren von DynamicFrames
In diesem Beispiel wird die join
Methode verwendet, um eine Verknüpfung von drei Objekten durchzuführen. DynamicFrames
AWS Glue führt die Verbindung auf der Grundlage der von Ihnen angegebenen Feldtasten durch. Der daraus resultierende DynamicFrame
enthält Zeilen aus den beiden Originalframes, in denen die angegebenen Schlüssel übereinstimmen.
Beachten Sie, dass join
transform alle Felder intakt hält. Das bedeutet, dass die Felder, die Sie für den Abgleich angeben DynamicFrame, im Ergebnis angezeigt werden, auch wenn sie redundant sind und dieselben Schlüssel enthalten. In diesem Beispiel verwenden wir drop_fields
, um diese redundanten Schlüssel nach dem Join zu entfernen.
Anmerkung
Informationen zum Zugriff auf den Datensatz, der in diesem Beispiel verwendet wird, finden Sie unter Codebeispiel: Verknüpfen und Inbeziehungsetzen von Daten und folgen Sie den Anweisungen in Schritt 1: Crawlen der Daten im Amazon S3 Bucket.
# Example: Use join to combine data from three DynamicFrames from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load DynamicFrames from Glue Data Catalog persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json" ) orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() print("Schema for the memberships DynamicFrame:") memberships.printSchema() print("Schema for the orgs DynamicFrame:") orgs.printSchema() # Join persons and memberships by ID persons_memberships = persons.join( paths1=["id"], paths2=["person_id"], frame2=memberships ) # Rename and drop fields from orgs # to prevent field name collisions with persons_memberships orgs = ( orgs.drop_fields(["other_names", "identifiers"]) .rename_field("id", "org_id") .rename_field("name", "org_name") ) # Create final join of all three DynamicFrames legislators_combined = orgs.join( paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships ).drop_fields(["person_id", "org_id"]) # Inspect the schema for the joined data print("Schema for the new legislators_combined DynamicFrame:") legislators_combined.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the memberships DynamicFrame: root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string Schema for the orgs DynamicFrame: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string Schema for the new legislators_combined DynamicFrame: root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string
map
map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Gibt einen neuen DynamicFrame
zurück, der sich durch Anwendung von Zuweisungsfunktionen auf alle Datensätze im ursprünglichen DynamicFrame
ergibt.
-
f
– Die Zuweisungsfunktion, die auf alle Datensätze imDynamicFrame
angewendet werden soll. Die Funktion muss einenDynamicRecord
als Argument enthalten und gibt einen neuenDynamicRecord
zurück (erforderlich).Ein
DynamicRecord
stellt einen logischen Datensatz in einemDynamicFrame
dar. Er ist mit einer Zeile in einem Apache SparkDataFrame
vergleichbar, mit der Ausnahme, dass er selbstbeschreibend ist und für Daten verwendet werden kann, die keinem festen Schema entsprechen. transformation_ctx
– Eine eindeutige Zeichenfolge zur Identifikation von Statusinformationen (optional).info
– Eine Zeichenfolge, die mit Fehlern in der Transformation im Zusammenhang steht (optional).stageThreshold
– Die maximale Anzahl von Fehlern, die in der Transformation auftreten dürfen, bevor der Vorgang abgebrochen wird (optional). Der Standardwert ist „Null“.totalThreshold
– Die maximale Anzahl von Fehlern, die insgesamt auftreten dürfen, bevor die Verarbeitung abgebrochen wird (optional). Der Standardwert ist „Null“.
Beispiel: Verwenden von map zur Anwendung einer Funktion auf jeden Datensatz in einem DynamicFrame
In diesem Beispiel wird gezeigt, wie Sie die map
-Methode zum Anwenden einer Funktion auf jeden Datensatz eines DynamicFrame
anwenden. Insbesondere wendet dieses Beispiel eine Funktion namens MergeAddress
auf jeden Datensatz an, um mehrere Adressfelder zu einem einzigen struct
-Typ zusammenzuführen.
Anmerkung
Informationen zum Zugriff auf den Datensatz, der in diesem Beispiel verwendet wird, finden Sie unter Codebeispiel: Datenvorbereitung mit ResolveChoice, Lambda und ApplyMapping und folgen Sie den Anweisungen in Schritt 1: Crawlen der Daten im Amazon S3 Bucket.
# Example: Use map to combine fields in all records # of a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema medicare = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]}, "csv", {"withHeader": True}) print("Schema for medicare DynamicFrame:") medicare.printSchema() # Define a function to supply to the map transform # that merges address fields into a single field def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec # Use map to apply MergeAddress to every record mapped_medicare = medicare.map(f = MergeAddress) print("Schema for mapped_medicare DynamicFrame:") mapped_medicare.printSchema()
Schema for medicare DynamicFrame: root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string Schema for mapped_medicare DynamicFrame: root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
mergeDynamicFrame
mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "",
options = {}, info = "", stageThreshold = 0, totalThreshold = 0)
Führt dieses DynamicFrame
mit einem Staging-DynamicFrame
basierend auf den angegebenen Primärschlüsseln zusammen, um Datensätze zu identifizieren. Doppelte Datensätze (Datensätze mit denselben Primärschlüsseln) werden nicht dedupliziert. Wenn kein übereinstimmender Datensatz im Staging-Frame vorhanden ist, werden alle Datensätze (einschließlich Duplikate) von der Quelle beibehalten. Wenn der Staging-Frame übereinstimmende Datensätze enthält, überschreiben die Datensätze aus dem Staging-Frame die Datensätze in der Quelle AWS Glue.
-
stage_dynamic_frame
– Der Staging-DynamicFrame
, der zusammengeführt werden soll. -
primary_keys
– Die Liste der Primärschlüsselfelder, die Datensätze aus den Quell- und dynamischen Staging-Frames abgleichen. -
transformation_ctx
– Eine eindeutige Zeichenfolge, die zum Abrufen von Metadaten über die aktuelle Transformation verwendet wird (optional). -
options
— Eine Zeichenfolge von JSON Name-Wert-Paaren, die zusätzliche Informationen für diese Transformation bereitstellen. Dieses Argument wird derzeit nicht verwendet. -
info
– EinString
. Jede Zeichenfolge, die mit Fehlern in dieser Transformation verknüpft werden soll. -
stageThreshold
– EinLong
. Die Anzahl der Fehler in der angegebenen Transformation, für die die Verarbeitung fehlerhaft sein muss. -
totalThreshold
– EinLong
. Die Gesamtzahl der Fehler bis einschließlich dieser Transformation, bei denen die Verarbeitung fehlerhaft sein muss.
Diese Methode gibt einen neuen DynamicFrame
zurück, der erhalten wird, indem Sie diesen DynamicFrame
mit dem Staging-DynamicFrame
zusammenführen.
Der zurückgegebene DynamicFrame
enthält Datensatz A in folgenden Fällen:
-
Wenn sowohl im Quell- als auch im Staging-Frame
A
vorhanden ist, wirdA
im Staging-Frame zurückgegeben. -
Wenn sich
A
in der Quelltabelle undA.primaryKeys
nicht instagingDynamicFrame
befindet, wirdA
nicht in der Staging-Tabelle aktualisiert.
Der Quell- und der Staging-Frame müssen nicht dasselbe Schema haben.
Beispiel: Wird verwendet mergeDynamicFrame , um zwei auf der DynamicFrames
Grundlage eines Primärschlüssels zusammenzuführen
Das folgende Codebeispiel zeigt, wie die mergeDynamicFrame
-Methode verwendet wird, um einen DynamicFrame
mit einem „Staging“-DynamicFrame
, basierend auf dem Primärschlüssel id
, zusammenzuführen.
Beispieldatensatz
Das Beispiel verwendet zwei DynamicFrames
von einem DynamicFrameCollection
mit dem Namen split_rows_collection
. Die folgende Liste enthält die Schlüssel für split_rows_collection
.
dict_keys(['high', 'low'])
Beispiel-Code
# Example: Use mergeDynamicFrame to merge DynamicFrames # based on a set of specified primary keys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import SelectFromCollection # Inspect the original DynamicFrames frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low") print("Inspect the DynamicFrame that contains rows where ID < 10") frame_low.toDF().show() frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high") print("Inspect the DynamicFrame that contains rows where ID > 10") frame_high.toDF().show() # Merge the DynamicFrames based on the "id" primary key merged_high_low = frame_high.mergeDynamicFrame( stage_dynamic_frame=frame_low, primary_keys=["id"] ) # View the results where the ID is 1 or 20 print("Inspect the merged DynamicFrame that contains the combined rows") merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 2| 0| fax| 202-225-3307| | 2| 1| phone| 202-225-5731| | 3| 0| fax| 202-225-3307| | 3| 1| phone| 202-225-5731| | 4| 0| fax| 202-225-3307| | 4| 1| phone| 202-225-5731| | 5| 0| fax| 202-225-3307| | 5| 1| phone| 202-225-5731| | 6| 0| fax| 202-225-3307| | 6| 1| phone| 202-225-5731| | 7| 0| fax| 202-225-3307| | 7| 1| phone| 202-225-5731| | 8| 0| fax| 202-225-3307| | 8| 1| phone| 202-225-5731| | 9| 0| fax| 202-225-3307| | 9| 1| phone| 202-225-5731| | 10| 0| fax| 202-225-6328| | 10| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains rows where ID > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| fax| 202-225-6328| | 11| 1| phone| 202-225-4576| | 11| 2| twitter| RepTrentFranks| | 12| 0| fax| 202-225-6328| | 12| 1| phone| 202-225-4576| | 12| 2| twitter| RepTrentFranks| | 13| 0| fax| 202-225-6328| | 13| 1| phone| 202-225-4576| | 13| 2| twitter| RepTrentFranks| | 14| 0| fax| 202-225-6328| | 14| 1| phone| 202-225-4576| | 14| 2| twitter| RepTrentFranks| | 15| 0| fax| 202-225-6328| | 15| 1| phone| 202-225-4576| | 15| 2| twitter| RepTrentFranks| | 16| 0| fax| 202-225-6328| | 16| 1| phone| 202-225-4576| | 16| 2| twitter| RepTrentFranks| | 17| 0| fax| 202-225-6328| | 17| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the merged DynamicFrame that contains the combined rows +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 20| 0| fax| 202-225-5604| | 20| 1| phone| 202-225-6536| | 20| 2| twitter| USRepLong| +---+-----+------------------------+-------------------------+
relationalize
relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Konvertiert eine DynamicFrame
in ein Formular, das in eine relationale Datenbank passt. Die Relationalisierung von a DynamicFrame
ist besonders nützlich, wenn Sie Daten aus einer SQL Nein-Umgebung wie DynamoDB in eine relationale Datenbank wie My verschieben möchten. SQL
Die Transformation generiert eine Liste von Frames, indem verschachtelte Spalten aufgehoben und Array-Spalten pivotiert werden. Sie können die pivotierten Array-Spalten mithilfe des Join-Schlüssels, der während der Auflösung der Verschachtelung generiert wird, mit der Stammtabelle verbinden.
root_table_name
– Der Name für die Stammtabelle.staging_path
— Der Pfad, in dem die Methode Partitionen von pivotierten Tabellen im CSV Format speichern kann (optional). Pivotierte Tabellen werden von diesem Pfad gelesen.options
– Ein Wörterbuch der optionalen Parameter.-
transformation_ctx
– Eine eindeutige Zeichenfolge zur Identifikation von Statusinformationen (optional). -
info
– Eine Zeichenfolge, die den Fehlermeldungen für diese Transformation zugeordnet werden soll (optional). -
stageThreshold
– Die Anzahl der aufgetretenen Fehler während dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte. -
totalThreshold
– Die Anzahl der aufgetretenen Fehler bis einschließlich dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte.
Beispiel: Verwenden Sie Relationalize, um ein verschachteltes Schema in einem DynamicFrame
zu vereinfachen
In diesem Codebeispiel wird die relationalize
-Methode verwendet, um ein verschachteltes Schema in ein Formular zu vereinfachen, das in eine relationale Datenbank passt.
Beispieldatensatz
Das Beispiel verwendet einen DynamicFrame
namens legislators_combined
mit dem folgenden Schema. legislators_combined
hat mehrere verschachtelte Felder wie links
, images
und contact_details
, die durch die relationalize
-Transformation vereinfacht werden.
root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string
Beispiel-Code
# Example: Use relationalize to flatten # a nested schema into a format that fits # into a relational database. # Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Apply relationalize and inspect new tables legislators_relationalized = legislators_combined.relationalize( "l_root", "
s3://DOC-EXAMPLE-BUCKET/tmpDir
" ) legislators_relationalized.keys() # Compare the schema of the contact_details # nested field to the new relationalized table that # represents it legislators_combined.select_fields("contact_details").printSchema() legislators_relationalized.select("l_root_contact_details").toDF().where( "id = 10 or id = 75" ).orderBy(["id", "index"]).show()
Mit der folgenden Ausgabe können Sie das Schema des aufgerufenen verschachtelten Felds contact_details
mit der Tabelle vergleichen, welche die relationalize
-Transformation erstellt hat. Beachten Sie, dass die Tabellendatensätze mithilfe eines aufgerufenen Fremdschlüssels id
und einer index
-Spalte, die die Positionen des Arrays darstellt, eine Verbindung zur Haupttabelle herstellen.
dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers']) root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| 202-225-4160| | 10| 1| phone| 202-225-3436| | 75| 0| fax| 202-225-6791| | 75| 1| phone| 202-225-2861| | 75| 2| twitter| RepSamFarr| +---+-----+------------------------+-------------------------+
rename_field
rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Benennt ein Feld in diesem DynamicFrame
um und gibt einen neuen DynamicFrame
mit dem umbenannten Feld zurück.
-
oldName
– Der vollständiger Pfad zu dem Knoten, den Sie umbenennen möchten.Wenn der alte Name Punkte enthält, funktioniert
RenameField
nicht, es sei denn, Sie setzen Backticks darum (`
). Um beispielsweisethis.old.name
durchthisNewName
zu ersetzen, rufen Sie rename_field wie folgt auf:newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
-
newName
– Der neue Name, als vollständiger Pfad. -
transformation_ctx
– Eine eindeutige Zeichenfolge zur Identifikation von Statusinformationen (optional). -
info
– Eine Zeichenfolge, die den Fehlermeldungen für diese Transformation zugeordnet werden soll (optional). -
stageThreshold
– Die Anzahl der aufgetretenen Fehler während dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte. -
totalThreshold
– Die Anzahl der aufgetretenen Fehler bis einschließlich dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte.
Beispiel: Verwenden Sie rename_field, um Felder in einem DynamicFrame
umzubenennen
In diesem Codebeispiel wird die rename_field
-Methode verwendet, um Felder in einem DynamicFrame
umzubenennen. Beachten Sie, dass in dem Beispiel die Methodenverkettung verwendet wird, um mehrere Felder gleichzeitig umzubenennen.
Anmerkung
Informationen zum Zugriff auf den Datensatz, der in diesem Beispiel verwendet wird, finden Sie unter Codebeispiel: Verknüpfen und Inbeziehungsetzen von Daten und folgen Sie den Anweisungen in Schritt 1: Crawlen der Daten im Amazon S3 Bucket.
Beispiel-Code
# Example: Use rename_field to rename fields # in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Inspect the original orgs schema orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Original orgs schema: ") orgs.printSchema() # Rename fields and view the new schema orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name") print("New orgs schema with renamed fields: ") orgs.printSchema()
Original orgs schema: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string New orgs schema with renamed fields: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- classification: string |-- org_id: string |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string
resolveChoice
resolveChoice(specs = None, choice = "" , database = None , table_name = None ,
transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id =
None)
Löst einen Auswahltyp innerhalb dieses DynamicFrame
auf und gibt den neuen DynamicFrame
zurück.
-
specs
– Eine Liste der aufzulösenden spezifischen Mehrdeutigkeiten, jede in Form eines Tupels:(field_path, action)
.Es gibt zwei Möglichkeiten für die Verwendung von
resolveChoice
. Zuerst muss dasspecs
-Argument verwendet werden, um eine Sequenz bestimmter Felder und deren Auflösung anzugeben. Der andere Modus fürresolveChoice
ist die Nutzung deschoice
-Arguments für die Angabe einer einzelnen Auflösung für alleChoiceTypes
.Werte für
specs
werden als Tupels angegeben, bestehend aus(field_path, action)
-Paaren. Derfield_path
-Wert identifiziert ein spezielles mehrdeutiges Element und deraction
-Wert identifiziert die entsprechende Auflösung. Im Folgenden sind die möglichen Aktionen aufgeführt:-
cast:
– Versucht, alle Werte in den angegebenen Typ umzuwandeln. Beispiel:type
cast:int
. -
make_cols
– Konvertiert die einzelnen verschiedenen Typen in eine Spalte namens
. Er löst eine potenzielle Mehrdeutigkeit durch Abflachen der Daten auf. WenncolumnName
_type
columnA
beispielsweiseint
oderstring
sein könnte, bestünde die Auflösung darin, zwei Spalten mit den NamencolumnA_int
undcolumnA_string
im resultierendenDynamicFrame
zu erzeugen. -
make_struct
– Löst eine potenzielle Mehrdeutigkeit durch Verwendung einerstruct
, um die Daten darzustellen. Wenn beispielsweise Daten in einer Spalteint
oderstring
sein könnten, wird durch Verwendung dermake_struct
-Aktion eine Spalte von Strukturen im resultierendenDynamicFrame
erzeugt. Jede Struktur enthält sowohl einenint
als auch einenstring
. -
project:
– Löst eine potenzielle Mehrdeutigkeit durch Projizierung aller Daten auf einen der möglichen Datentypen. Wenn etwa Daten in einer Spaltetype
int
oderstring
sein könnten, wird mithilfe einerproject:string
-Aktion eine Spalte im resultierendenDynamicFrame
erzeugt, in der alleint
-Werte in Zeichenfolgen konvertiert wurden.
Wenn
field_path
ein Array identifiziert, platzieren Sie leere eckige Klammern hinter dem Namen des Arrays, um eine Mehrdeutigkeit zu vermeiden. Angenommen, Sie arbeiten mit Daten, die wie folgt strukturiert sind:"myList": [ { "price": 100.00 }, { "price": "$100.00" } ]
Sie können anstelle der Zeichenfolgenversion die numerische Version des Preises auswählen, indem Sie den
field_path
auf"myList[].price"
und dieaction
auf"cast:double"
setzen.Anmerkung
Sie können nur einen der Parameter
specs
undchoice
verwenden. Wenn derspecs
-Parameter nichtNone
ist, dann muss derchoice
-Parameter eine leere Zeichenfolge sein. Wenn umgekehrt derchoice
-Parameter keine leere Zeichenfolge ist, dann muss derspecs
-ParameterNone
sein. -
choice
– Gibt eine einzelne Auflösung für alleChoiceTypes
an. Sie können dies verwenden, wenn die vollständige Liste derChoiceTypes
vor der Laufzeit unbekannt ist. Zusätzlich zu den soeben fürspecs
aufgeführten Aktionen unterstützt dieses Argument noch die folgende Aktion:-
match_catalog
– Versucht jedenChoiceType
in einen entsprechenden Typ in der angegebenen Data-Catalog-Tabelle umzuwandeln.
-
-
database
– Die Data-Catalog-Datenbank für die Verwendung mit dermatch_catalog
-Aktion. -
table_name
– Die Data-Catalog-Tabelle für die Verwendung mit dermatch_catalog
-Aktion. -
transformation_ctx
– Eine eindeutige Zeichenfolge zur Identifikation von Statusinformationen (optional). -
info
– Eine Zeichenfolge, die den Fehlermeldungen für diese Transformation zugeordnet werden soll (optional). -
stageThreshold
– Die Anzahl der aufgetretenen Fehler während dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte. -
totalThreshold
– Die Anzahl der aufgetretenen Fehler bis einschließlich dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Null als Standard gibt an, dass der Prozess keinen Fehler ausgeben sollte. -
catalog_id
– Die Katalog-ID des Data Catalogs, auf den zugegriffen wird (die Konto-ID des Data Catalogs). Wenn aufNone
(Standardwert) festgelegt, wird die Katalog-ID des aufrufenden Kontos verwendet.
Beispiel: Wird verwendet resolveChoice , um eine Spalte zu behandeln, die mehrere Typen enthält
In diesem Codebeispiel wird die resolveChoice
-Methode verwendet, um anzugeben, wie mit einer DynamicFrame
-Spalte umgegangen werden soll, die Werte mehrerer Typen enthält. Das Beispiel zeigt zwei gängige Methoden zur Behandlung einer Spalte mit unterschiedlichen Typen:
Umwandeln der Spalte in einen einzelnen Datentyp.
Beibehalten aller Typen in separaten Spalten.
Beispieldatensatz
Anmerkung
Informationen zum Zugriff auf den Datensatz, der in diesem Beispiel verwendet wird, finden Sie unter Codebeispiel: Datenvorbereitung mit ResolveChoice, Lambda und ApplyMapping und folgen Sie den Anweisungen in Schritt 1: Crawlen der Daten im Amazon S3 Bucket.
Das Beispiel verwendet einen DynamicFrame
mit dem Namen medicare
mit dem folgenden Schema:
root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string
Beispiel-Code
# Example: Use resolveChoice to handle # a column that contains multiple types from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input data and inspect the "provider id" column medicare = glueContext.create_dynamic_frame.from_catalog( database="payments", table_name="medicare_hospital_provider_csv" ) print("Inspect the provider id column:") medicare.toDF().select("provider id").show() # Cast provider id to type long medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")]) print("Schema after casting provider id to type long:") medicare_resolved_long.printSchema() medicare_resolved_long.toDF().select("provider id").show() # Create separate columns # for each provider id type medicare_resolved_cols = medicare.resolveChoice(choice="make_cols") print("Schema after creating separate columns for each type:") medicare_resolved_cols.printSchema() medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column: +-----------+ |provider id| +-----------+ | [10001,]| | [10005,]| | [10006,]| | [10011,]| | [10016,]| | [10023,]| | [10029,]| | [10033,]| | [10039,]| | [10040,]| | [10046,]| | [10055,]| | [10056,]| | [10078,]| | [10083,]| | [10085,]| | [10090,]| | [10092,]| | [10100,]| | [10103,]| +-----------+ only showing top 20 rows Schema after casting 'provider id' to type long: root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +-----------+ |provider id| +-----------+ | 10001| | 10005| | 10006| | 10011| | 10016| | 10023| | 10029| | 10033| | 10039| | 10040| | 10046| | 10055| | 10056| | 10078| | 10083| | 10085| | 10090| | 10092| | 10100| | 10103| +-----------+ only showing top 20 rows Schema after creating separate columns for each type: root |-- drg definition: string |-- provider id_string: string |-- provider id_long: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +----------------+------------------+ |provider id_long|provider id_string| +----------------+------------------+ | 10001| null| | 10005| null| | 10006| null| | 10011| null| | 10016| null| | 10023| null| | 10029| null| | 10033| null| | 10039| null| | 10040| null| | 10046| null| | 10055| null| | 10056| null| | 10078| null| | 10083| null| | 10085| null| | 10090| null| | 10092| null| | 10100| null| | 10103| null| +----------------+------------------+ only showing top 20 rows
select_fields
select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Gibt einen neuen DynamicFrame
zurück, der die ausgewählten Felder enthält.
-
paths
– Eine Liste von Zeichenfolgen. Jede Zeichenfolgen ist ein vollständiger Pfad zu einem Knoten der obersten Ebene ist, den Sie auswählen möchten. -
transformation_ctx
– Eine eindeutige Zeichenfolge zur Identifikation von Statusinformationen (optional). -
info
– Eine Zeichenfolge, die den Fehlermeldungen für diese Transformation zugeordnet werden soll (optional). -
stageThreshold
– Die Anzahl der aufgetretenen Fehler während dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte. -
totalThreshold
– Die Anzahl der aufgetretenen Fehler bis einschließlich dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte.
Beispiel: Verwenden von select_fields zum Erstellen eines neuen DynamicFrame
mit ausgewählten Feldern
Im folgenden Beispielcode wird gezeigt, wie Sie die select_fields
-Methode zum Erstellen eines neuen DynamicFrame
mit einer ausgewählten Liste von Feldern aus einem vorhandenen DynamicFrame
verwenden können.
Anmerkung
Informationen zum Zugriff auf den Datensatz, der in diesem Beispiel verwendet wird, finden Sie unter Codebeispiel: Verknüpfen und Inbeziehungsetzen von Daten und folgen Sie den Anweisungen in Schritt 1: Crawlen der Daten im Amazon S3 Bucket.
# Example: Use select_fields to select specific fields from a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Create a new DynamicFrame with chosen fields names = persons.select_fields(paths=["family_name", "given_name"]) print("Schema for the names DynamicFrame, created with select_fields:") names.printSchema() names.toDF().show()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the names DynamicFrame: root |-- family_name: string |-- given_name: string +-----------+----------+ |family_name|given_name| +-----------+----------+ | Collins| Michael| | Huizenga| Bill| | Clawson| Curtis| | Solomon| Gerald| | Rigell| Edward| | Crapo| Michael| | Hutto| Earl| | Ertel| Allen| | Minish| Joseph| | Andrews| Robert| | Walden| Greg| | Kazen| Abraham| | Turner| Michael| | Kolbe| James| | Lowenthal| Alan| | Capuano| Michael| | Schrader| Kurt| | Nadler| Jerrold| | Graves| Tom| | McMillan| John| +-----------+----------+ only showing top 20 rows
simplify_ddb_json
simplify_ddb_json(): DynamicFrame
Vereinfacht verschachtelte Spalten in aDynamicFrame
, die sich speziell in der JSON DynamoDB-Struktur befinden, und gibt eine neue vereinfachte Spalte zurück. DynamicFrame
Wenn ein Listentyp mehrere Typen oder einen Map-Typ enthält, werden die Elemente in der Liste nicht vereinfacht. Beachten Sie, dass dies ein bestimmter Transformationstyp ist, der sich anders verhält als die reguläre unnest
Transformation und erfordert, dass sich die Daten bereits in der DynamoDB-Struktur befindenJSON. Weitere Informationen finden Sie unter DynamoDB JSON.
Das Schema eines Lesens eines Exports mit der JSON DynamoDB-Struktur könnte beispielsweise wie folgt aussehen:
root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean
Die simplify_ddb_json()
-Transformation würde dies folgendermaßen umwandeln:
root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
Beispiel: Verwenden Sie simplify_ddb_json, um eine DynamoDB-Vereinfachung aufzurufen JSON
In diesem Codebeispiel wird die simplify_ddb_json
Methode verwendet, um den AWS Glue DynamoDB-Exportconnector zu verwenden, einen JSON DynamoDB-Simplify aufzurufen und die Anzahl der Partitionen zu drucken.
Beispiel-Code
from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "dynamodb", connection_options = { 'dynamodb.export': 'ddb', 'dynamodb.tableArn': '<table arn>', 'dynamodb.s3.bucket': '<bucket name>', 'dynamodb.s3.prefix': '<bucket prefix>', 'dynamodb.s3.bucketOwner': '<account_id of bucket>' } ) simplified = dynamicFrame.simplify_ddb_json() print(simplified.getNumPartitions())
spigot
spigot(path, options={})
Schreibt Beispieldatensätze an ein bestimmtes Ziel, damit Sie die von Ihrem Auftrag durchgeführten Transformationen überprüfen können.
-
path
– Der Pfad zum Ziel, an das geschrieben werden soll (erforderlich). -
options
– Schlüssel-Wert-Paare, die Optionen angeben (optional). Die"topk"
-Option gibt an, dass die erstenk
-Datensätze geschrieben werden sollen. Die"prob"
-Option gibt an, wie wahrscheinlich es ist (in Form einer Dezimalzahl), dass ein bestimmter Datensatz ausgewählt wird. Sie können sie verwenden, um Datensätze auszuwählen, die geschrieben werden sollen. transformation_ctx
– Eine eindeutige Zeichenfolge zur Identifikation von Statusinformationen (optional).
Beispiel: Verwenden Sie Spigot, um Beispielfelder von einem DynamicFrame
an Amazon S3 zu schreiben
In diesem Codebeispiel wird die spigot
-Methode verwendet, um nach der Anwendung der select_fields
-Transformation Beispieldatensätze in einen Amazon-S3-Bucket zu schreiben.
Beispieldatensatz
Anmerkung
Informationen zum Zugriff auf den Datensatz, der in diesem Beispiel verwendet wird, finden Sie unter Codebeispiel: Verknüpfen und Inbeziehungsetzen von Daten und folgen Sie den Anweisungen in Schritt 1: Crawlen der Daten im Amazon S3 Bucket.
Das Beispiel verwendet einen DynamicFrame
mit dem Namen persons
mit dem folgenden Schema:
root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string
Beispiel-Code
# Example: Use spigot to write sample records # to a destination during a transformation # from pyspark.context import SparkContext. # Replace DOC-EXAMPLE-BUCKET with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load table data into a DynamicFrame persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) # Perform the select_fields on the DynamicFrame persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"]) # Use spigot to write a sample of the transformed data # (the first 10 records) spigot_output = persons.spigot( path="
s3://DOC-EXAMPLE-BUCKET
", options={"topk": 10} )
Im Folgenden finden Sie ein Beispiel für die Daten, die spigot
an Amazon S3 schreibt. Da der Beispielcode options={"topk": 10}
angegeben hat, enthalten die Beispieldaten die ersten zehn Datensätze.
{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"} {"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"} {"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"} {"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"} {"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"} {"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"} {"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"} {"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"} {"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"} {"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}
split_fields
split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Gibt einen neuen DynamicFrameCollection
zurück, der zwei DynamicFrames
enthält. Der erste DynamicFrame
enthält alle Knoten, die abgespaltet wurden, und der zweite enthält die Knoten, die verbleiben.
-
paths
– Eine Liste von Zeichenfolgen, von denen jede ein vollständiger Pfad zu einem Knoten ist, den Sie in einen neuenDynamicFrame
teilen möchten. -
name1
– Eine Namenszeichenfolge für denDynamicFrame
, der abgespaltet ist. -
name2
– Eine Namenszeichenfolge für denDynamicFrame
, der verbleibt, nachdem die angegebenen Knoten abgespaltet wurden. -
transformation_ctx
– Eine eindeutige Zeichenfolge zur Identifikation von Statusinformationen (optional). -
info
– Eine Zeichenfolge, die den Fehlermeldungen für diese Transformation zugeordnet werden soll (optional). -
stageThreshold
– Die Anzahl der aufgetretenen Fehler während dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte. -
totalThreshold
– Die Anzahl der aufgetretenen Fehler bis einschließlich dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte.
Beispiel: Verwenden Sie split_fields, um ausgewählte Felder in einen separaten DynamicFrame
aufzuteilen
In diesem Codebeispiel wird die split_fields
-Methode verwendet, um eine Liste von angegebenen Feldern in einen separaten DynamicFrame
aufzuteilen.
Beispieldatensatz
Das Beispiel verwendet einen DynamicFrame
namens l_root_contact_details
, der aus einer Sammlung mit dem Namen legislators_relationalized
stammt.
l_root_contact_details
hat das folgende Schema und folgende Einträge.
root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| ...
Beispiel-Code
# Example: Use split_fields to split selected # fields into a separate DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input DynamicFrame and inspect its schema frame_to_split = legislators_relationalized.select("l_root_contact_details") print("Inspect the input DynamicFrame schema:") frame_to_split.printSchema() # Split id and index fields into a separate DynamicFrame split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right") # Inspect the resulting DynamicFrames print("Inspect the schemas of the DynamicFrames created with split_fields:") split_fields_collection.select("left").printSchema() split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema: root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string Inspect the schemas of the DynamicFrames created with split_fields: root |-- id: long |-- index: int root |-- contact_details.val.type: string |-- contact_details.val.value: string
split_rows
split_rows(comparison_dict, name1, name2, transformation_ctx="", info="",
stageThreshold=0, totalThreshold=0)
Teilt eine oder mehrere Zeilen in einem DynamicFrame
auf einen neuen DynamicFrame
auf.
Die Methode gibt einen neuen DynamicFrameCollection
zurück, der zwei DynamicFrames
enthält. Der erste DynamicFrame
enthält alle Zeilen, die abgespaltet wurden, und der zweite enthält die Zeilen, die verbleiben.
-
comparison_dict
– Ein Wörterbuch, in dem der Schlüssel ein Pfad zu einer Spalte ist und der Wert ein weiteres Wörterbuch für das Mapping von Vergleichsoperatoren zu Werten ist, mit denen der Spaltenwert verglichen wird. Beispielsweise spaltet{"age": {">": 10, "<": 20}}
alle Zeilen ab, deren Wert in der Spalte Alter größer als 10 und kleiner als 20 ist. -
name1
– Eine Namenszeichenfolge für denDynamicFrame
, der abgespaltet ist. -
name2
– Eine Namenszeichenfolge für denDynamicFrame
, der verbleibt, nachdem die angegebenen Knoten abgespaltet wurden. -
transformation_ctx
– Eine eindeutige Zeichenfolge zur Identifikation von Statusinformationen (optional). -
info
– Eine Zeichenfolge, die den Fehlermeldungen für diese Transformation zugeordnet werden soll (optional). -
stageThreshold
– Die Anzahl der aufgetretenen Fehler während dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte. -
totalThreshold
– Die Anzahl der aufgetretenen Fehler bis einschließlich dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte.
Beispiel: Verwenden Sie split_rows, um Zeilen in einem DynamicFrame
abzuspalten
In diesem Codebeispiel wird die split_rows
-Methode verwendet, um Zeilen in einem DynamicFrame
auf der Grundlage des id
-Feldwerts abzuspalten.
Beispieldatensatz
Das Beispiel verwendet einen DynamicFrame
namens l_root_contact_details
, der aus einer Sammlung mit dem Namen legislators_relationalized
ausgewählt wird.
l_root_contact_details
hat das folgende Schema und folgende Einträge.
root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+
Beispiel-Code
# Example: Use split_rows to split up # rows in a DynamicFrame based on value from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Retrieve the DynamicFrame to split frame_to_split = legislators_relationalized.select("l_root_contact_details") # Split up rows by ID split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low") # Inspect the resulting DynamicFrames print("Inspect the DynamicFrame that contains IDs < 10") split_rows_collection.select("low").toDF().show() print("Inspect the DynamicFrame that contains IDs > 10") split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains IDs > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| phone| 202-225-5476| | 11| 1| twitter| RepDavidYoung| | 12| 0| phone| 202-225-4035| | 12| 1| twitter| RepStephMurphy| | 13| 0| fax| 202-226-0774| | 13| 1| phone| 202-225-6335| | 14| 0| fax| 202-226-0774| | 14| 1| phone| 202-225-6335| | 15| 0| fax| 202-226-0774| | 15| 1| phone| 202-225-6335| | 16| 0| fax| 202-226-0774| | 16| 1| phone| 202-225-6335| | 17| 0| fax| 202-226-0774| | 17| 1| phone| 202-225-6335| | 18| 0| fax| 202-226-0774| | 18| 1| phone| 202-225-6335| | 19| 0| fax| 202-226-0774| | 19| 1| phone| 202-225-6335| | 20| 0| fax| 202-226-0774| | 20| 1| phone| 202-225-6335| +---+-----+------------------------+-------------------------+ only showing top 20 rows
unbox
unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)
Entpackt (formatiert neu) ein Zeichenfolgefeld in einem DynamicFrame
und gibt einen neuen DynamicFrame
zurück, der die entpackten DynamicRecords
enthält.
Ein DynamicRecord
stellt einen logischen Datensatz in einem DynamicFrame
dar. Er ist mit einer Zeile in einem Apache Spark DataFrame
vergleichbar, mit der Ausnahme, dass er selbstbeschreibend ist und für Daten verwendet werden kann, die keinem festen Schema entsprechen.
-
path
– Ein vollständiger Pfad zu dem Zeichenfolgeknoten, den Sie entpacken möchten. format
– Eine Formatspezifikation (optional). Sie verwenden dies für einen Amazon S3 oder AWS Glue Verbindung, die mehrere Formate unterstützt. Informationen zu den unterstützten Formaten finden Sie unter Mögliche Formate für Eingaben und Ausgaben in AWS Glue für Spark.-
transformation_ctx
– Eine eindeutige Zeichenfolge zur Identifikation von Statusinformationen (optional). -
info
– Eine Zeichenfolge, die den Fehlermeldungen für diese Transformation zugeordnet werden soll (optional). -
stageThreshold
– Die Anzahl der aufgetretenen Fehler während dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte. -
totalThreshold
– Die Anzahl der aufgetretenen Fehler bis einschließlich dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte. -
options
– Eine oder mehrere der folgenden Aktionen:separator
– Eine Zeichenfolge mit Trennzeichen.escaper
– Eine Zeichenfolge mit Escape-Zeichen.skipFirst
– Ein Boolescher Wert, der angibt, ob die erste Instance übersprungen werden soll.-
withSchema
— Eine Zeichenfolge, die eine JSON Darstellung des Schemas des Knotens enthält. Das Format der JSON Darstellung eines Schemas wird durch die Ausgabe von definiertStructType.json()
. withHeader
– Ein Boolescher Wert, der angibt, ob ein Header enthalten ist.
Beispiel: Verwenden Sie unbox, um ein Zeichenkettenfeld in eine Struktur zu entpacken
In diesem Codebeispiel wird die unbox
-Methode verwendet, um ein Zeichenfolgenfeld in einem DynamicFrame
in ein Feld vom Typ Struktur zu entpacken oder neu zu formatieren.
Beispieldatensatz
Das Beispiel verwendet einen DynamicFrame
mit dem Namen mapped_with_string
mit dem folgenden Schema und den folgenden Einträgen.
Beachten Sie das Feld mit dem Namen AddressString
. Dies ist das Feld, das das Beispiel in eine Struktur entpackt.
root |-- Average Total Payments: string |-- AddressString: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| ...
Beispiel-Code
# Example: Use unbox to unbox a string field # into a struct in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) unboxed = mapped_with_string.unbox("AddressString", "json") unboxed.printSchema() unboxed.toDF().show()
root |-- Average Total Payments: string |-- AddressString: struct | |-- Street: string | |-- City: string | |-- State: string | |-- Zip.Code: string | |-- Array: array | | |-- element: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| | $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...| | $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...| | $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...| | $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...| | $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL| | $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...| | $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...| | $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL| | $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...| | $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...| | $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...| | $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...| | $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL| | $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...| | $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL| | $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ only showing top 20 rows
union
union(frame1, frame2, transformation_ctx = "",
info = "", stageThreshold = 0, totalThreshold = 0)
Union zwei DynamicFrames. Gibt zurück DynamicFrame , die alle Datensätze aus beiden Eingaben enthält DynamicFrames. Diese Transformation kann aus der Vereinigung von zwei DataFrames mit äquivalenten Daten unterschiedliche Ergebnisse liefern. Wenn Sie das DataFrame Union-Verhalten von Spark benötigen, sollten Sie es verwendentoDF
.
-
frame1
— Zuerst DynamicFrame zur Vereinigung. -
frame2
— An zweiter Stelle DynamicFrame nach der Gewerkschaft. -
transformation_ctx
– (optional) Eine eindeutige Zeichenfolge zur Identifikation von Status-/Zustandsinformationen -
info
– (optional) Eine Zeichenfolge im Zusammenhang mit Fehlern bei der Transformation -
stageThreshold
– (optional) Maximale Anzahl von Fehlern bei der Transformation, bis die Verarbeitung aufgrund eines Fehlers fehlschlägt -
totalThreshold
– (optional) Maximale Anzahl von Fehlern insgesamt, bis die Verarbeitung aufgrund eines Fehlers fehlschlägt.
unnest
unnest(transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Löst verschachtelte Objekte in einem DynamicFrame
auf, wodurch sie zu Objekten der obersten Ebene werden, und gibt einen neuen, nicht verschachtelten DynamicFrame
zurück.
-
transformation_ctx
– Eine eindeutige Zeichenfolge zur Identifikation von Statusinformationen (optional). -
info
– Eine Zeichenfolge, die den Fehlermeldungen für diese Transformation zugeordnet werden soll (optional). -
stageThreshold
– Die Anzahl der aufgetretenen Fehler während dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte. -
totalThreshold
– Die Anzahl der aufgetretenen Fehler bis einschließlich dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional). Die Standardeinstellung ist Null, was darauf hinweist, dass der Prozess nicht fehlerhaft sein sollte.
Beispiel: Verwenden Sie unnest, um verschachtelte Felder in Felder der obersten Ebene umzuwandeln
In diesem Codebeispiel wird die unnest
-Methode verwendet, um alle verschachtelten Felder in einem DynamicFrame
zu Feldern der obersten Ebene zu vereinfachen.
Beispieldatensatz
Das Beispiel verwendet einen DynamicFrame
mit dem Namen mapped_medicare
mit dem folgenden Schema. Beachten Sie, dass das Address
-Feld das einzige Feld ist, das verschachtelte Daten enthält.
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
Beispiel-Code
# Example: Use unnest to unnest nested # objects in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Unnest all nested fields unnested = mapped_medicare.unnest() unnested.printSchema()
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address.Zip.Code: string |-- Address.City: string |-- Address.Array: array | |-- element: string |-- Address.State: string |-- Address.Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
unnest_ddb_json
Löscht verschachtelte Spalten in aDynamicFrame
, die sich speziell in der JSON DynamoDB-Struktur befinden, und gibt eine neue, nicht verschachtelte Spalte zurück. DynamicFrame
Bei Spalten, die aus einem Array von Strukturtypen bestehen, wird die Verschachtelung nicht aufgehoben. Beachten Sie, dass es sich dabei um einen speziellen Transformationstyp ohne Verschachtelung handelt, der sich anders verhält als die reguläre unnest
Transformation und erfordert, dass sich die Daten bereits in der DynamoDB-Struktur befinden. JSON Weitere Informationen finden Sie unter DynamoDB JSON.
unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
-
transformation_ctx
– Eine eindeutige Zeichenfolge zur Identifikation von Statusinformationen (optional). -
info
– Eine Zeichenfolge, die den Fehlermeldungen für diese Transformation zugeordnet werden soll (optional). -
stageThreshold
– Die Anzahl der aufgetretenen Fehler während dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional: Null als Standard gibt an, dass der Prozess keinen Fehler ausgeben sollte). -
totalThreshold
– Die Anzahl der aufgetretenen Fehler bis einschließlich dieser Transformation bei der der Prozess einen Fehler ausgeben sollte (optional: Null als Standard gibt an, dass der Prozess keinen Fehler ausgeben sollte).
Das Schema eines Lesens eines Exports mit der JSON DynamoDB-Struktur könnte beispielsweise wie folgt aussehen:
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
Die unnest_ddb_json()
-Transformation würde dies folgendermaßen umwandeln:
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
Das folgende Codebeispiel zeigt, wie der AWS Glue DynamoDB-Exportconnector verwendet, ein JSON DynamoDB-Unnest aufgerufen und die Anzahl der Partitionen ausgegeben wird:
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()
write
write(connection_type, connection_options, format, format_options, accumulator_size)
Bezieht einen DataSink(object) des angegebenen Verbindungstyps vom GlueContext Klasse dieses DynamicFrame
und verwendet ihn zum Formatieren und Schreiben des Inhalts dieses DynamicFrame
. Gibt den neuen DynamicFrame
wie angegeben formatiert und geschrieben zurück.
-
connection_type
– Der zu verwendende Verbindungstyp. Gültige Werte sinds3
,mysql
,postgresql
,redshift
,sqlserver
undoracle
. -
connection_options
– Die zu verwendende Verbindungsoption (optional). Für denconnection_type
s3
ist ein Amazon-S3-Pfad definiert.connection_options = {"path": "
s3://aws-glue-target/temp
"}Für JDBC Verbindungen müssen mehrere Eigenschaften definiert werden. Beachten Sie, dass der Datenbankname Teil von sein mussURL. Er kann optional in die Verbindungsoptionen eingeschlossen werden.
Warnung
Das Speichern von Passwörtern in Ihrem Skript wird nicht empfohlen. Erwägen Sie, sie
boto3
zu verwenden, um sie aus AWS Secrets Manager dem AWS Glue-Datenkatalog abzurufen.connection_options = {"url": "
jdbc-url/database
", "user": "username
", "password":passwordVariable
,"dbtable": "table-name
", "redshiftTmpDir": "s3-tempdir-path
"} format
– Eine Formatspezifikation (optional). Dies wird für einen Amazon Simple Storage Service (Amazon S3) verwendet oder AWS Glue Verbindung, die mehrere Formate unterstützt. Informationen zu den unterstützten Formaten finden Sie unter Mögliche Formate für Eingaben und Ausgaben in AWS Glue für Spark.format_options
– Formatoptionen für das angegebene Format. Informationen zu den unterstützten Formaten finden Sie unter Mögliche Formate für Eingaben und Ausgaben in AWS Glue für Spark.accumulator_size
– Die zu verwendende akkumulierbare Größe in Bytes (optional).
– Fehler –
assertErrorThreshold
assertErrorThreshold( )
– Eine Assertion für Fehler in den Transformationen, die diesen DynamicFrame
erstellt hat. Gibt eine Exception
von dem zugrunde liegenden DataFrame
zurück.
errorsAsDynamicRahmen
errorsAsDynamicFrame( )
– Gibt einen DynamicFrame
zurück, der verschachtelte Fehlerdatensätze enthält.
Beispiel: Verwenden Sie errorsAsDynamic Frame, um Fehlerdatensätze anzuzeigen
Im folgenden Beispielcode wird gezeigt, wie Sie die errorsAsDynamicFrame
-Methode zum Anzeigen eines Fehlerdatensatzes für einen DynamicFrame
verwenden.
Beispieldatensatz
Das Beispiel verwendet den folgenden Datensatz, als den Sie auf Amazon S3 hochladen könnenJSON. Beachten Sie, dass der zweite Datensatz falsch formatiert ist. Fehlerhaft formatierte Daten unterbrechen in der Regel die Dateianalyse, wenn Sie Spark verwenden. SQL Allerdings erkennt DynamicFrame
Fehlbildungsprobleme und wandelt fehlerhafte Zeilen in Fehlerdatensätze um, die Sie individuell behandeln können.
{"id": 1, "name": "george", "surname": "washington", "height": 178} {"id": 2, "name": "benjamin", "surname": "franklin", {"id": 3, "name": "alexander", "surname": "hamilton", "height": 171} {"id": 4, "name": "john", "surname": "jay", "height": 190}
Beispiel-Code
# Example: Use errorsAsDynamicFrame to view error records. # Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create errors DynamicFrame, view schema errors = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["
s3://DOC-EXAMPLE-S3-BUCKET/error_data.json
"]}, "json" ) print("Schema of errors DynamicFrame:") errors.printSchema() # Show that errors only contains valid entries from the dataset print("errors contains only valid records from the input dataset (2 of 4 records)") errors.toDF().show() # View errors print("Errors count:", str(errors.errorsCount())) print("Errors:") errors.errorsAsDynamicFrame().toDF().show() # View error fields and error data error_record = errors.errorsAsDynamicFrame().toDF().head() error_fields = error_record["error"] print("Error fields: ") print(error_fields.asDict().keys()) print("\nError record data:") for key in error_fields.asDict().keys(): print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame: root |-- id: int |-- name: string |-- surname: string |-- height: int errors contains only valid records from the input dataset (2 of 4 records) +---+------+----------+------+ | id| name| surname|height| +---+------+----------+------+ | 1|george|washington| 178| | 4| john| jay| 190| +---+------+----------+------+ Errors count: 1 Errors: +--------------------+ | error| +--------------------+ |[[ File "/tmp/20...| +--------------------+ Error fields: dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord']) Error record data: callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='') msg : error in jackson reader stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at scala.collection.Iterator$$anon$9.next(Iterator.scala:162) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202) at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116) at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109) at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247) at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) input : bytesread : 252 source : dynamicRecord : Row(id=2, name='benjamin', surname='franklin')
errorsCount
errorsCount( )
– Gibt die Gesamtzahl der Fehler in einem DynamicFrame
zurück.
stageErrorsCount
stageErrorsCount
– Gibt die Anzahl der aufgetretenen Fehler im Generierungsprozess dieses DynamicFrame
zurück.