AWS GlueClasse Scala DynamicFrame - AWS Glue

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

AWS GlueClasse Scala DynamicFrame

Package : com.amazonaws.services.glue

class DynamicFrame extends Serializable with Logging ( val glueContext : GlueContext, _records : RDD[DynamicRecord], val name : String = s"", val transformationContext : String = DynamicFrame.UNDEFINED, callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0, prevErrors : => Long = 0, errorExpr : => Unit = {} )

Un DynamicFrame est une collection distribuée d'objets DynamicRecord à description automatique.

Le DynamicFrame est conçu pour fournir un modèle de données flexible pour les opérations ETL (extraction, transformation et chargement). Il ne nécessite pas de créer un schéma et peut être utilisé pour lire et transformer les données contenant des valeurs et des types incohérents ou complexes. Un schéma peut être calculé à la demande pour les opérations qui en nécessitent un.

Les DynamicFrame fournissent une plage de transformations pour le nettoyage des données et ETL. Ils prennent également en charge la conversion vers et depuis DataFrames SparkSQL afin de s'intégrer au code existant et aux nombreuses opérations d'analyse qui en découlent. DataFrames

Les paramètres suivants sont partagés entre plusieurs transformations AWS Glue qui construisent les DynamicFrames :

  • transformationContext – Identificateur pour ce DynamicFrame. Le transformationContext est utilisé en tant que clé pour l'état de marque-page de tâche conservé d'une exécution à l'autre.

  • callSite — fournit les informations de contexte pour le signalement d'erreurs. Ces valeurs sont automatiquement définies lors de l'appel à partir de Python.

  • stageThreshold — Nombre maximal d'enregistrements d'erreurs autorisés depuis le calcul du DynamicFrame avant de lever une exception, à l'exclusion des enregistrements présents dans le DynamicFrame précédent.

  • totalThreshold — Nombre maximal d'enregistrements d'erreur avant qu'une exception ne soit levée, y compris ceux des images précédentes.

val errorsCount

val errorsCount

Nombre d'enregistrements d'erreur dans le DynamicFrame. Les erreurs des opérations précédentes sont incluses dans le nombre.

def applyMapping

def applyMapping( mappings : Seq[Product4[String, String, String, String]], caseSensitive : Boolean = true, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • mappings – Séquence des mappages pour construire un nouveau DynamicFrame.

  • caseSensitive — indique si les colonnes sources sont considérées comme sensibles à la casse. L'attribution de la valeur false à ce paramètre peut aider lors de l'intégration de magasins insensibles à la casse comme AWS Glue Data Catalog.

Sélectionne, projette et convertit les colonnes en fonction d'une séquence de mappages.

Chaque mappage se compose d'une colonne source et d'un type, ainsi que d'une colonne cible et d'un type. Les mappages peuvent être spécifiés sous forme de quatre tuples (source_path, source_type, target_path, target_type) ou d'un objet MappingSpec contenant les mêmes informations.

En plus des projections simples et de la conversion, les mappages peuvent être utilisés pour imbriquer ou désimbriquer les champs en séparant les composants du chemin d'accès par « . » (point).

Par exemple, supposons que vous ayez une trame DynamicFrame avec le schéma suivant.

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- zip: int }}}

Vous pouvez effectuer l'appel suivant pour désimbriquer les champs state et zip.

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("address.state", "string", "state", "string"), ("address.zip", "int", "zip", "int"))) }}}

Le schéma obtenu est le suivant.

{{{ root |-- name: string |-- age: int |-- state: string |-- zip: int }}}

Vous pouvez également utiliser applyMapping pour réimbriquer les colonnes. Par exemple, ce qui suit inverse la transformation précédente et crée une structure nommée address dans la cible.

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("state", "string", "address.state", "string"), ("zip", "int", "address.zip", "int"))) }}}

Les noms de champ qui contiennent des caractères « . » (point) peuvent être placés entre guillemets (``).

