Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
AWS GlueClasse Scala DynamicFrame
Package : com.amazonaws.services.glue
class DynamicFrame extends Serializable with Logging (
val glueContext : GlueContext,
_records : RDD[DynamicRecord],
val name : String = s"",
val transformationContext : String = DynamicFrame.UNDEFINED,
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0,
prevErrors : => Long = 0,
errorExpr : => Unit = {} )
Un DynamicFrame
est une collection distribuée d'objets DynamicRecord à description automatique.
Le DynamicFrame
est conçu pour fournir un modèle de données flexible pour les opérations ETL (extraction, transformation et chargement). Il ne nécessite pas de créer un schéma et peut être utilisé pour lire et transformer les données contenant des valeurs et des types incohérents ou complexes. Un schéma peut être calculé à la demande pour les opérations qui en nécessitent un.
Les DynamicFrame
fournissent une plage de transformations pour le nettoyage des données et ETL. Ils prennent également en charge la conversion vers et depuis DataFrames SparkSQL afin de s'intégrer au code existant et aux nombreuses opérations d'analyse qui en découlent. DataFrames
Les paramètres suivants sont partagés entre plusieurs transformations AWS Glue qui construisent les DynamicFrame
s :
transformationContext
– Identificateur pour ceDynamicFrame
. LetransformationContext
est utilisé en tant que clé pour l'état de marque-page de tâche conservé d'une exécution à l'autre.callSite
— fournit les informations de contexte pour le signalement d'erreurs. Ces valeurs sont automatiquement définies lors de l'appel à partir de Python.stageThreshold
— Nombre maximal d'enregistrements d'erreurs autorisés depuis le calcul duDynamicFrame
avant de lever une exception, à l'exclusion des enregistrements présents dans leDynamicFrame
précédent.totalThreshold
— Nombre maximal d'enregistrements d'erreur avant qu'une exception ne soit levée, y compris ceux des images précédentes.
val errorsCount
val errorsCount
Nombre d'enregistrements d'erreur dans le DynamicFrame
. Les erreurs des opérations précédentes sont incluses dans le nombre.
def applyMapping
def applyMapping( mappings : Seq[Product4[String, String, String, String]],
caseSensitive : Boolean = true,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
mappings
– Séquence des mappages pour construire un nouveauDynamicFrame
.caseSensitive
— indique si les colonnes sources sont considérées comme sensibles à la casse. L'attribution de la valeur false à ce paramètre peut aider lors de l'intégration de magasins insensibles à la casse comme AWS Glue Data Catalog.
Sélectionne, projette et convertit les colonnes en fonction d'une séquence de mappages.
Chaque mappage se compose d'une colonne source et d'un type, ainsi que d'une colonne cible et d'un type. Les mappages peuvent être spécifiés sous forme de quatre tuples (source_path
, source_type
, target_path
, target_type
) ou d'un objet MappingSpec contenant les mêmes informations.
En plus des projections simples et de la conversion, les mappages peuvent être utilisés pour imbriquer ou désimbriquer les champs en séparant les composants du chemin d'accès par « .
» (point).
Par exemple, supposons que vous ayez une trame DynamicFrame
avec le schéma suivant.
{{{
root
|-- name: string
|-- age: int
|-- address: struct
| |-- state: string
| |-- zip: int
}}}
Vous pouvez effectuer l'appel suivant pour désimbriquer les champs state
et zip
.
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("address.state", "string", "state", "string"),
("address.zip", "int", "zip", "int")))
}}}
Le schéma obtenu est le suivant.
{{{
root
|-- name: string
|-- age: int
|-- state: string
|-- zip: int
}}}
Vous pouvez également utiliser applyMapping
pour réimbriquer les colonnes. Par exemple, ce qui suit inverse la transformation précédente et crée une structure nommée address
dans la cible.
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("state", "string", "address.state", "string"),
("zip", "int", "address.zip", "int")))
}}}
Les noms de champ qui contiennent des caractères « .
» (point) peuvent être placés entre guillemets (``
).
Note
La méthode applyMapping
ne peut pas être utilisée actuellement pour mapper les colonnes imbriquées sous des tableaux.
Déf assertErrorThreshold
def assertErrorThreshold : Unit
Action qui oblige le calcul et vérifie que le nombre d'enregistrements d'erreur est inférieur à stageThreshold
et totalThreshold
. Lève une exception si l'une ou l'autre condition échoue.
def count
lazy
def count
Retourne le nombre d'éléments dans le DynamicFrame
.
def dropField
def dropField( path : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Renvoie un nouveau DynamicFrame
avec la colonne spécifiée supprimée.
def dropFields
def dropFields( fieldNames : Seq[String], // The column names to drop.
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Renvoie un nouveau DynamicFrame
avec les colonnes spécifiées supprimées.
Vous pouvez utiliser cette méthode pour supprimer les colonnes imbriquées, y compris celles à l'intérieur de tableaux, mais pas pour supprimer des éléments de tableau spécifiques.
def dropNulls
def dropNulls( transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0 )
Renvoie un nouveau DynamicFrame
avec toutes les colonnes null supprimées.
Note
Seules les colonnes de type NullType
sont supprimées. Les valeurs null des autres colonnes ne sont pas supprimées ou modifiées.
errorsAsDynamicCadre Def
def errorsAsDynamicFrame
Renvoie un nouveau DynamicFrame
contenant les enregistrements d'erreur de ce DynamicFrame
.
def filter
def filter( f : DynamicRecord => Boolean,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Crée un nouveau DynamicFrame
contenant uniquement les enregistrements pour lesquels la fonction 'f
' renvoie la valeur true
. La fonction de filtre 'f
' ne doit pas muter l'enregistrement d'entrée.
def getName
def getName : String
Renvoie le nom du DynamicFrame
.
Déf getNumPartitions
def getNumPartitions
Retourne le nombre de partitions du DynamicFrame
.
Déf getSchemaIf calculé
def getSchemaIfComputed : Option[Schema]
Renvoie le schéma s'il a déjà été calculé. N'analyse pas les données si le schéma n'a pas déjà été calculé.
Déf isSchemaComputed
def isSchemaComputed : Boolean
Renvoie la valeur true
si le schéma a été calculé pour ce DynamicFrame
, ou false
dans le cas contraire. Si la méthode renvoie la valeur false, l'appel de la méthode schema
nécessite un autre passage sur les enregistrements du DynamicFrame
.
Déf javaToPython
def javaToPython : JavaRDD[Array[Byte]]
def join
def join( keys1 : Seq[String],
keys2 : Seq[String],
frame2 : DynamicFrame,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
keys1
— Colonnes de ceDynamicFrame
à utiliser pour la jointure.keys2
— colonnes deframe2
à utiliser pour la jointure. Doit être de la même longueur quekeys1
.frame2
— AutreDynamicFrame
à joindre.
Renvoie le résultat de l'exécution d'une équijointure avec frame2
à l'aide des clés spécifiées.
def map
def map( f : DynamicRecord => DynamicRecord,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Renvoie un nouveau DynamicFrame
construit en appliquant la fonction spécifiée 'f
' à chaque enregistrement du DynamicFrame
.
Comme cette méthode copie chaque enregistrement avant d'appliquer la fonction spécifiée, elle est sécurisée pour muter les enregistrements. Si la fonction de mappage lève une exception sur un enregistrement donné, celui-ci est marqué comme erreur, et le suivi de la pile est enregistré en tant que colonne dans l'enregistrement d'erreur.
Déf mergeDynamicFrames
def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "",
options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"),
stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
stageDynamicFrame
— trameDynamicFrame
intermédiaire à fusionner.primaryKeys
— liste des champs de clé primaire permettant de faire correspondre les enregistrements des tramesDynamicFrame
source et intermédiaire.transformationContext
— chaîne unique utilisée pour récupérer les métadonnées relatives à la transformation en cours (facultatif).options
— chaîne de paires nom-valeur JSON qui fournissent des informations supplémentaires pour cette transformation.callSite
— permet de fournir des informations contextuelles pour le signalement d'erreurs.stageThreshold
– UneLong
. Nombre d'erreurs identifiées dans la transformation donnée et à corriger lors du traitement.totalThreshold
– UneLong
. Nombre total d'erreurs identifiées dans la transformation donnée et qui doivent être corrigées lors du traitement.
Fusionne cette trame DynamicFrame
avec une trame DynamicFrame
intermédiaire basée sur les clés primaires spécifiées pour identifier les enregistrements. Les registres en double (registres avec les mêmes clés primaires) ne sont pas dédupliqués. Si aucun enregistrement ne correspond dans la trame intermédiaire, tous les enregistrements (y compris les doublons) sont conservés dans la source. Si la trame intermédiaire contient des enregistrements correspondants, les enregistrements de la trame intermédiaire remplacent ceux de la source dans AWS Glue.
La trame DynamicFrame
renvoyée contient l'enregistrement A dans les cas suivants :
Si
A
se trouve à la fois dans la trame source et la trame intermédiaire, c'est la valeurA
de la trame intermédiaire qui est renvoyée.Si
A
se trouve dans la table source et siA.primaryKeys
ne se trouve pas dans la tramestagingDynamicFrame
(en d'autres termes,A
n'est pas mis à jour dans la table intermédiaire).
La trame source et la trame intermédiaire n'ont pas besoin d'avoir le même schéma.
val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))
def printSchema
def printSchema : Unit
Imprime le schéma du DynamicFrame
sur stdout
dans un format compréhensible par les utilisateurs.
def recomputeSchema
def recomputeSchema : Schema
Force le recalcul d'un schéma. Ceci nécessite une analyse des données, mais peut « resserrer » le schéma si certains champs du schéma actuel ne sont pas présents dans les données.
Renvoie le schéma recalculé.
def relationalize
def relationalize( rootTableName : String,
stagingPath : String,
options : JsonOptions = JsonOptions.empty,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
rootTableName
— nom à utiliser pour leDynamicFrame
de base dans la sortie. Les tramesDynamicFrame
qui sont créés par les tableaux pivotants commencent avec ce préfixe.stagingPath
— chemin Amazon Simple Storage Service (Amazon S3) pour écrire des données intermédiaires.options
— crée les relations entre les options et la configuration. Non utilisé actuellement.
Aplanit toutes les structures imbriquées et pivote les tableaux en tables distinctes.
Vous pouvez utiliser cette opération pour préparer les données profondément imbriquées en vue de leur ingestion dans une base de données relationnelle. Les structs imbriquées sont mises à plat de la même manière que la transformation unnest. De plus, les tableaux sont pivotés en tables distinctes et chaque élément du tableau devient une ligne. Par exemple, supposons que vous ayez une trame DynamicFrame
avec les données suivantes.
{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]}
{"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]}
{"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}
Exécutez le code suivant.
{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}
Il génère deux tables. La première table est nommée « people » et contient les éléments suivants.
{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}
Ici, le tableau « friends » a été remplacé par une clé de jointure générée automatiquement. Une table séparée nommée people.friends
est créée avec le contenu suivant.
{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}
Dans ce tableau, « id
» est une clé de jointure qui identifie de quel enregistrement provenait l'élément de tableau, « index
» fait référence à la position dans le tableau d'origine et « val
» est l'entrée réelle du tableau.
La méthode relationalize
renvoie la séquence de DynamicFrame
créée en appliquant ce processus de façon récursive à tous les tableaux.
Note
La bibliothèque AWS Glue génère automatiquement les clés de jointure des nouvelles tables. Pour vous assurer que les clés de jointure sont uniques au travers des exécutions de tâche, vous devez activer les marque-pages de tâche.
def renameField
def renameField( oldName : String,
newName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
oldName
— Nom d'origine de la colonne.newName
— Nouveau nom de la colonne.
Renvoie un nouveau DynamicFrame
contenant le champ spécifié renommé.
Cette méthode peut être utilisée pour renommer les champs imbriqués. Par exemple, le code suivant remplace le nom state
par state_code
dans la structure de l'adresse.
{{{ df.renameField("address.state", "address.state_code") }}}
def repartition
def repartition( numPartitions : Int,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Renvoie un nouveau DynamicFrame
avec les partitions numPartitions
.
def resolveChoice
def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec],
choiceOption : Option[ChoiceOption] = None,
database : Option[String] = None,
tableName : Option[String] = None,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
choiceOption
— Action à appliquer aux colonnesChoiceType
non listées dans la séquence des spécifications.database
— base de données Data Catalog à utiliser avec l'actionmatch_catalog
.tableName
— table Data Catalog à utiliser avec l'actionmatch_catalog
.
Renvoie un nouveau DynamicFrame
en remplaçant un ou plusieurs ChoiceType
avec un type plus spécifique.
Il existe deux façons d'utiliser resolveChoice
. La première consiste à spécifier une séquence de colonnes spécifiques et la façon de les résoudre. Celles-ci sont spécifiés en tant que tuples composés de paires (colonne, action).
Les actions possibles sont les suivantes :
cast:type
— Tente de convertir toutes les valeurs dans le type spécifié.make_cols
– Convertit chaque type distinct en une colonne portant le nomcolumnName_type
.make_struct
— Convertit une colonne en une structure avec des clés pour chaque type distinct.project:type
— Retient uniquement les valeurs du type spécifié.
L'autre mode pour resolveChoice
consiste à spécifier une seule résolution pour tous les ChoiceType
s. Vous pouvez utiliser cela lorsque la liste complète des ChoiceType
s est inconnue avant l'exécution. En plus des actions répertoriées précédemment, ce mode prend également en charge l'action suivante :
match_catalog
ChoiceType
— Tente de convertir chaque dans le type correspondant de la table de catalogue spécifiée.
Exemples :
Résolvez la colonne user.id
en la convertissant en int et faites que le champ address
conserve uniquement les structs.
{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}
Résolvez tous les objets ChoiceType
en convertissant chaque choix en colonne séparée.
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}
Résolvez tous les objets ChoiceType
en les convertissant dans les types de la table de catalogue spécifiée.
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}
def schema
def schema : Schema
Renvoie le schéma du DynamicFrame
.
Le schéma renvoyé est assuré de contenir chaque champ présent dans un enregistrement de ce DynamicFrame
. Mais dans un petit nombre de cas, il peut aussi contenir des champs supplémentaires. La méthode unnest peut être utilisée pour « resserrer » le schéma basé sur les enregistrements du DynamicFrame
.
def selectField
def selectField( fieldName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Renvoie un champ unique comme DynamicFrame
.
def selectFields
def selectFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
paths
— Séquence de noms de colonnes à sélectionner.
Renvoie un nouveau DynamicFrame
contenant les colonnes spécifiées.
Note
La méthode selectFields
peut uniquement être utilisée pour sélectionner les colonnes de niveau supérieur. La méthode applyMapping peut être utilisée pour sélectionner les colonnes imbriquées.
def show
def show( numRows : Int = 20 ) : Unit
numRows
— Nombre de lignes à imprimer.
Imprime les lignes du DynamicFrame
au format JSON.
Def SimplifyDDBison
Les exportations DynamoDB avec le connecteur d'exportation AWS Glue DynamoDB produisent des fichiers JSON contenant des structures imbriquées spécifiques. Pour plus d'informations, consultez la section Objets de données. simplifyDDBJson
Simplifie les colonnes imbriquées dans ce type de données et renvoie une nouvelle colonne simplifiée DynamicFrame. DynamicFrame S'il existe plusieurs types ou si un type de carte est contenu dans un type de liste, les éléments de la liste ne seront pas simplifiés. Cette méthode prend uniquement en charge les données au format JSON d'exportation DynamoDB. Pensez unnest
à effectuer des modifications similaires sur d'autres types de données.
def simplifyDDBJson() : DynamicFrame
Cette méthode ne prend aucun paramètre.
Exemple d'entrée
Tenez compte du schéma suivant généré par une exportation DynamoDB :
root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean
Exemple de code
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "
ddbTableARN
", "dynamodb.s3.bucket" -> "exportBucketLocation
", "dynamodb.s3.prefix" -> "exportBucketPrefix
", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID
", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }
La transformation simplifyDDBJson
simplifie cette exportation ainsi :
root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
def spigot
def spigot( path : String,
options : JsonOptions = new JsonOptions("{}"),
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Transmettez la transformation qui renvoie les mêmes enregistrements, mais écrit un sous-ensemble d'enregistrements en tant qu'effet secondaire.
path
— chemin d'accès dans Amazon S3 dans lequel écrire la sortie, sous la formes3://bucket//path
.options
— CarteJsonOptions
facultative décrivant le comportement d'échantillonnage.
Renvoie un DynamicFrame
contenant les mêmes enregistrements que celui-ci.
Par défaut, écrit 100 enregistrements arbitraires à l'emplacement spécifié par path
. Ce comportement peut être personnalisé à l'aide de la carte options
. Les clés valides incluent les suivantes :
topk
— spécifie le nombre total d'enregistrements écrits. La valeur par défaut est 100.prob
— indique la probabilité (sous forme de décimale) qu'un enregistrement individuel soit inclus. La valeur par défaut est 1.
Par exemple, l'appel suivant échantillonne l'ensemble de données en sélectionnant chaque enregistrement avec une probabilité de 20 % et en s'arrêtant après l'écriture de 200 enregistrements.
{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}
def splitFields
def splitFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
paths
— Chemins à inclure dans le premierDynamicFrame
.
Renvoie une séquence de deux DynamicFrame
s. Le premier DynamicFrame
contient les chemins d'accès spécifiés et le deuxième contient toutes les autres colonnes.
Exemple
Cet exemple prend une persons
table DynamicFrame créée à partir de la legislators
base de données du AWS Glue Data Catalog et la DynamicFrame divise en deux, les champs spécifiés étant placés dans le premier DynamicFrame et les champs restants dans un second DynamicFrame. L'exemple choisit ensuite le premier DynamicFrame parmi le résultat.
val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)
def splitRows
def splitRows( paths : Seq[String],
values : Seq[Any],
operators : Seq[String],
transformationContext : String,
callSite : CallSite,
stageThreshold : Long,
totalThreshold : Long
) : Seq[DynamicFrame]
Fractionne les lignes en fonction des prédicats qui comparent les colonnes aux constantes.
paths
— Colonnes à utiliser pour la comparaison.values
— Valeurs constantes à utiliser pour la comparaison.operators
— Opérateurs à utiliser pour la comparaison.
Renvoie une séquence de deux DynamicFrame
s. Le premier contient les lignes pour lesquelles le prédicat a la valeur true et le deuxième contient celles pour lesquelles la valeur est false.
Les prédicats sont spécifiés en utilisant trois séquences : « paths
» contient les noms de colonne (possiblement imbriqués), « values
» contient les valeurs constantes de comparaison et « operators
» contient les opérateurs à utiliser pour la comparaison. Les trois séquences doivent être de la même longueur : le n
ème opérateur est utilisé pour comparer la n
ème colonne à la n
ème valeur.
Chaque opérateur doit être « !=
», « =
», « <=
», « <
», « >=
» ou « >
».
Par exemple, l'appel suivant scinde une trame DynamicFrame
afin que la première trame de sortie contienne les enregistrements des personnes de plus de 65 ans des États-Unis et que la deuxième contienne tous les autres enregistrements.
{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq(">=", "=")) }}}
Déf stageErrorsCount
def stageErrorsCount
Renvoie le nombre d'enregistrements d'erreur créés pendant le calcul du DynamicFrame
. Cela exclut les erreurs des opérations précédentes qui ont été transmises au DynamicFrame
comme entrée.
def toDF
def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame
Convertit le DynamicFrame
en un DataFrame
Apache Spark SQL avec le même schéma et les mêmes enregistrements.
Note
Étant donné que les DataFrame
s ne prennent pas en charge les ChoiceType
s, cette méthode convertit automatiquement les colonnes ChoiceType
en StructType
s. Pour plus d'informations et pour connaître les options de résolution des choix, consultez resolveChoice.
def unbox
def unbox( path : String,
format : String,
optionString : String = "{}",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
path
— colonne à analyser. Doit être de type string (chaîne) ou binary (binaire).format
— Le format à utiliser pour l'analyse.optionString
— Options à transmettre au format, telles que le séparateur CSV.
Analyse une chaîne ou une colonne binaire intégrée selon le format spécifié. Les colonnes analysées sont imbriquées sous une structure avec le nom de colonne d'origine.
Par exemple, supposons que vous ayez un fichier CSV avec une colonne JSON imbriquée.
name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...
Après une analyse initiale, vous obtenez une trame DynamicFrame
avec le schéma suivant.
{{{ root |-- name: string |-- age: int |-- address: string }}}
Vous pouvez appeler unbox
au niveau de la colonne « address » pour analyser les composants spécifiques.
{{{ df.unbox("address", "json") }}}
Vous obtenez ainsi une trame DynamicFrame
avec le schéma suivant.
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
def unnest
def unnest( transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Renvoie un nouveau DynamicFrame
avec tous les structures imbriquées mises à plat. Les noms sont construits à l'aide du caractère « .
» (point).
Par exemple, supposons que vous ayez une trame DynamicFrame
avec le schéma suivant.
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
L'appel suivant désimbrique la struct « address ».
{{{ df.unnest() }}}
Le schéma obtenu est le suivant.
{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}
Cette méthode désimbrique également les structs imbriquées à l'intérieur des tableaux. Mais pour des raisons historiques, le nom de ces champs est précédé par le nom du tableau englobant et par « .val
».
def unnestDDBJson
unnestDDBJson(transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0): DynamicFrame
Supprime l'imbrication des colonnes imbriquées dans un DynamicFrame
qui se trouvent spécifiquement dans la structure JSON DynamoDB, et renvoie une nouvelle version non imbriquée DynamicFrame
. Les colonnes d'un tableau de types de structure ne seront pas non-imbriquées. Notez qu'il s'agit d'un type spécifique de transformation non imbriquée qui se comporte différemment de la transformation unnest
normale et nécessite que les données soient déjà dans la structure JSON DynamoDB. Pour plus d'informations, consultez JSON DynamoDB.
Par exemple, le schéma d'une lecture d'exportation avec la structure JSON DynamoDB pourrait ressembler à ce qui suit :
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
La transformation unnestDDBJson()
convertirait ceci en :
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
L'exemple de code suivant montre comment utiliser le connecteur d'exportation DynamoDB AWS Glue, recourir à la suppression de l'imbrication de JSON DynamoDB, puis imprimer le nombre de partitions :
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }
Déf withFrameSchema
def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
getSchema
— fonction qui renvoie le schéma à utiliser. Spécifiée en tant que fonction de paramètre zéro pour reporter un calcul potentiellement onéreux.
Définit le schéma du DynamicFrame
sur la valeur spécifiée. La fonction est principalement utilisée en interne pour éviter un recalcul du schéma coûteux. Le schéma transmis doit contenir toutes les colonnes présentes dans les données.
def withName
def withName( name : String ) : DynamicFrame
name
— Nouveau nom à utiliser.
Renvoie une copie du DynamicFrame
avec un nouveau nom.
Déf withTransformationContext
def withTransformationContext( ctx : String ) : DynamicFrame
Renvoie une copie du DynamicFrame
avec le contexte de transformation spécifié.