Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
AWS Glue Scala DynamicFrame-Klasse
Paket: com.amazonaws.services.glue
class DynamicFrame extends Serializable with Logging (
val glueContext : GlueContext,
_records : RDD[DynamicRecord],
val name : String = s"",
val transformationContext : String = DynamicFrame.UNDEFINED,
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0,
prevErrors : => Long = 0,
errorExpr : => Unit = {} )
Ein DynamicFrame
ist eine verteilte Sammlung von selbstbeschreibenden DynamicRecord Objekten.
DynamicFrame
s wurden entwickelt, um ein flexibles Datenmodell für ETL-Operationen (Extrahieren, Transformieren und Laden) bereitzustellen. Sie benötigen kein Schema zum Erstellen, und Sie können damit Daten lesen und transformieren, die unstrukturierte oder inkonsistente Werte und Typen enthalten. Ein Schema kann bei Bedarf für solche Operationen berechnet werden, die eines benötigen.
DynamicFrame
s bieten eine Reihe von Transformationen für die Datenreinigung und ETL. Sie unterstützen auch die Konvertierung zu und von SparkSQL DataFrames , um sie in vorhandenen Code und die vielen Analysevorgänge zu integrieren, die DataFrames bieten.
Die folgenden Parameter werden über viele AWS Glue-Transformationen hinweg geteilt, die DynamicFrame
s erstellen:
transformationContext
– Der Bezeichner für diesenDynamicFrame
. DertransformationContext
wird als Schlüssel für den Auftrags-Lesezeichenstatus verwendet, der während der Ausführungen persistent ist.callSite
– Liefert Kontextinformationen für die Fehlerberichterstattung. Diese Werte werden automatisch beim Aufruf von Python festgelegt.stageThreshold
– Die maximale Anzahl der Fehlerdatensätze, die aufgrund der Berechnung diesesDynamicFrame
zulässig sind, ehe eine Ausnahme ausgelöst wird. Ausgenommen sind Datensätze aus dem vorherigenDynamicFrame
.totalThreshold
– Die maximale Anzahl der Gesamtfehlersätze, bevor eine Ausnahme ausgelöst wird, einschließlich derjenigen aus früheren Frames.
Val errorsCount
val errorsCount
Die Anzahl der Fehlerdatensätze in diesem DynamicFrame
. Dazu zählen Fehler aus früheren Operationen.
Def applyMapping
def applyMapping( mappings : Seq[Product4[String, String, String, String]],
caseSensitive : Boolean = true,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
mappings
– Eine Folge von Zuweisungen für die Erstellung eines neuenDynamicFrame
.caseSensitive
– Legt fest, ob bei Quellspalten die Groß-/Kleinschreibung beachtet wird. Die Festlegung auf „false“ kann bei der Integration in Datenspeicher wie dem AWS Glue Data Catalog helfen, bei dem die Groß-/Kleinschreibung nicht berücksichtigt wird.
Selektiert, projiziert und wandelt Spalten basierend auf Mappingreihenfolgen um.
Jede Zuweisung besteht aus einer Quellspalte und einem Typ und einer Zielspalte und einem Typ. Zuweisungen können entweder als „vierstelliger“ Tupel (source_path
, source_type
, target_path
, target_type
) oder als MappingSpec-Objekt, das dieselben Informationen enthält, angegeben werden.
Neben der Verwendung von Zuweisungen für einfache Projektionen und Umwandlungen können Sie diese auch zum Verschachteln oder Aufheben der Verschachtelung von Feldern verwenden, indem Sie Komponenten des Pfades mit „.
“ (Punkt) trennen.
Angenommen, Sie haben einen DynamicFrame
mit folgendem Schema:
{{{
root
|-- name: string
|-- age: int
|-- address: struct
| |-- state: string
| |-- zip: int
}}}
Mit dem folgenden Aufruf können Sie die Verschachtelung der Felder state
und zip
aufheben:
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("address.state", "string", "state", "string"),
("address.zip", "int", "zip", "int")))
}}}
Das resultierende Schema lautet wie folgt:
{{{
root
|-- name: string
|-- age: int
|-- state: string
|-- zip: int
}}}
Sie können auch applyMapping
verwenden, um Spalten neu zu verschachteln. Durch folgende Operation wird beispielsweise die vorherige Transformation umgekehrt und eine neue Struktur namens address
am Ziel erstellt:
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("state", "string", "address.state", "string"),
("zip", "int", "address.zip", "int")))
}}}
Feldnamen, die „.
“ (Punkt-)Zeichen enthalten, können mit Hilfe von Backticks (``
) zitiert werden.
Anmerkung
Zurzeit können Sie die applyMapping
-Methode nicht für die Zuweisung von Spalten verwenden, die unter Arrays verschachtelt sind.
Def assertErrorThreshold
def assertErrorThreshold : Unit
Eine Aktion, die eine Berechnung erzwingt und sicherstellt, dass die Anzahl der Fehlerdatensätze stageThreshold
und totalThreshold
nicht überschreitet. Löst eine Ausnahme aus, wenn eine der Bedingungen fehlschlägt.
Def count
lazy
def count
Gibt die Anzahl der Elemente in diesem DynamicFrame
zurück.
Def dropField
def dropField( path : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Gibt einen neuen DynamicFrame
zurück, bei dem die angegebene Spalte entfernt wurde.
Def dropFields
def dropFields( fieldNames : Seq[String], // The column names to drop.
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Gibt einen neuen DynamicFrame
zurück, bei dem die angegebenen Spalten entfernt wurden.
Sie können diese Methode verwenden, um verschachtelte Spalten zu löschen, einschließlich derjenigen in Arrays. Sie kann aber nicht eingesetzt werden, um bestimmte Array-Elemente zu verwerfen.
Def dropNulls
def dropNulls( transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0 )
Gibt einen neuen DynamicFrame
zurück, bei dem alle Nullspalten entfernt sind.
Anmerkung
Dies entfernt nur Spalten des Typs NullType
. Einzelne Nullwerte in anderen Spalten werden weder entfernt noch geändert.
Def- errorsAsDynamicFrame
def errorsAsDynamicFrame
Gibt einen neuen DynamicFrame
mit den Fehlersätzen aus diesem DynamicFrame
zurück.
Def filter
def filter( f : DynamicRecord => Boolean,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Erstellt einen neuen DynamicFrame
, der nur die Datensätze enthält, für die die Funktion „f
“ true
zurückgibt. Die Filterfunktion „f
“ sollte den Eingabe-Datensatz nicht verändern.
Def getName
def getName : String
Gibt den Namen dieses DynamicFrame
zurück.
Def getNumPartitions
def getNumPartitions
Gibt die Anzahl der Partitionen in diesem DynamicFrame
zurück.
Def getSchemaIfberechnet
def getSchemaIfComputed : Option[Schema]
Gibt das Schema zurück, wenn es bereits berechnet wurde. Scannt die Daten nicht, wenn das Schema noch nicht berechnet wurde.
Def isSchemaComputed
def isSchemaComputed : Boolean
Gibt "true
" zurück, wenn das Schema für diesen DynamicFrame
bereits berechnet wurde, andernfalls "false
". Wenn diese Methode "false" zurückgibt, erfordert der Aufruf der schema
-Methode eine erneute Übergabe dieser Datensätze im DynamicFrame
.
Def javaToPython
def javaToPython : JavaRDD[Array[Byte]]
Def join
def join( keys1 : Seq[String],
keys2 : Seq[String],
frame2 : DynamicFrame,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
keys1
– Die Spalten in diesemDynamicFrame
, die für den Join verwendet werden sollen.keys2
– Die Spalten inframe2
, die für den Join verwendet werden sollen. Muss die gleiche Länge haben wiekeys1
.frame2
– Der andereDynamicFrame
für einen Join.
Gibt das Ergebnis der Durchführung eines equijoin mit frame2
über die angegebenen Schlüssel zurück.
Def map
def map( f : DynamicRecord => DynamicRecord,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Gibt einen neuen DynamicFrame
zurück, der erstellt wird, indem die angegebene Funktion „f
“ auf jeden Datensatz in diesem DynamicFrame
angewendet wird.
Diese Methode kopiert jeden Datensatz, ehe die angegebene Funktion angewendet wird, sodass das Verändern der Datensätze sicher ist. Wenn die Zuweisungsfunktion eine Ausnahme für einen bestimmten Datensatz auslöst, wird der Datensatz als fehlerhaft gekennzeichnet und der Stack-Trace als Spalte im Fehlerdatensatz gespeichert.
Def mergeDynamicFrames
def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "",
options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"),
stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
stageDynamicFrame
– Der Staging-DynamicFrame
, der zusammengeführt werden soll.primaryKeys
– Die Liste der Primärschlüsselfelder, mit denen Datensätze aus der Quelle und Staging-DynamicFrame
s übereinstimmen.transformationContext
– Eine eindeutige Zeichenfolge, die zum Abrufen von Metadaten über die aktuelle Transformation verwendet wird (optional).options
– Eine Zeichenfolge von JSON-Name-Wert-Paaren, die zusätzliche Informationen für diese Transformation bereitstellen.callSite
– Wird verwendet, um Kontextinformationen für Fehlerberichte bereitzustellen.stageThreshold
– EinLong
. Die Anzahl der Fehler in der angegebenen Transformation, für die die Verarbeitung fehlerhaft sein muss.totalThreshold
– EinLong
. Die Gesamtzahl der Fehler bis einschließlich dieser Transformation, bei denen die Verarbeitung fehlerhaft sein muss.
Führt dieses DynamicFrame
mit einem Staging-DynamicFrame
basierend auf den angegebenen Primärschlüsseln zusammen, um Datensätze zu identifizieren. Doppelte Datensätze (Datensätze mit denselben Primärschlüsseln) werden nicht dedupliziert. Wenn kein übereinstimmender Datensatz im Staging-Frame vorhanden ist, werden alle Datensätze (einschließlich Duplikate) von der Quelle beibehalten. Wenn der Staging-Frame übereinstimmende Datensätze enthält, überschreiben die Datensätze aus dem Staging-Frame die Datensätze in der Quelle in AWS Glue.
Der zurückgegebene DynamicFrame
enthält Datensatz A in folgenden Fällen:
Wenn sowohl im Quell- als auch im Staging-Frame
A
vorhanden ist, wirdA
im Staging-Frame zurückgegeben.Wenn sich
A
in der Quelltabelle undA.primaryKeys
nicht instagingDynamicFrame
befindet (d. h.A
wird nicht in der Staging-Tabelle aktualisiert).
Der Quell- und der Staging-Frame müssen nicht dasselbe Schema haben.
val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))
Def printSchema
def printSchema : Unit
Druckt das Schema dieses DynamicFrame
in einem lesbaren Format in stdout
.
Def recomputeSchema
def recomputeSchema : Schema
Erzwingt eine Neuberechnung des Schemas. Dies erfordert einen Scan über die Daten, aber es kann das Schema „verschärfen“, wenn es einige Felder im aktuellen Schema gibt, die nicht in den Daten vorhanden sind.
Gibt das neu berechnete Schema zurück.
Def relationalize
def relationalize( rootTableName : String,
stagingPath : String,
options : JsonOptions = JsonOptions.empty,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
rootTableName
– Der Name für die BasisDynamicFrame
in der Ausgabe.DynamicFrame
s, die durch das Zusammenfassen von Arrays erstellt werden, beginnen mit diesem als Präfix.stagingPath
– Der Amazon-S3-Pfad (Amazon Simple Storage Service) zum Schreiben von Zwischendaten.options
– Relationalisierung von Optionen und Konfiguration. Derzeit nicht verwendet.
Gleicht alle verschachtelten Strukturen an und pivotiert Arrays in separate Tabellen.
Mit dieser Operation können Sie tief verschachtelte Daten für die Aufnahme in eine relationale Datenbank vorbereiten. Verschachtelte Strukturen werden genauso wie die Unnest-Transformation auf eine Ebene gebracht. Außerdem werden Arrays in separate Tabellen pivotiert. Dabei wird jedes Array-Element zu einer Zeile. Angenommen, Sie haben einen DynamicFrame
mit den folgenden Daten:
{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]}
{"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]}
{"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}
Führen Sie folgenden Code aus.
{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}
Es werden zwei Tabellen erstellt. Die erste Tabelle trägt den Namen „people“ und enthält Folgendes:
{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}
Das Freunde-Array wurde durch einen automatisch generierten Join-Schlüssel ersetzt. Eine separate Tabelle namens people.friends
mit folgendem Inhalt wird erstellt:
{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}
In dieser Tabelle ist „id
“ ein Join-Schlüssel, der identifiziert, aus welchem Datensatz das Array-Element stammt, „index
“ bezieht sich auf die Position im ursprünglichen Array und „val
“ steht für den tatsächlichen Array-Eintrag.
Die relationalize
-Methode gibt die Sequenz von DynamicFrame
s zurück, die durch rekursives Anwenden dieses Prozesses auf alle Arrays erzeugt werden.
Anmerkung
Die AWS Glue-Bibliothek generiert automatisch Join-Schlüssel für neue Tabellen. Um sicherzustellen, dass Join-Schlüssel über alle Auftragsausführungen hinweg eindeutig sind, müssen Sie Auftrags-Lesezeichen aktivieren.
Def renameField
def renameField( oldName : String,
newName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
oldName
– Der ursprüngliche Name der Spalte.newName
– Der neue Name der Spalte.
Gibt einen neuen DynamicFrame
zurück, wobei das angegebene Feld umbenannt ist.
Mit dieser Methode können Sie verschachtelte Felder umbenennen. Der folgende Code benennt beispielsweise innerhalb der Adressenstruktur state
zu state_code
um:
{{{ df.renameField("address.state", "address.state_code") }}}
Def repartition
def repartition( numPartitions : Int,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Gibt einen neuen DynamicFrame
mit numPartitions
Partitionen zurück.
Def resolveChoice
def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec],
choiceOption : Option[ChoiceOption] = None,
database : Option[String] = None,
tableName : Option[String] = None,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
choiceOption
– Eine Aktion, die auf alleChoiceType
-Spalten anzuwenden ist, die nicht in der Spezifikationsreihenfolge aufgeführt sind.database
– Die Data-Catalog-Datenbank zur Verwendung mit dermatch_catalog
-Aktion.tableName
– Die Data-Catalog-Tabelle zur Verwendung mit dermatch_catalog
-Aktion.
Gibt einen neuen DynamicFrame
zurück, indem eine oder mehrere ChoiceType
s durch einen spezifischeren Typ ersetzt werden.
Es gibt zwei Möglichkeiten für die Verwendung von resolveChoice
. Die erste besteht in der Angabe bestimmter Spalten und der Art, wie diese aufgelöst werden. Sie werden als Tupels, bestehend aus (Spalte, Aktion)-Paaren, angegeben.
Im Folgenden sind die möglichen Aktionen aufgeführt:
cast:type
– Versucht, alle Werte in den angegebenen Typ umzuwandeln.make_cols
– Konvertiert die einzelnen verschiedenen Typen in eine Spalte namenscolumnName_type
.make_struct
– Konvertiert eine Spalte in eine Struktur mit Schlüssel für die individuellen Typen.project:type
– Behält nur Wert des angegebenen Typs bei.
Der andere Modus für resolveChoice
dient zum Angeben einer einzigen Auflösung für alle ChoiceType
s. Sie können diesen verwenden, wenn die vollständige Liste der ChoiceType
s vor der Ausführung unbekannt ist. Zusätzlich zu den soeben aufgeführten Aktionen unterstützt dieser Modus noch die folgende Aktion:
match_catalog
ChoiceType
– Versucht jeden in einen entsprechenden Typ in der angegebenen Katalogtabelle umzuwandeln.
Beispiele:
Lösen Sie die user.id
-Spalte auf, indem Sie eine Umwandlung in ein „int“ durchführen, und sorgen Sie dafür, dass das address
-Feld nur Strukturen beibehält:
{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}
Lösen Sie alle ChoiceType
s auf, indem Sie jede Auswahl in eine eigene Spalte umwandeln:
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}
Lösen Sie alle ChoiceType
s auf, indem Sie diese in die Typen in der angegebenen Katalogtabelle umwandeln.
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}
Def schema
def schema : Schema
Gibt das Schema dieses DynamicFrame
zurück.
Das zurückgegebene Schema enthält garantiert alle Felder in einem Datensatz in diesem DynamicFrame
. In einigen wenigen Fällen kann es aber auch zusätzliche Felder enthalten. Die Unnest-Methode kann verwendet werden, um das Schema basierend auf den Datensätzen in diesem DynamicFrame
zu „straffen“.
Def selectField
def selectField( fieldName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Gibt ein einzelnes Feld als DynamicFrame
zurück.
Def selectFields
def selectFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
paths
– Die Reihenfolge der zu wählenden Spaltennamen.
Gibt einen neuen DynamicFrame
mit den angegebenen Spalten zurück.
Anmerkung
Sie können die selectFields
-Methode nur verwenden, um Spalten der obersten Ebene auszuwählen. Sie können die applyMapping-Methode zum Auswählen verschachtelter Spalten einsetzen.
Def show
def show( numRows : Int = 20 ) : Unit
numRows
– Die Anzahl der zu druckenden Zeilen.
Druckt Zeilen aus diesem DynamicFrame
im JSON-Format.
Def simplifyDDBJson
DynamoDB-Exporte mit dem AWS Glue DynamoDB-Export-Konnektor führen zu JSON-Dateien mit bestimmten verschachtelten Strukturen. Weitere Informationen finden Sie unter Datenobjekte. simplifyDDBJson
Vereinfacht verschachtelte Spalten in einem DynamicFrame dieser Art von Daten und gibt einen neuen vereinfachten zurück DynamicFrame. Wenn mehrere Typen oder ein Zuordnungstyp in einem Listentyp enthalten sind, werden die Elemente in der Liste nicht vereinfacht. Diese Methode unterstützt nur Daten im DynamoDB-Export-JSON-Format. Erwägen Sieunnest
, ähnliche Änderungen an anderen Arten von Daten vorzunehmen.
def simplifyDDBJson() : DynamicFrame
Diese Methode verwendet keine Parameter.
Beispieleingabe
Betrachten Sie das folgende Schema, das durch einen DynamoDB-Export generiert wurde:
root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean
Beispiel-Code
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "
ddbTableARN
", "dynamodb.s3.bucket" -> "exportBucketLocation
", "dynamodb.s3.prefix" -> "exportBucketPrefix
", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID
", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }
Die Transformation simplifyDDBJson
vereinfacht dies zu:
root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
Def spigot
def spigot( path : String,
options : JsonOptions = new JsonOptions("{}"),
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Pass-Through-Transformation, die die gleichen Datensätze zurückgibt, darüber hinaus aber noch ein Subset an Datensätzen schreibt.
path
– Der Pfad in Amazon S3 zum Schreiben der Ausgabe in der Forms3://bucket//path
.options
– Eine optionaleJsonOptions
-Zuweisung, die das Sampling-Verhalten beschreibt.
Gibt einen DynamicFrame
zurück, der die gleichen Datensätze wie dieser enthält.
Standardmäßig werden 100 willkürliche Datensätze in den durch path
angegebenen Speicherort geschrieben. Sie können dieses Verhalten anpassen, indem Sie die options
-Zuweisung verwenden. Gültige Schlüssel enthalten Folgendes.
topk
– Gibt die Gesamtzahl der geschriebenen Datensätze an. Der Standardwert ist 100.prob
– Gibt an, wie wahrscheinlich es ist (in Form einer Dezimalzahl), dass ein einzelner Datensatz enthalten ist. Standard = 1.
Beispielsweise würde der folgende Aufruf den Datensatz sampeln, indem er jeden Datensatz mit einer Wahrscheinlichkeit von 20 % auswählt und nach dem Schreiben von 200 Datensätzen stoppt:
{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}
Def splitFields
def splitFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
paths
– Die Pfade, die in den erstenDynamicFrame
aufzunehmen sind.
Gibt eine Abfolge zweier DynamicFrame
s zurück. Die erste DynamicFrame
enthält die angegebenen Pfade und die zweite alle anderen Spalten.
Beispiel
In diesem Beispiel wird ein aus der persons
Tabelle in der legislators
Datenbank im AWS Glue Data Catalog DynamicFrame erstellt und DynamicFrame in zwei aufgeteilt, wobei die angegebenen Felder in das erste DynamicFrame und die verbleibenden Felder in ein zweites gehen DynamicFrame. Das Beispiel wählt dann die erste DynamicFrame aus dem Ergebnis aus.
val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)
Def splitRows
def splitRows( paths : Seq[String],
values : Seq[Any],
operators : Seq[String],
transformationContext : String,
callSite : CallSite,
stageThreshold : Long,
totalThreshold : Long
) : Seq[DynamicFrame]
Teilt Zeilen basierend auf Prädikaten, die Konstanten mit Spalten vergleichen, auf.
paths
– Die Spalten, die zum Vergleich verwendet werden sollen.values
– Die konstanten Werte, die zum Vergleich verwendet werden sollen.operators
– Die zum Vergleich zu verwendenden Operatoren.
Gibt eine Abfolge zweier DynamicFrame
s zurück. Die erste enthält Zeilen, für die das Prädikat "true" ist, und die zweite enthält solche, bei denen es "false" ist.
Prädikate werden über drei Sequenzen spezifiziert: „paths
“ enthält die (evtl. geschachtelten) Spaltennamen, „values
“ enthält die zu vergleichenden konstanten Werte und „operators
“ enthält die Operatoren, die zum Vergleich verwendet werden sollen. Alle drei Sequenzen müssen gleich lang sein: Der n
. Operator wird verwendet, um die n
. Spalte mit dem n
. Wert zu vergleichen.
Jeder Operator muss "!=
", "=
", "<=
", "<
", ">=
" oder ">
" sein.
Beispielsweise teilt der folgende Aufruf einen DynamicFrame
so, dass der erste Ausgabe-Frame die Datensätze von Personen über 65 aus den USA enthält und der zweite alle anderen:
{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq(">=", "=")) }}}
Def stageErrorsCount
def stageErrorsCount
Gibt die Anzahl der Fehlerdatensätze zurück, die bei der Berechnung dieses DynamicFrame
generiert wurden. Ausgenommen sind Fehler aus vorherigen Operationen, die diesem DynamicFrame
als Eingabe übergeben wurden.
Def toDF
def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame
Konvertiert DynamicFrame
in einen Apache Spark SQL DataFrame
mit demselben Schema und denselben Datensätzen.
Anmerkung
Da DataFrame
s ChoiceType
s nicht unterstützen, konvertiert diese Methode ChoiceType
-Spalten automatisch in StructType
s. Weitere Informationen und Optionen zur Auflösung der Auswahl finden Sie unter resolveChoice.
Def unbox
def unbox( path : String,
format : String,
optionString : String = "{}",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
path
– Die zu analysierende Spalte. Muss eine Zeichenfolge oder ein Binärwert sein.format
– Das für die Analyse zu verwendende Format.optionString
– An das Format zu übergebende Optionen, beispielsweise das CSV-Trennzeichen.
Analysiert eine eingebettete Zeichenfolge oder eine binäre Spalte entsprechend des angegebenen Formats. Analysierte Spalten werden unterhalb einer Struktur mit dem ursprünglichen Spaltennamen verschachtelt.
Angenommen, Sie haben eine CSV-Datei mit einer eingebetteten JSON-Spalte.
name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...
Nach einer ersten Analyse erhalten Sie einen DynamicFrame
mit folgendem Schema:
{{{ root |-- name: string |-- age: int |-- address: string }}}
Sie können unbox
für die Adressspalte aufrufen, um die einzelnen Komponenten zu analysieren.
{{{ df.unbox("address", "json") }}}
Dadurch erhalten wir einen DynamicFrame
mit folgendem Schema:
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
Def unnest
def unnest( transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Gibt einen neuen DynamicFrame
zurück, bei dem allen verschachtelten Strukturen auf eine Ebene gebracht wurden. Namen werden mit Hilfe des „.
“ (Punkt-)Zeichens erstellt.
Angenommen, Sie haben einen DynamicFrame
mit folgendem Schema:
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
Der folgende Aufruf hebt die Einbettung der Adressenstruktur auf:
{{{ df.unnest() }}}
Das resultierende Schema lautet wie folgt:
{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}
Diese Methode verhindert auch verschachtelte Strukturen innerhalb von Arrays. Aus historischen Gründen werden den Namen solcher Felder jedoch der Name des umschließenden Arrays und „.val
“ vorangestellt.
Def unnestDDBJson
unnestDDBJson(transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0): DynamicFrame
Hebt die Verschachtelung der Spalten in einem DynamicFrame
auf, die sich speziell in der DynamoDB-JSON-Struktur befinden, und gibt einen neuen, nicht verschachtelten DynamicFrame
zurück. Bei Spalten, die aus einem Array von Strukturtypen bestehen, wird die Verschachtelung nicht aufgehoben. Dies ist ein spezieller Typ der Transformation zum Aufheben der Verschachtelung, der sich anders verhält als die reguläre unnest
-Transformation und erfordert, dass sich die Daten bereits in der DynamoDB-JSON-Struktur befinden. Weitere Informationen finden Sie unter DYNAMODB JSON.
Das Schema eines Vorgangs zum Lesen eines Exports mit der DynamoDB-JSON-Struktur könnte beispielsweise wie folgt aussehen:
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
Die unnestDDBJson()
-Transformation würde dies folgendermaßen umwandeln:
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
Das folgende Codebeispiel veranschaulicht, wie Sie den AWS-Glue-DynamoDB-Export-Konnektor verwenden, die Aufhebung einer DynamoDB-JSON-Verschachtelung aufrufen und die Anzahl der Partitionen ausdrucken:
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }
Def withFrameSchema
def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
getSchema
– Eine Funktion, die das zu verwendende Schema zurückgibt. Wird als Null-Parameter-Funktion angegeben, um eine möglicherweise kostenintensive Berechnung zu verhindern.
Legt das Schema dieses DynamicFrame
auf den angegebenen Wert fest. Dies wird in erster Linie intern verwendet, um eine kostspielige Neuberechnung des Schemas zu vermeiden. Das übergebene Schema muss alle Spalten enthalten, die in den Daten vorhanden sind.
Def withName
def withName( name : String ) : DynamicFrame
name
– Der zu verwendende neue Name.
Gibt eine Kopie dieser DynamicFrame
mit einem neuen Namen zurück.
Def withTransformationContext
def withTransformationContext( ctx : String ) : DynamicFrame
Gibt eine Kopie dieser DynamicFrame
mit dem angegebenen Transformationskontext zurück.