Note

La méthode applyMapping ne peut pas être utilisée actuellement pour mapper les colonnes imbriquées sous des tableaux.

Déf assertErrorThreshold

def assertErrorThreshold : Unit

Action qui oblige le calcul et vérifie que le nombre d'enregistrements d'erreur est inférieur à stageThreshold et totalThreshold. Lève une exception si l'une ou l'autre condition échoue.

def count

lazy def count

Retourne le nombre d'éléments dans le DynamicFrame.

def dropField

def dropField( path : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Renvoie un nouveau DynamicFrame avec la colonne spécifiée supprimée.

def dropFields

def dropFields( fieldNames : Seq[String], // The column names to drop. transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Renvoie un nouveau DynamicFrame avec les colonnes spécifiées supprimées.

Vous pouvez utiliser cette méthode pour supprimer les colonnes imbriquées, y compris celles à l'intérieur de tableaux, mais pas pour supprimer des éléments de tableau spécifiques.

def dropNulls

def dropNulls( transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 )

Renvoie un nouveau DynamicFrame avec toutes les colonnes null supprimées.

Note

Seules les colonnes de type NullType sont supprimées. Les valeurs null des autres colonnes ne sont pas supprimées ou modifiées.

errorsAsDynamicCadre Def

def errorsAsDynamicFrame

Renvoie un nouveau DynamicFramecontenant les enregistrements d'erreur de ce DynamicFrame.

def filter

def filter( f : DynamicRecord => Boolean, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Crée un nouveau DynamicFrame contenant uniquement les enregistrements pour lesquels la fonction 'f' renvoie la valeur true. La fonction de filtre 'f' ne doit pas muter l'enregistrement d'entrée.

def getName

def getName : String

Renvoie le nom du DynamicFrame.

Déf getNumPartitions

def getNumPartitions

Retourne le nombre de partitions du DynamicFrame.

Déf getSchemaIf calculé

def getSchemaIfComputed : Option[Schema]

Renvoie le schéma s'il a déjà été calculé. N'analyse pas les données si le schéma n'a pas déjà été calculé.

Déf isSchemaComputed

def isSchemaComputed : Boolean

Renvoie la valeur true si le schéma a été calculé pour ce DynamicFrame, ou false dans le cas contraire. Si la méthode renvoie la valeur false, l'appel de la méthode schema nécessite un autre passage sur les enregistrements du DynamicFrame.

Déf javaToPython

def javaToPython : JavaRDD[Array[Byte]]

def join

def join( keys1 : Seq[String], keys2 : Seq[String], frame2 : DynamicFrame, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • keys1 — Colonnes de ce DynamicFrame à utiliser pour la jointure.

  • keys2 — colonnes de frame2 à utiliser pour la jointure. Doit être de la même longueur que keys1.

  • frame2 — Autre DynamicFrame à joindre.

Renvoie le résultat de l'exécution d'une équijointure avec frame2 à l'aide des clés spécifiées.

def map

def map( f : DynamicRecord => DynamicRecord, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Renvoie un nouveau DynamicFrame construit en appliquant la fonction spécifiée 'f' à chaque enregistrement du DynamicFrame.

Comme cette méthode copie chaque enregistrement avant d'appliquer la fonction spécifiée, elle est sécurisée pour muter les enregistrements. Si la fonction de mappage lève une exception sur un enregistrement donné, celui-ci est marqué comme erreur, et le suivi de la pile est enregistré en tant que colonne dans l'enregistrement d'erreur.

Déf mergeDynamicFrames

def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "", options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"), stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
  • stageDynamicFrame — trame DynamicFrame intermédiaire à fusionner.

  • primaryKeys — liste des champs de clé primaire permettant de faire correspondre les enregistrements des trames DynamicFrame source et intermédiaire.

  • transformationContext — chaîne unique utilisée pour récupérer les métadonnées relatives à la transformation en cours (facultatif).

  • options — chaîne de paires nom-valeur JSON qui fournissent des informations supplémentaires pour cette transformation.

  • callSite — permet de fournir des informations contextuelles pour le signalement d'erreurs.

  • stageThreshold – Une Long. Nombre d'erreurs identifiées dans la transformation donnée et à corriger lors du traitement.

  • totalThreshold – Une Long. Nombre total d'erreurs identifiées dans la transformation donnée et qui doivent être corrigées lors du traitement.

Fusionne cette trame DynamicFrame avec une trame DynamicFrame intermédiaire basée sur les clés primaires spécifiées pour identifier les enregistrements. Les registres en double (registres avec les mêmes clés primaires) ne sont pas dédupliqués. Si aucun enregistrement ne correspond dans la trame intermédiaire, tous les enregistrements (y compris les doublons) sont conservés dans la source. Si la trame intermédiaire contient des enregistrements correspondants, les enregistrements de la trame intermédiaire remplacent ceux de la source dans AWS Glue.

La trame DynamicFrame renvoyée contient l'enregistrement A dans les cas suivants :

  1. Si A se trouve à la fois dans la trame source et la trame intermédiaire, c'est la valeur A de la trame intermédiaire qui est renvoyée.

  2. Si A se trouve dans la table source et si A.primaryKeys ne se trouve pas dans la trame stagingDynamicFrame (en d'autres termes, A n'est pas mis à jour dans la table intermédiaire).

La trame source et la trame intermédiaire n'ont pas besoin d'avoir le même schéma.

val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))

def printSchema

def printSchema : Unit

Imprime le schéma du DynamicFrame sur stdout dans un format compréhensible par les utilisateurs.

def recomputeSchema

def recomputeSchema : Schema

Force le recalcul d'un schéma. Ceci nécessite une analyse des données, mais peut « resserrer » le schéma si certains champs du schéma actuel ne sont pas présents dans les données.

Renvoie le schéma recalculé.

def relationalize

def relationalize( rootTableName : String, stagingPath : String, options : JsonOptions = JsonOptions.empty, transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • rootTableName — nom à utiliser pour le DynamicFrame de base dans la sortie. Les trames DynamicFrame qui sont créés par les tableaux pivotants commencent avec ce préfixe.

  • stagingPath — chemin Amazon Simple Storage Service (Amazon S3) pour écrire des données intermédiaires.

  • options — crée les relations entre les options et la configuration. Non utilisé actuellement.

Aplanit toutes les structures imbriquées et pivote les tableaux en tables distinctes.

Vous pouvez utiliser cette opération pour préparer les données profondément imbriquées en vue de leur ingestion dans une base de données relationnelle. Les structs imbriquées sont mises à plat de la même manière que la transformation unnest. De plus, les tableaux sont pivotés en tables distinctes et chaque élément du tableau devient une ligne. Par exemple, supposons que vous ayez une trame DynamicFrame avec les données suivantes.

{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]} {"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]} {"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}

Exécutez le code suivant.

{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}

Il génère deux tables. La première table est nommée « people » et contient les éléments suivants.

{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}

Ici, le tableau « friends » a été remplacé par une clé de jointure générée automatiquement. Une table séparée nommée people.friends est créée avec le contenu suivant.

{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}

Dans ce tableau, « id » est une clé de jointure qui identifie de quel enregistrement provenait l'élément de tableau, « index » fait référence à la position dans le tableau d'origine et « val » est l'entrée réelle du tableau.

La méthode relationalize renvoie la séquence de DynamicFrame créée en appliquant ce processus de façon récursive à tous les tableaux.

Note

La bibliothèque AWS Glue génère automatiquement les clés de jointure des nouvelles tables. Pour vous assurer que les clés de jointure sont uniques au travers des exécutions de tâche, vous devez activer les marque-pages de tâche.

def renameField

def renameField( oldName : String, newName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • oldName — Nom d'origine de la colonne.

  • newName — Nouveau nom de la colonne.

Renvoie un nouveau DynamicFrame contenant le champ spécifié renommé.

Cette méthode peut être utilisée pour renommer les champs imbriqués. Par exemple, le code suivant remplace le nom state par state_code dans la structure de l'adresse.

{{{ df.renameField("address.state", "address.state_code") }}}

def repartition

def repartition( numPartitions : Int, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Renvoie un nouveau DynamicFrame avec les partitions numPartitions.

def resolveChoice

def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec], choiceOption : Option[ChoiceOption] = None, database : Option[String] = None, tableName : Option[String] = None, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • choiceOption — Action à appliquer aux colonnes ChoiceType non listées dans la séquence des spécifications.

  • database — base de données Data Catalog à utiliser avec l'action match_catalog.

  • tableName — table Data Catalog à utiliser avec l'action match_catalog.

Renvoie un nouveau DynamicFrame en remplaçant un ou plusieurs ChoiceType avec un type plus spécifique.

Il existe deux façons d'utiliser resolveChoice. La première consiste à spécifier une séquence de colonnes spécifiques et la façon de les résoudre. Celles-ci sont spécifiés en tant que tuples composés de paires (colonne, action).

Les actions possibles sont les suivantes :

  • cast:type — Tente de convertir toutes les valeurs dans le type spécifié.

  • make_cols – Convertit chaque type distinct en une colonne portant le nom columnName_type.

  • make_struct — Convertit une colonne en une structure avec des clés pour chaque type distinct.

  • project:type — Retient uniquement les valeurs du type spécifié.

L'autre mode pour resolveChoice consiste à spécifier une seule résolution pour tous les ChoiceTypes. Vous pouvez utiliser cela lorsque la liste complète des ChoiceTypes est inconnue avant l'exécution. En plus des actions répertoriées précédemment, ce mode prend également en charge l'action suivante :

  • match_catalogChoiceType — Tente de convertir chaque dans le type correspondant de la table de catalogue spécifiée.

Exemples :

Résolvez la colonne user.id en la convertissant en int et faites que le champ address conserve uniquement les structs.

{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}

Résolvez tous les objets ChoiceType en convertissant chaque choix en colonne séparée.

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}

Résolvez tous les objets ChoiceType en les convertissant dans les types de la table de catalogue spécifiée.

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}

def schema

def schema : Schema

Renvoie le schéma du DynamicFrame.

Le schéma renvoyé est assuré de contenir chaque champ présent dans un enregistrement de ce DynamicFrame. Mais dans un petit nombre de cas, il peut aussi contenir des champs supplémentaires. La méthode unnest peut être utilisée pour « resserrer » le schéma basé sur les enregistrements du DynamicFrame.

def selectField

def selectField( fieldName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Renvoie un champ unique comme DynamicFrame.

def selectFields

def selectFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • paths — Séquence de noms de colonnes à sélectionner.

Renvoie un nouveau DynamicFrame contenant les colonnes spécifiées.

Note

La méthode selectFields peut uniquement être utilisée pour sélectionner les colonnes de niveau supérieur. La méthode applyMapping peut être utilisée pour sélectionner les colonnes imbriquées.

def show

def show( numRows : Int = 20 ) : Unit
  • numRows — Nombre de lignes à imprimer.

Imprime les lignes du DynamicFrame au format JSON.

Def SimplifyDDBison

Les exportations DynamoDB avec le connecteur d'exportation AWS Glue DynamoDB produisent des fichiers JSON contenant des structures imbriquées spécifiques. Pour plus d'informations, consultez la section Objets de données. simplifyDDBJson Simplifie les colonnes imbriquées dans ce type de données et renvoie une nouvelle colonne simplifiée DynamicFrame. DynamicFrame S'il existe plusieurs types ou si un type de carte est contenu dans un type de liste, les éléments de la liste ne seront pas simplifiés. Cette méthode prend uniquement en charge les données au format JSON d'exportation DynamoDB. Pensez unnest à effectuer des modifications similaires sur d'autres types de données.

def simplifyDDBJson() : DynamicFrame

Cette méthode ne prend aucun paramètre.

Exemple d'entrée

Tenez compte du schéma suivant généré par une exportation DynamoDB :

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

Exemple de code

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "ddbTableARN", "dynamodb.s3.bucket" -> "exportBucketLocation", "dynamodb.s3.prefix" -> "exportBucketPrefix", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }

La transformation simplifyDDBJson simplifie cette exportation ainsi :

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

def spigot

def spigot( path : String, options : JsonOptions = new JsonOptions("{}"), transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Transmettez la transformation qui renvoie les mêmes enregistrements, mais écrit un sous-ensemble d'enregistrements en tant qu'effet secondaire.

  • path — chemin d'accès dans Amazon S3 dans lequel écrire la sortie, sous la forme s3://bucket//path.

  • options — Carte JsonOptions facultative décrivant le comportement d'échantillonnage.

Renvoie un DynamicFrame contenant les mêmes enregistrements que celui-ci.

Par défaut, écrit 100 enregistrements arbitraires à l'emplacement spécifié par path. Ce comportement peut être personnalisé à l'aide de la carte options. Les clés valides incluent les suivantes :

  • topk — spécifie le nombre total d'enregistrements écrits. La valeur par défaut est 100.

  • prob — indique la probabilité (sous forme de décimale) qu'un enregistrement individuel soit inclus. La valeur par défaut est 1.

Par exemple, l'appel suivant échantillonne l'ensemble de données en sélectionnant chaque enregistrement avec une probabilité de 20 % et en s'arrêtant après l'écriture de 200 enregistrements.

{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}

def splitFields

def splitFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • paths — Chemins à inclure dans le premier DynamicFrame.

Renvoie une séquence de deux DynamicFrames. Le premier DynamicFramecontient les chemins d'accès spécifiés et le deuxième contient toutes les autres colonnes.

Exemple

Cet exemple prend une persons table DynamicFrame créée à partir de la legislators base de données du AWS Glue Data Catalog et la DynamicFrame divise en deux, les champs spécifiés étant placés dans le premier DynamicFrame et les champs restants dans un second DynamicFrame. L'exemple choisit ensuite le premier DynamicFrame parmi le résultat.

val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)

def splitRows

def splitRows( paths : Seq[String], values : Seq[Any], operators : Seq[String], transformationContext : String, callSite : CallSite, stageThreshold : Long, totalThreshold : Long ) : Seq[DynamicFrame]

Fractionne les lignes en fonction des prédicats qui comparent les colonnes aux constantes.

  • paths — Colonnes à utiliser pour la comparaison.

  • values — Valeurs constantes à utiliser pour la comparaison.

  • operators — Opérateurs à utiliser pour la comparaison.

Renvoie une séquence de deux DynamicFrames. Le premier contient les lignes pour lesquelles le prédicat a la valeur true et le deuxième contient celles pour lesquelles la valeur est false.

Les prédicats sont spécifiés en utilisant trois séquences : « paths » contient les noms de colonne (possiblement imbriqués), « values » contient les valeurs constantes de comparaison et « operators » contient les opérateurs à utiliser pour la comparaison. Les trois séquences doivent être de la même longueur : le nème opérateur est utilisé pour comparer la nème colonne à la nème valeur.

Chaque opérateur doit être « != », « = », « <= », « < », « >= » ou « > ».

Par exemple, l'appel suivant scinde une trame DynamicFrame afin que la première trame de sortie contienne les enregistrements des personnes de plus de 65 ans des États-Unis et que la deuxième contienne tous les autres enregistrements.

{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq("&gt;=", "=")) }}}

Déf stageErrorsCount

def stageErrorsCount

Renvoie le nombre d'enregistrements d'erreur créés pendant le calcul du DynamicFrame. Cela exclut les erreurs des opérations précédentes qui ont été transmises au DynamicFrame comme entrée.

def toDF

def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame

Convertit le DynamicFrame en un DataFrame Apache Spark SQL avec le même schéma et les mêmes enregistrements.

Note

Étant donné que les DataFrames ne prennent pas en charge les ChoiceTypes, cette méthode convertit automatiquement les colonnes ChoiceType en StructTypes. Pour plus d'informations et pour connaître les options de résolution des choix, consultez resolveChoice.

def unbox

def unbox( path : String, format : String, optionString : String = "{}", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • path — colonne à analyser. Doit être de type string (chaîne) ou binary (binaire).

  • format — Le format à utiliser pour l'analyse.

  • optionString — Options à transmettre au format, telles que le séparateur CSV.

Analyse une chaîne ou une colonne binaire intégrée selon le format spécifié. Les colonnes analysées sont imbriquées sous une structure avec le nom de colonne d'origine.

Par exemple, supposons que vous ayez un fichier CSV avec une colonne JSON imbriquée.

name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...

Après une analyse initiale, vous obtenez une trame DynamicFrame avec le schéma suivant.

{{{ root |-- name: string |-- age: int |-- address: string }}}

Vous pouvez appeler unbox au niveau de la colonne « address » pour analyser les composants spécifiques.

{{{ df.unbox("address", "json") }}}

Vous obtenez ainsi une trame DynamicFrame avec le schéma suivant.

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

def unnest

def unnest( transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Renvoie un nouveau DynamicFrame avec tous les structures imbriquées mises à plat. Les noms sont construits à l'aide du caractère « . » (point).

Par exemple, supposons que vous ayez une trame DynamicFrame avec le schéma suivant.

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

L'appel suivant désimbrique la struct « address ».

{{{ df.unnest() }}}

Le schéma obtenu est le suivant.

{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}

Cette méthode désimbrique également les structs imbriquées à l'intérieur des tableaux. Mais pour des raisons historiques, le nom de ces champs est précédé par le nom du tableau englobant et par « .val ».

def unnestDDBJson

unnestDDBJson(transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0): DynamicFrame

Supprime l'imbrication des colonnes imbriquées dans un DynamicFrame qui se trouvent spécifiquement dans la structure JSON DynamoDB, et renvoie une nouvelle version non imbriquée DynamicFrame. Les colonnes d'un tableau de types de structure ne seront pas non-imbriquées. Notez qu'il s'agit d'un type spécifique de transformation non imbriquée qui se comporte différemment de la transformation unnest normale et nécessite que les données soient déjà dans la structure JSON DynamoDB. Pour plus d'informations, consultez JSON DynamoDB.

Par exemple, le schéma d'une lecture d'exportation avec la structure JSON DynamoDB pourrait ressembler à ce qui suit :

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

La transformation unnestDDBJson() convertirait ceci en :

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

L'exemple de code suivant montre comment utiliser le connecteur d'exportation DynamoDB AWS Glue, recourir à la suppression de l'imbrication de JSON DynamoDB, puis imprimer le nombre de partitions :

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }

Déf withFrameSchema

def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
  • getSchema — fonction qui renvoie le schéma à utiliser. Spécifiée en tant que fonction de paramètre zéro pour reporter un calcul potentiellement onéreux.

Définit le schéma du DynamicFrame sur la valeur spécifiée. La fonction est principalement utilisée en interne pour éviter un recalcul du schéma coûteux. Le schéma transmis doit contenir toutes les colonnes présentes dans les données.

def withName

def withName( name : String ) : DynamicFrame
  • name — Nouveau nom à utiliser.

Renvoie une copie du DynamicFrame avec un nouveau nom.

Déf withTransformationContext

def withTransformationContext( ctx : String ) : DynamicFrame

Renvoie une copie du DynamicFrame avec le contexte de transformation spécifié.