DynamicFrame classe - AWS Glue

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

DynamicFrame classe

L'une des principales abstractions d'Apache Spark est le Spark SQLDataFrame, qui est similaire à la DataFrame construction trouvée dans R et Pandas. A DataFrame est similaire à une table et prend en charge les opérations de style fonctionnel (carte/réduire/filtre/etc.) et SQL les opérations (sélection, projet, agrégation).

DataFramessont puissants et largement utilisés, mais ils présentent des limites en ce qui concerne les opérations d'extraction, de transformation et de chargement (ETL). Cela signifie notamment qu'un schéma doit être spécifié avant tout chargement de données. Spark SQL résout ce problème en effectuant deux passages sur les données : le premier pour déduire le schéma et le second pour charger les données. Cependant, cette inférence est limitée et ne prend pas en compte la désorganisation des données. Par exemple, un même champ peut avoir différents types dans différents enregistrements. L'opération est souvent abandonnée par Apache Spark, qui indique que le type est string en tenant compte du texte du champ d'origine. Cette information peut être erronée et vous pouvez souhaiter exercer un meilleur contrôle sur la résolution des écarts entre les schémas. Et pour les ensembles de données volumineux, un passage supplémentaire sur les données source peut s'avérer hors de prix.

Pour remédier à ces limites, AWS Glue introduit leDynamicFrame. Un DynamicFrame est similaire à un DataFrame, mais chaque enregistrement y est auto-descriptif : initialement, aucun schéma n'est donc requis. Au lieu de cela, AWS Glue calcule un schéma on-the-fly lorsque cela est nécessaire et code explicitement les incohérences du schéma à l'aide d'un type de choix (ou d'union). Vous pouvez résoudre ces incohérences afin de rendre vos ensembles de données compatibles avec les magasins de données qui nécessitent un schéma fixe.

De même, un DynamicRecord représente un enregistrement logique au sein dans un DynamicFrame. Il est comparable à une ligne dans un DataFrame Spark, à la différence qu'il est auto-descriptif et peut être utilisé pour les données qui ne sont pas conformes à un schéma fixe. Lorsque vous utilisez AWS Glue with PySpark, vous ne manipulez généralement pas de manière indépendanteDynamicRecords. Vous allez plutôt transformer le jeu de données par le biais de son DynamicFrame.

Vous pouvez convertir DynamicFrames vers et depuis DataFrames après la résolution des incohérences de schémas.

 – construction –

__init__

__init__(jdf, glue_ctx, name)
  • jdf— Une référence au bloc de données dans la machine virtuelle Java (JVM).

  • glue_ctx – Un objet GlueContext classe.

  • name – Nom de chaîne facultatif, vide par défaut.

fromDF

fromDF(dataframe, glue_ctx, name)

Convertit un DataFrame en DynamicFrame via la conversion de champs DataFrame en champs DynamicRecord. Renvoie un nouveau DynamicFrame.

Un DynamicRecord représente un enregistrement logique dans un DynamicFrame. Il est comparable à une ligne dans un DataFrame Spark, à la différence qu'il est auto-descriptif et peut être utilisé pour les données qui ne sont pas conformes à un schéma fixe.

Cette fonction s'attend à ce que les colonnes dont les noms sont dupliqués dans votre DataFrame aient déjà été résolues.

  • dataframe— L'Apache Spark SQL DataFrame à convertir (obligatoire).

  • glue_ctx – Objet GlueContext classe qui spécifie le contexte pour cette transformation (obligatoire).

  • name— Le nom du résultat DynamicFrame (facultatif depuis AWS Glue 3.0).

toDF

toDF(options)

Convertit un DynamicFrame en DataFrame Apache Spark via la conversion de DynamicRecords en champs DataFrame. Renvoie un nouveau DataFrame.

Un DynamicRecord représente un enregistrement logique dans un DynamicFrame. Il est comparable à une ligne dans un DataFrame Spark, à la différence qu'il est auto-descriptif et peut être utilisé pour les données qui ne sont pas conformes à un schéma fixe.

  • options – liste d'options. Vous permet de définir des options supplémentaires pour le processus de conversion. Voici quelques options valides que vous pouvez utiliser avec le paramètre « options » :

    • format— spécifie le format des données, tel que json, csv, parquet).

    • separater or sep— pour les CSV fichiers, indique le délimiteur.

    • header— pour CSV les fichiers, indique si la première ligne est un en-tête (vrai/faux).

    • inferSchema— demande à Spark de déduire automatiquement le schéma (vrai/faux).

    Voici un exemple d'utilisation du paramètre `options` avec la méthode `TODF` :

    from awsglue.context import GlueContext from awsglue.dynamicframe import DynamicFrame from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) csv_dyf = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={"paths": ["s3://my-bucket/path/to/csv/"]}, format="csv" ) csv_cf = csv_dyf.toDF(options={ "separator": ",", "header": "true", "ïnferSchema": "true" })

    Spécifiez le type de cible si vous choisissez le type d'action Project et Cast. Voici quelques exemples :

    >>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])

 – informations –

count

count( ) – Renvoie le nombre de lignes dans le DataFrame sous-jacent.

schéma

schema( ) – Renvoie le schéma de ce DynamicFrame ou, s'il n'est pas disponible, le schéma du DataFrame sous-jacent.

Pour plus d'informations sur les types de DynamicFrame qui composent ce schéma, consultez Types d'extension PySpark.

printSchema

printSchema( ) – Imprime le schéma du DataFrame sous-jacent.

show

show(num_rows) – Imprime un nombre de lignes spécifié du DataFrame sous-jacent.

repartition

repartition(numPartitions) – renvoie un nouvel objet DynamicFrame avec des partitions numPartitions.

coalesce

coalesce(numPartitions) – renvoie un nouvel objet DynamicFrame avec des partitions numPartitions.

 – transformations –

apply_mapping

apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Applique un mappage déclaratif à une image DynamicFrame et renvoie une nouvelle image DynamicFrame sur laquelle ces mappages sont appliqués. Les champs non spécifiés sont omis dans le nouveauDynamicFrame.

  • mappings— Une liste de tuples de mappage (obligatoire). Chacun étant composé comme suit : (colonne source, type source, colonne cible, type cible).

    Si la colonne source comporte un point « . » dans le nom, vous devez l'entourer d'accents graves « `` ». Par exemple, pour mapper this.old.name (chaîne) à thisNewName, vous devez utiliser le tuple suivant :

    ("`this.old.name`", "string", "thisNewName", "string")
  • transformation_ctx – Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).

  • info – Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif).

  • stageThreshold – Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

  • totalThreshold – Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

Exemple : Utilisez apply_mapping afin de renommer des champs ainsi que de modifier ses types

L'exemple de code suivant montre la apply_mappingméthode d’utilisation relative au renommage des champs sélectionnés ainsi qu’ à la modification des types de champs.

Note

Pour accéder au jeu de données utilisé dans cet exemple, voir Exemple de code : Données de jonction et de mise en relation et suivez les instructions de la sectionÉtape 1 : analyser les données dans le compartiment Amazon S3.

# Example: Use apply_mapping to reshape source data into # the desired column names and types as a new DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Select and rename fields, change field type print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:") persons_mapped = persons.apply_mapping( [ ("family_name", "String", "last_name", "String"), ("name", "String", "first_name", "String"), ("birth_date", "String", "date_of_birth", "Date"), ] ) persons_mapped.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the persons_mapped DynamicFrame, created with apply_mapping: root |-- last_name: string |-- first_name: string |-- date_of_birth: date

drop_fields

drop_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Appelle la transformation Classe FlatMap afin de supprimer des champs d'un DynamicFrame. Renvoie un nouveau DynamicFrame avec les champs supprimés spécifiés.

  • paths - Une liste de chaînes. Chacune contient le chemin d'accès complet à un nœud de champ que vous souhaitez supprimer. Vous pouvez utiliser la notation par points pour spécifier des champs imbriqués. Par exemple, si le champfirst est un enfant du champ namedans l'arborescence, vous spécifiez"name.first"pour le chemin d'accès.

    Si le nom d'un nœud de champ contient un . littéral, vous devez l'entourer d'accents graves (`).

  • transformation_ctx – Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).

  • info – Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif).

  • stageThreshold – Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

  • totalThreshold – Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

Exemple : utilisez supprimer les chams pour supprimer les champs d'unDynamicFrame

Cet exemple de code utilise ledrop_fieldsméthode pour supprimer les champs de niveau supérieur et imbriqués sélectionnés d'unDynamicFrame.

Exemple de jeu de données

L'exemple utilise l'ensemble de données suivant qui est représenté parEXAMPLE-FRIENDS-DATAtable dans le code :

{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []} {"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]} {"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]} {"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}} {"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}

Exemple de code

# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame. # Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name. # Replace EXAMPLE-FRIENDS-DATA with your table name. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame from Glue Data Catalog glue_source_database = "MY-EXAMPLE-DATABASE" glue_source_table = "EXAMPLE-FRIENDS-DATA" friends = glueContext.create_dynamic_frame.from_catalog( database=glue_source_database, table_name=glue_source_table ) print("Schema for friends DynamicFrame before calling drop_fields:") friends.printSchema() # Remove location.county, remove friends.age, remove age friends = friends.drop_fields(paths=["age", "location.county", "friends.age"]) print("Schema for friends DynamicFrame after removing age, county, and friend age:") friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields: root |-- name: string |-- age: int |-- location: struct | |-- state: string | |-- county: string |-- friends: array | |-- element: struct | | |-- name: string | | |-- age: int Schema for friends DynamicFrame after removing age, county, and friend age: root |-- name: string |-- location: struct | |-- state: string |-- friends: array | |-- element: struct | | |-- name: string

filtre

filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Renvoie un nouveau DynamicFrame contien tout DynamicRecords de l'entrée DynamicFrame qui correspondent à la fonction de prédicat f.

  • f – Fonction de prédicat à appliquer au paramètre DynamicFrame. La fonction doit prendre un DynamicRecord en tant qu'argument et renvoyer la valeur True si le DynamicRecord répond aux critères du filtre, ou la valeur False si ce n'est pas le cas (obligatoire).

    Un DynamicRecord représente un enregistrement logique dans un DynamicFrame. Il est comparable à une ligne dans un Spark DataFrame, à la différence qu'il est auto-descriptif et peut être utilisé pour les données qui ne sont pas conformes à un schéma fixe.

  • transformation_ctx – Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).

  • info – Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif).

  • stageThreshold – Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

  • totalThreshold – Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

Exemple  : Utiliser un filtre pour obtenir une sélection de champs filtrée

Cet exemple utilise lefilter Méthode de création d'un nouveauDynamicFramequi inclut une sélection filtrée d'un autre DynamicFrame des champs.

Comme le map Méthode, filter prend une fonction comme argument qui est appliquée à chaque enregistrement de l'originalDynamicFrame. La fonction prend un enregistrement en entrée et renvoie une valeur booléenne. Si la valeur de retour est vraie, l'enregistrement est inclus dans le résultatDynamicFrame. Si c'est faux, le dossier est omis.

Note

Pour accéder au jeu de données utilisé dans cet exemple, voir Exemple de code : préparation des données à l'aide ResolveChoice de Lambda et ApplyMapping et suivez les instructions de la sectionÉtape 1 : analyser les données dans le compartiment Amazon S3.

# Example: Use filter to create a new DynamicFrame # with a filtered selection of records from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create DynamicFrame from Glue Data Catalog medicare = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv" ] }, "csv", {"withHeader": True}, ) # Create filtered DynamicFrame with custom lambda # to filter records by Provider State and Provider City sac_or_mon = medicare.filter( f=lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"] ) # Compare record counts print("Unfiltered record count: ", medicare.count()) print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065 Filtered record count: 564

join

join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Effectue une jointure d'égalité avec un autre DynamicFrame et renvoie le DynamicFrame obtenu.

  • paths1 – Liste des clés dans cette trame à joindre.

  • paths2 – Liste des clés dans l'autre trame à joindre.

  • frame2 – Autre DynamicFrame à joindre.

  • transformation_ctx – Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).

  • info – Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif).

  • stageThreshold – Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

  • totalThreshold – Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

Exemple : Utiliser la jointure pour combinerDynamicFrames

Cet exemple utilise la join méthode pour effectuer une jointure sur troisDynamicFrames. AWS Glue effectue la jointure en fonction des clés de champ que vous fournissez. Le résultat DynamicFrame contient les lignes des deux cadres d'origine où les clés spécifiées correspondent.

Notez que lejointransform conserve tous les champs intacts. Cela signifie que les champs que vous spécifiez comme correspondants apparaissent dans le résultat DynamicFrame, même s'ils sont redondants et contiennent les mêmes clés. Dans cet exemple, nous utilisonsdrop_fieldspour supprimer ces clés redondantes après la jointure.

Note

Pour accéder au jeu de données utilisé dans cet exemple, voir Exemple de code : Données de jonction et de mise en relation et suivez les instructions de la sectionÉtape 1 : analyser les données dans le compartiment Amazon S3.

# Example: Use join to combine data from three DynamicFrames from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load DynamicFrames from Glue Data Catalog persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json" ) orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() print("Schema for the memberships DynamicFrame:") memberships.printSchema() print("Schema for the orgs DynamicFrame:") orgs.printSchema() # Join persons and memberships by ID persons_memberships = persons.join( paths1=["id"], paths2=["person_id"], frame2=memberships ) # Rename and drop fields from orgs # to prevent field name collisions with persons_memberships orgs = ( orgs.drop_fields(["other_names", "identifiers"]) .rename_field("id", "org_id") .rename_field("name", "org_name") ) # Create final join of all three DynamicFrames legislators_combined = orgs.join( paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships ).drop_fields(["person_id", "org_id"]) # Inspect the schema for the joined data print("Schema for the new legislators_combined DynamicFrame:") legislators_combined.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the memberships DynamicFrame: root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string Schema for the orgs DynamicFrame: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string Schema for the new legislators_combined DynamicFrame: root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

map

map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Renvoie un nouveau DynamicFrame qui résulte de l'application de la fonction de mappage spécifiée à tous les enregistrements dans le DynamicFrame d'origine.

  • f – Fonction de mappage à appliquer à tous les enregistrements dans le paramètre DynamicFrame. La fonction doit prendre un DynamicRecord comme argument et renvoyer un nouveau DynamicRecord (obligatoire).

    Un DynamicRecord représente un enregistrement logique dans un DynamicFrame. Il est comparable à une ligne dans un DataFrame Apache Spark, à la différence qu'il est auto-descriptif et peut être utilisé pour les données qui ne sont pas conformes à un schéma fixe.

  • transformation_ctx – Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).

  • info – Chaîne associée à des erreurs dans la transformation (facultatif).

  • stageThreshold – nombre maximal d'erreurs qui peuvent avoir lieu dans la transformation avant qu'elle ne soit arrêtée.(facultatif) La valeur par défaut est zéro.

  • totalThreshold – nombre maximal d'erreurs pouvant se produire globalement avant que le processus de traitement des erreurs ne soit arrêté.(facultatif) La valeur par défaut est zéro.

Exemple : utilisez la carte pour appliquer une fonction à chaque enregistrement dans un DynamicFrame

Cet exemple illustre comment utiliser map méthode pour appliquer une fonction à chaque enregistrement d'un DynamicFrame. Plus précisément, cet exemple applique une fonction appelée MergeAddress à chaque enregistrement afin de fusionner plusieurs champs d'adresse en un seul struct type.

Note

Pour accéder au jeu de données utilisé dans cet exemple, voir Exemple de code : préparation des données à l'aide ResolveChoice de Lambda et ApplyMapping et suivez les instructions de la sectionÉtape 1 : analyser les données dans le compartiment Amazon S3.

# Example: Use map to combine fields in all records # of a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema medicare = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]}, "csv", {"withHeader": True}) print("Schema for medicare DynamicFrame:") medicare.printSchema() # Define a function to supply to the map transform # that merges address fields into a single field def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec # Use map to apply MergeAddress to every record mapped_medicare = medicare.map(f = MergeAddress) print("Schema for mapped_medicare DynamicFrame:") mapped_medicare.printSchema()
Schema for medicare DynamicFrame: root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string Schema for mapped_medicare DynamicFrame: root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

mergeDynamicFrame

mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)

Fusionne cette trame DynamicFrame avec une trame DynamicFrame intermédiaire basée sur les clés primaires spécifiées pour identifier les enregistrements. Les registres en double (registres avec les mêmes clés primaires) ne sont pas dédupliqués. Si aucun enregistrement ne correspond dans la trame intermédiaire, tous les enregistrements (y compris les doublons) sont conservés dans la source. Si la trame intermédiaire contient des enregistrements correspondants, les enregistrements de la trame intermédiaire remplacent les enregistrements de la source dans AWS Glue.

  • stage_dynamic_frame – trame DynamicFrame intermédiaire à fusionner.

  • primary_keys – liste des champs de clé primaire permettant de faire correspondre les enregistrements des trames dynamiques source et intermédiaire.

  • transformation_ctx – chaîne unique utilisée pour récupérer les métadonnées relatives à la transformation en cours (facultatif).

  • options— Chaîne de paires JSON nom-valeur fournissant des informations supplémentaires pour cette transformation. Cet argument n'est actuellement pas utilisé.

  • info – A String. Toute chaîne à associer à des erreurs dans cette transformation.

  • stageThreshold – A Long. Nombre d'erreurs identifiées dans la transformation donnée et à corriger lors du traitement.

  • totalThreshold – A Long. Nombre total d'erreurs identifiées dans la transformation donnée et qui doivent être corrigées lors du traitement.

La méthode renvoie une nouvelle image DynamicFrame obtenue en fusionnant DynamicFrame avec l'image DynamicFrame intermédiaire.

La trame DynamicFrame renvoyée contient l'enregistrement A dans les cas suivants :

  • Si A se trouve à la fois dans la trame source et la trame intermédiaire, c'est la valeur A de la trame intermédiaire qui est renvoyée.

  • Si A se trouve dans la table source et si A.primaryKeys ne se trouve pas dans l'image stagingDynamicFrame, A n'est pas mis à jour dans la table intermédiaire.

L'image source et l'image intermédiaire n'ont pas besoin d'avoir le même schéma.

Exemple : mergeDynamicFrame à utiliser pour fusionner deux DynamicFrames en fonction d'une clé primaire

L'exemple de code suivant montre comment utiliser la méthode mergeDynamicFrame pour fusionner une image DynamicFrame avec une image DynamicFrame « intermédiaire », sur la base de la clé primaire id.

Exemple de jeu de données

L'exemple utilise deux images DynamicFrames d'une collection DynamicFrameCollection appelée split_rows_collection. Vous trouverez ci-dessous la liste des clés de la collection split_rows_collection.

dict_keys(['high', 'low'])

Exemple de code

# Example: Use mergeDynamicFrame to merge DynamicFrames # based on a set of specified primary keys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import SelectFromCollection # Inspect the original DynamicFrames frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low") print("Inspect the DynamicFrame that contains rows where ID < 10") frame_low.toDF().show() frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high") print("Inspect the DynamicFrame that contains rows where ID > 10") frame_high.toDF().show() # Merge the DynamicFrames based on the "id" primary key merged_high_low = frame_high.mergeDynamicFrame( stage_dynamic_frame=frame_low, primary_keys=["id"] ) # View the results where the ID is 1 or 20 print("Inspect the merged DynamicFrame that contains the combined rows") merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 2| 0| fax| 202-225-3307| | 2| 1| phone| 202-225-5731| | 3| 0| fax| 202-225-3307| | 3| 1| phone| 202-225-5731| | 4| 0| fax| 202-225-3307| | 4| 1| phone| 202-225-5731| | 5| 0| fax| 202-225-3307| | 5| 1| phone| 202-225-5731| | 6| 0| fax| 202-225-3307| | 6| 1| phone| 202-225-5731| | 7| 0| fax| 202-225-3307| | 7| 1| phone| 202-225-5731| | 8| 0| fax| 202-225-3307| | 8| 1| phone| 202-225-5731| | 9| 0| fax| 202-225-3307| | 9| 1| phone| 202-225-5731| | 10| 0| fax| 202-225-6328| | 10| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains rows where ID > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| fax| 202-225-6328| | 11| 1| phone| 202-225-4576| | 11| 2| twitter| RepTrentFranks| | 12| 0| fax| 202-225-6328| | 12| 1| phone| 202-225-4576| | 12| 2| twitter| RepTrentFranks| | 13| 0| fax| 202-225-6328| | 13| 1| phone| 202-225-4576| | 13| 2| twitter| RepTrentFranks| | 14| 0| fax| 202-225-6328| | 14| 1| phone| 202-225-4576| | 14| 2| twitter| RepTrentFranks| | 15| 0| fax| 202-225-6328| | 15| 1| phone| 202-225-4576| | 15| 2| twitter| RepTrentFranks| | 16| 0| fax| 202-225-6328| | 16| 1| phone| 202-225-4576| | 16| 2| twitter| RepTrentFranks| | 17| 0| fax| 202-225-6328| | 17| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the merged DynamicFrame that contains the combined rows +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 20| 0| fax| 202-225-5604| | 20| 1| phone| 202-225-6536| | 20| 2| twitter| USRepLong| +---+-----+------------------------+-------------------------+

relationalize

relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Convertit une image DynamicFrame sous une forme adaptée à une base de données relationnelle. La relationnalisation de a DynamicFrame est particulièrement utile lorsque vous souhaitez déplacer des données d'un SQL environnement sans environnement tel que DynamoDB vers une base de données relationnelle telle que My. SQL

La transformation génère une liste des images en désimbriquant les colonnes imbriquées et en faisant pivoter les colonnes de tableau. Vous pouvez joindre les colonnes du tableau ayant été pivotées à l'aide de la clé générée au cours de la phase de désimbrication.

  • root_table_name – Nom de la table racine.

  • staging_path— Le chemin dans lequel la méthode peut stocker les partitions de tables pivotantes au CSV format (facultatif). Les tables dynamiques sont lues à partir de ce chemin.

  • options – Dictionnaire des paramètres facultatifs.

  • transformation_ctx – Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).

  • info – Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif).

  • stageThreshold – Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

  • totalThreshold – Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

Exemple : utiliser la méthode relationalize pour aplatir un schéma imbriqué dans une image DynamicFrame

Cet exemple de code utilise la méthode relationalize pour aplatir un schéma imbriqué sous une forme adaptée à une base de données relationnelle.

Exemple de jeu de données

L'exemple utilise une image DynamicFrame appelée legislators_combined avec le schéma suivant. legislators_combined possède plusieurs champs imbriqués tels que links, images et contact_details, qui seront aplatis par la transformation relationalize.

root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

Exemple de code

# Example: Use relationalize to flatten # a nested schema into a format that fits # into a relational database. # Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Apply relationalize and inspect new tables legislators_relationalized = legislators_combined.relationalize( "l_root", "s3://DOC-EXAMPLE-BUCKET/tmpDir" ) legislators_relationalized.keys() # Compare the schema of the contact_details # nested field to the new relationalized table that # represents it legislators_combined.select_fields("contact_details").printSchema() legislators_relationalized.select("l_root_contact_details").toDF().where( "id = 10 or id = 75" ).orderBy(["id", "index"]).show()

La sortie suivante vous permet de comparer le schéma du champ imbriqué appelé contact_details à la table créée par la transformation relationalize. Notez que les enregistrements de la table renvoient vers la table principale à l'aide d'une clé étrangère appelée id et d'une colonne index représentant les positions du tableau.

dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers']) root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| 202-225-4160| | 10| 1| phone| 202-225-3436| | 75| 0| fax| 202-225-6791| | 75| 1| phone| 202-225-2861| | 75| 2| twitter| RepSamFarr| +---+-----+------------------------+-------------------------+

rename_field

rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Renomme un champ dans ce DynamicFrame et renvoie un nouveau DynamicFrame avec le champ renommé.

  • oldName – Chemin d'accès complet au nœud que vous souhaitez renommer.

    Si l'ancien nom contient des points, RenameField ne fonctionne pas à moins que vous ne le délimitiez avec des apostrophes inverses (`). Par exemple, pour remplacer this.old.name par thisNewName, vous devez appeler rename_field comme suit.

    newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
  • newName – Nouveau nom, sous forme de chemin d'accès complet.

  • transformation_ctx – Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).

  • info – Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif).

  • stageThreshold – Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

  • totalThreshold – Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

Exemple : utiliser la méthode rename_field pour renommer des champs dans une image DynamicFrame

Cet exemple de code utilise la méthode rename_field pour renommer les champs dans une image DynamicFrame. Notez que l'exemple utilise le chaînage de méthode pour renommer plusieurs champs en même temps.

Note

Pour accéder au jeu de données utilisé dans cet exemple, voir Exemple de code : Données de jonction et de mise en relation et suivez les instructions de la sectionÉtape 1 : analyser les données dans le compartiment Amazon S3.

Exemple de code

# Example: Use rename_field to rename fields # in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Inspect the original orgs schema orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Original orgs schema: ") orgs.printSchema() # Rename fields and view the new schema orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name") print("New orgs schema with renamed fields: ") orgs.printSchema()
Original orgs schema: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string New orgs schema with renamed fields: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- classification: string |-- org_id: string |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string

resolveChoice

resolveChoice(specs = None, choice = "" , database = None , table_name = None , transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id = None)

Résout un type de choix au sein de ce DynamicFrame et renvoie le nouveau DynamicFrame.

  • specs – Liste d'ambiguïtés spécifiques à résoudre, apparaissant sous forme de tuple: (field_path, action).

    Il existe deux façons d'utiliser resolveChoice. La première consiste à utiliser l'argument specs pour spécifier une séquence de champs spécifiques et la façon de les résoudre. L'autre mode pour resolveChoice consiste à utiliser l'argument choice afin de spécifier une seule résolution pour tous les ChoiceTypes.

    Les valeurs pour specs sont spécifiées en tant que tuples composés de paires (field_path, action). La valeur field_path identifie un élément ambigu spécifique, et la valeur action identifie la résolution correspondante. Les actions possibles sont les suivantes :

    • cast:type – tente de convertir toutes les valeurs vers le type spécifié. olpPar exemple : cast:int.

    • make_cols – Convertit chaque type distinct en une colonne avec le nom columnName_type. Résout une ambiguïté potentielle en aplatissant les données. Par exemple, si columnA peut être un int ou un string, la résolution consisterait à produire deux colonnes nommées columnA_int et columnA_string dans le DynamicFrame obtenu.

    • make_struct – résout une ambigüité potentielle en utilisant un struct pour représenter les données. Par exemple, si des données d'une colonne peuvent être de type int ou string, l'action make_struct produit une colonne de structures dans l'image DynamicFrame obtenue. Chaque structure contient à la fois des données de type int et des données de type string.

    • project:type – résout une ambiguïté potentielle en projetant toutes les données sur l'un des types de données possibles. Par exemple, si des données d'une colonne peuvent être un int ou un string, une action project:string produit une colonne dans le DynamicFrame obtenu, où toutes les valeurs int ont été converties en chaînes.

    Si le field_path identifie un tableau, placez des crochets vides après le nom du tableau pour éviter toute ambiguïté. Par exemple, supposons que vous travailliez avec les données structurées comme suit :

    "myList": [ { "price": 100.00 }, { "price": "$100.00" } ]

    Vous pouvez sélectionner la version numérique plutôt que la version chaîne du prix en définissant field_path sur "myList[].price" et action sur "cast:double".

    Note

    Vous ne pouvez utiliser qu'un seul des paramètres specs et choice. Si le paramètre specs n'est pas None, alors le paramètre choice doit être une chaîne vide. Inversement, si le paramètre choice n'est pas une chaîne vide, alors le paramètre specs doit être None.

  • choice – spécifie une résolution unique pour tous les ChoiceTypes. Vous pouvez utiliser cela lorsque la liste complète des ChoiceTypes est inconnue avant l'exécution. En plus des actions répertoriées précédemment pour specs, cet argument prend également en charge l'action suivante :

    • match_catalog – tente de convertir chaque ChoiceType dans le type correspondant de table Data Catalog spécifiée.

  • database – base de données Data Catalog à utiliser avec l'action match_catalog.

  • table_name – table Data Catalog à utiliser avec l'action match_catalog.

  • transformation_ctx – Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).

  • info – Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif).

  • stageThreshold – Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

  • totalThreshold – Nombre d'erreurs rencontrées jusqu'à cette transformation incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas être arrêté dans ce cas.

  • catalog_id – ID du catalogue Data Catalog auquel vous accédez (ID du compte Data Catalog). Lorsque cette option est définie sur None (valeur par défaut), l'ID de catalogue du compte appelant est utilisé.

Exemple : resolveChoice à utiliser pour gérer une colonne contenant plusieurs types

Cet exemple de code utilise la méthode resolveChoice pour spécifier comment gérer une colonne DynamicFrame contenant des valeurs de plusieurs types. L'exemple montre deux méthodes courantes pour gérer une colonne de types différents :

  • Convertir la colonne en un seul type de données.

  • Conserver tous les types dans des colonnes distinctes.

Exemple de jeu de données

Note

Pour accéder au jeu de données utilisé dans cet exemple, voir Exemple de code : préparation des données à l'aide ResolveChoice de Lambda et ApplyMapping et suivez les instructions de la sectionÉtape 1 : analyser les données dans le compartiment Amazon S3.

L'exemple utilise une image DynamicFrame appelée medicare avec le schéma suivant :

root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

Exemple de code

# Example: Use resolveChoice to handle # a column that contains multiple types from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input data and inspect the "provider id" column medicare = glueContext.create_dynamic_frame.from_catalog( database="payments", table_name="medicare_hospital_provider_csv" ) print("Inspect the provider id column:") medicare.toDF().select("provider id").show() # Cast provider id to type long medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")]) print("Schema after casting provider id to type long:") medicare_resolved_long.printSchema() medicare_resolved_long.toDF().select("provider id").show() # Create separate columns # for each provider id type medicare_resolved_cols = medicare.resolveChoice(choice="make_cols") print("Schema after creating separate columns for each type:") medicare_resolved_cols.printSchema() medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column: +-----------+ |provider id| +-----------+ | [10001,]| | [10005,]| | [10006,]| | [10011,]| | [10016,]| | [10023,]| | [10029,]| | [10033,]| | [10039,]| | [10040,]| | [10046,]| | [10055,]| | [10056,]| | [10078,]| | [10083,]| | [10085,]| | [10090,]| | [10092,]| | [10100,]| | [10103,]| +-----------+ only showing top 20 rows Schema after casting 'provider id' to type long: root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +-----------+ |provider id| +-----------+ | 10001| | 10005| | 10006| | 10011| | 10016| | 10023| | 10029| | 10033| | 10039| | 10040| | 10046| | 10055| | 10056| | 10078| | 10083| | 10085| | 10090| | 10092| | 10100| | 10103| +-----------+ only showing top 20 rows Schema after creating separate columns for each type: root |-- drg definition: string |-- provider id_string: string |-- provider id_long: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +----------------+------------------+ |provider id_long|provider id_string| +----------------+------------------+ | 10001| null| | 10005| null| | 10006| null| | 10011| null| | 10016| null| | 10023| null| | 10029| null| | 10033| null| | 10039| null| | 10040| null| | 10046| null| | 10055| null| | 10056| null| | 10078| null| | 10083| null| | 10085| null| | 10090| null| | 10092| null| | 10100| null| | 10103| null| +----------------+------------------+ only showing top 20 rows

select_fields

select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Renvoie un nouveau DynamicFrame contenant les champs sélectionnés.

  • paths - Une liste de chaînes. Chaque chaîne est un chemin d'accès au nœud de niveau supérieur que vous souhaitez sélectionner.

  • transformation_ctx – Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).

  • info – Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif).

  • stageThreshold – Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

  • totalThreshold – Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

Exemple : utilisez select_fields pour créer un nouveau DynamicFrame avec les champs sélectionnés

L'exemple de code suivant illustre comment utiliser select_fields Méthode de création d'un nouveau DynamicFrame avec une liste de champs sélectionnés parmi un DynamicFrame.

Note

Pour accéder au jeu de données utilisé dans cet exemple, voir Exemple de code : Données de jonction et de mise en relation et suivez les instructions de la sectionÉtape 1 : analyser les données dans le compartiment Amazon S3.

# Example: Use select_fields to select specific fields from a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Create a new DynamicFrame with chosen fields names = persons.select_fields(paths=["family_name", "given_name"]) print("Schema for the names DynamicFrame, created with select_fields:") names.printSchema() names.toDF().show()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the names DynamicFrame: root |-- family_name: string |-- given_name: string +-----------+----------+ |family_name|given_name| +-----------+----------+ | Collins| Michael| | Huizenga| Bill| | Clawson| Curtis| | Solomon| Gerald| | Rigell| Edward| | Crapo| Michael| | Hutto| Earl| | Ertel| Allen| | Minish| Joseph| | Andrews| Robert| | Walden| Greg| | Kazen| Abraham| | Turner| Michael| | Kolbe| James| | Lowenthal| Alan| | Capuano| Michael| | Schrader| Kurt| | Nadler| Jerrold| | Graves| Tom| | McMillan| John| +-----------+----------+ only showing top 20 rows

simplify_ddb_json

simplify_ddb_json(): DynamicFrame

Simplifie les colonnes imbriquées DynamicFrame qui se trouvent spécifiquement dans la structure JSON DynamoDB et renvoie une nouvelle colonne simplifiée. DynamicFrame S'il existe plusieurs types ou types de carte dans un type de liste, les éléments de la liste ne seront pas simplifiés. Notez qu'il s'agit d'un type de transformation spécifique qui se comporte différemment de la unnest transformation normale et qui nécessite que les données figurent déjà dans la structure DynamoDBJSON. Pour plus d'informations, consultez DynamoDB JSON.

Par exemple, le schéma d'une lecture d'une exportation avec la structure JSON DynamoDB peut ressembler à ce qui suit :

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

La transformation simplify_ddb_json() convertirait ceci en :

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

Exemple : utilisez simplify_ddb_json pour appeler un DynamoDB simplify JSON

Cet exemple de code utilise la simplify_ddb_json méthode permettant d'utiliser le connecteur d'exportation AWS Glue DynamoDB, d'invoquer un JSON DynamoDB Simplify et d'imprimer le nombre de partitions.

Exemple de code

from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "dynamodb", connection_options = { 'dynamodb.export': 'ddb', 'dynamodb.tableArn': '<table arn>', 'dynamodb.s3.bucket': '<bucket name>', 'dynamodb.s3.prefix': '<bucket prefix>', 'dynamodb.s3.bucketOwner': '<account_id of bucket>' } ) simplified = dynamicFrame.simplify_ddb_json() print(simplified.getNumPartitions())

spigot

spigot(path, options={})

Écrit des exemples d'enregistrement sur une destination spécifiée pour vous aider à vérifier les transformations effectuées par votre tâche.

  • path – Chemin d'accès de la destination sur laquelle écrire (obligatoire).

  • options – Paires clé-valeur spécifiant des options (facultatif). L'option "topk" indique que les premiers enregistrements k doivent être écrits. L'option "prob" indique la probabilité (sous forme de décimale) de choisir un enregistrement donné. Vous pouvez l'utiliser pour sélectionner les enregistrements à écrire.

  • transformation_ctx – Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).

Exemple : utiliser la méthode spigot pour écrire des exemples de champs d'une image DynamicFrame vers Amazon S3

Cet exemple de code utilise la méthode spigot pour écrire des exemples d'enregistrements dans un compartiment Amazon S3 après avoir appliqué la transformation select_fields.

Exemple de jeu de données

Note

Pour accéder au jeu de données utilisé dans cet exemple, voir Exemple de code : Données de jonction et de mise en relation et suivez les instructions de la sectionÉtape 1 : analyser les données dans le compartiment Amazon S3.

L'exemple utilise une image DynamicFrame appelée persons avec le schéma suivant :

root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

Exemple de code

# Example: Use spigot to write sample records # to a destination during a transformation # from pyspark.context import SparkContext. # Replace DOC-EXAMPLE-BUCKET with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load table data into a DynamicFrame persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) # Perform the select_fields on the DynamicFrame persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"]) # Use spigot to write a sample of the transformed data # (the first 10 records) spigot_output = persons.spigot( path="s3://DOC-EXAMPLE-BUCKET", options={"topk": 10} )

Voici un exemple des données que spigot écrit dans Amazon S3. Étant donné que l'exemple de code spécifie options={"topk": 10}, les exemples de données contiennent les 10 premiers enregistrements.

{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"} {"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"} {"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"} {"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"} {"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"} {"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"} {"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"} {"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"} {"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"} {"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}

split_fields

split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Renvoie une nouvelle collection DynamicFrameCollection qui contient deux images DynamicFrames. La première image DynamicFrame contient tous les nœuds qui ont été fractionnés, et la seconde contient les nœuds qui restent.

  • paths – Liste de chaînes, chacune étant un chemin d'accès complet à un nœud que vous souhaitez fractionner en un nouveau DynamicFrame.

  • name1 – Chaîne de nom pour le DynamicFrame fractionné.

  • name2 – Chaîne de nom pour le DynamicFrame qui reste après le fractionnement des nœuds spécifiés.

  • transformation_ctx – Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).

  • info – Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif).

  • stageThreshold – Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

  • totalThreshold – Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

Exemple : utiliser la méthode split_fields pour fractionner les champs sélectionnés en une image DynamicFrame distincte

Cet exemple de code utilise la méthode split_fields pour fractionner une liste de champs spécifiés en une image DynamicFrame distincte.

Exemple de jeu de données

L'exemple utilise une image DynamicFrame appelée l_root_contact_details qui provient d'une collection nommée legislators_relationalized.

l_root_contact_details contient le schéma et les entrées suivantes.

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| ...

Exemple de code

# Example: Use split_fields to split selected # fields into a separate DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input DynamicFrame and inspect its schema frame_to_split = legislators_relationalized.select("l_root_contact_details") print("Inspect the input DynamicFrame schema:") frame_to_split.printSchema() # Split id and index fields into a separate DynamicFrame split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right") # Inspect the resulting DynamicFrames print("Inspect the schemas of the DynamicFrames created with split_fields:") split_fields_collection.select("left").printSchema() split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema: root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string Inspect the schemas of the DynamicFrames created with split_fields: root |-- id: long |-- index: int root |-- contact_details.val.type: string |-- contact_details.val.value: string

split_rows

split_rows(comparison_dict, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Scinde une ou plusieurs lignes d'un objet DynamicFrame en un nouveau DynamicFrame.

La méthode renvoie une nouvelle collection DynamicFrameCollection qui contient deux images DynamicFrames. La première image DynamicFrame contient toutes les lignes qui ont été fractionnées, et la seconde contient les lignes qui restent.

  • comparison_dict – Dictionnaire dans lequel la clé est un chemin d'accès à une colonne et la valeur est un autre dictionnaire permettant de mapper les comparateurs aux valeurs auxquelles les valeurs de la colonne sont comparées. Par exemple, {"age": {">": 10, "<": 20}} fractionne toutes les lignes dont la valeur de la colonne d'âge est supérieure à 10 et inférieure à 20.

  • name1 – Chaîne de nom pour le DynamicFrame fractionné.

  • name2 – Chaîne de nom pour le DynamicFrame qui reste après le fractionnement des nœuds spécifiés.

  • transformation_ctx – Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).

  • info – Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif).

  • stageThreshold – Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

  • totalThreshold – Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

Exemple : utiliser la méthode split_rows pour fractionner des lignes en une image DynamicFrame

Cet exemple de code utilise la méthode split_rows pour fractionner des lignes en une image DynamicFrame en se basant sur la valeur du champ id.

Exemple de jeu de données

L'exemple utilise une image DynamicFrame appelée l_root_contact_details qui est sélectionnée à partir d'une collection nommée legislators_relationalized.

l_root_contact_details contient le schéma et les entrées suivantes.

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+

Exemple de code

# Example: Use split_rows to split up # rows in a DynamicFrame based on value from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Retrieve the DynamicFrame to split frame_to_split = legislators_relationalized.select("l_root_contact_details") # Split up rows by ID split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low") # Inspect the resulting DynamicFrames print("Inspect the DynamicFrame that contains IDs < 10") split_rows_collection.select("low").toDF().show() print("Inspect the DynamicFrame that contains IDs > 10") split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains IDs > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| phone| 202-225-5476| | 11| 1| twitter| RepDavidYoung| | 12| 0| phone| 202-225-4035| | 12| 1| twitter| RepStephMurphy| | 13| 0| fax| 202-226-0774| | 13| 1| phone| 202-225-6335| | 14| 0| fax| 202-226-0774| | 14| 1| phone| 202-225-6335| | 15| 0| fax| 202-226-0774| | 15| 1| phone| 202-225-6335| | 16| 0| fax| 202-226-0774| | 16| 1| phone| 202-225-6335| | 17| 0| fax| 202-226-0774| | 17| 1| phone| 202-225-6335| | 18| 0| fax| 202-226-0774| | 18| 1| phone| 202-225-6335| | 19| 0| fax| 202-226-0774| | 19| 1| phone| 202-225-6335| | 20| 0| fax| 202-226-0774| | 20| 1| phone| 202-225-6335| +---+-----+------------------------+-------------------------+ only showing top 20 rows

unbox

unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)

Effectue une opération unbox sur un champ de chaîne (ou le reformate) dans une image DynamicFrame et renvoie une nouvelle image DynamicFrame contenant les enregistrements DynamicRecords sur lesquels l'opération unbox a été effectuée.

Un DynamicRecord représente un enregistrement logique dans un DynamicFrame. Il est comparable à une ligne dans un DataFrame Apache Spark, à la différence qu'il est auto-descriptif et peut être utilisé pour les données qui ne sont pas conformes à un schéma fixe.

  • path – Chemin d'accès complet au nœud de chaîne pour lequel vous souhaitez effectuer une opération unbox.

  • format – spécification de format (facultatif). Vous l'utilisez pour un Amazon S3 ou AWS Glue connexion qui prend en charge plusieurs formats. Consultez Options de format pour les entrées et sorties dans AWS Glue pour Spark pour connaître les formats pris en charge.

  • transformation_ctx – Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).

  • info – Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif).

  • stageThreshold – Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

  • totalThreshold – Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

  • options – Un ou plusieurs des éléments suivants :

    • separator – Chaîne contenant le caractère de séparation.

    • escaper – Chaîne contenant le caractère d'échappement.

    • skipFirst – Valeur booléenne indiquant s'il faut ignorer la première instance.

    • withSchema— Chaîne contenant une JSON représentation du schéma du nœud. Le format de la JSON représentation d'un schéma est défini par la sortie deStructType.json().

    • withHeader – Valeur booléenne indiquant si un en-tête est inclus.

Exemple : utiliser la méthode unbox pour effectuer une opération unbox sur un champ de chaîne en un champ struct

Cet exemple de code utilise la méthode unbox pour effectuer une opération unbox sur un champ de chaîne dans une image DynamicFrame (ou la reformater) en un champ de type struct.

Exemple de jeu de données

L'exemple utilise une image DynamicFrame appelée mapped_with_string avec le schéma et les entrées suivantes.

Notez le champ nommé AddressString. Il s'agit du champ sur lequel l'exemple effectue une opération unbox en un champ struct.

root |-- Average Total Payments: string |-- AddressString: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| ...

Exemple de code

# Example: Use unbox to unbox a string field # into a struct in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) unboxed = mapped_with_string.unbox("AddressString", "json") unboxed.printSchema() unboxed.toDF().show()
root |-- Average Total Payments: string |-- AddressString: struct | |-- Street: string | |-- City: string | |-- State: string | |-- Zip.Code: string | |-- Array: array | | |-- element: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| | $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...| | $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...| | $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...| | $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...| | $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL| | $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...| | $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...| | $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL| | $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...| | $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...| | $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...| | $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...| | $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL| | $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...| | $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL| | $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ only showing top 20 rows

union

union(frame1, frame2, transformation_ctx = "", info = "", stageThreshold = 0, totalThreshold = 0)

Union 2 DynamicFrames. Renvoie DynamicFrame tous les enregistrements des deux entrées DynamicFrames. Cette transformation peut renvoyer des résultats différents à partir de l'union de deux DataFrames avec des données équivalentes. Si vous avez besoin du comportement DataFrame syndical de Spark, pensez à utilisertoDF.

  • frame1— D'abord DynamicFrame à se syndiquer.

  • frame2— Après DynamicFrame l'union.

  • transformation_ctx : (facultatif) chaîne unique utilisée pour identifier les informations sur l'état/les statistiques.

  • info : (facultatif) chaîne à associer à des erreurs dans la transformation.

  • stageThreshold : (facultatif) nombre maximum d'erreurs dans la transformation jusqu'à ce que le traitement entraîne une erreur

  • totalThreshold : (facultatif) nombre maximum d'erreurs au total jusqu'à ce que le traitement entraîne une erreur.

unnest

unnest(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Désimbrique les objets imbriqués dans une image DynamicFrame. Ils deviennent des objets de niveau supérieur et l'opération renvoie une nouvelle image DynamicFrame non imbriquée.

  • transformation_ctx – Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).

  • info – Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif).

  • stageThreshold – Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

  • totalThreshold – Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.

Exemple : utiliser la méthode unnest pour transformer des champs imbriqués en champs de niveau supérieur

Cet exemple de code utilise la méthode unnest pour aplatir tous les champs imbriqués dans une image DynamicFrame en champs de niveau supérieur.

Exemple de jeu de données

L'exemple utilise une image DynamicFrame appelée mapped_medicare avec le schéma suivant. Notez que le champ Address est le seul qui contient des données imbriquées.

root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

Exemple de code

# Example: Use unnest to unnest nested # objects in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Unnest all nested fields unnested = mapped_medicare.unnest() unnested.printSchema()
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address.Zip.Code: string |-- Address.City: string |-- Address.Array: array | |-- element: string |-- Address.State: string |-- Address.Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

unnest_ddb_json

Désimbrique les colonnes imbriquées DynamicFrame qui se trouvent spécifiquement dans la JSON structure DynamoDB et renvoie une nouvelle colonne non imbriquée. DynamicFrame Les colonnes d'un tableau de types de structure ne seront pas non-imbriquées. Notez qu'il s'agit d'un type spécifique de transformation non imbriquée qui se comporte différemment de la unnest transformation normale et qui nécessite que les données figurent déjà dans la structure DynamoDB. JSON Pour plus d'informations, consultez DynamoDB JSON.

unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
  • transformation_ctx – Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).

  • info – Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif).

  • stageThreshold – Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l'arrêt du processus (facultatif : zéro par défaut, qui indique que le processus ne doit pas être arrêté dans ce cas).

  • totalThreshold – Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif : zéro par défaut, qui indique que le processus ne doit pas être arrêté dans ce cas).

Par exemple, le schéma d'une lecture d'une exportation avec la structure JSON DynamoDB peut ressembler à ce qui suit :

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

La transformation unnest_ddb_json() convertirait ceci en :

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

L'exemple de code suivant montre comment utiliser le connecteur d'exportation AWS Glue DynamoDB, appeler un JSON DynamoDB unnest et imprimer le nombre de partitions :

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()

write

write(connection_type, connection_options, format, format_options, accumulator_size)

Permet d'obtenir un DataSink(object) du type de connexion spécifié à partir du GlueContext classe de ce type DynamicFrame, et l'utilise pour formater et écrire le contenu de ce DynamicFrame. Renvoie le nouveau DynamicFrame formaté et écrit comme spécifié.

  • connection_type – type de connexion à utiliser. Les valeurs valides sont s3, mysql, postgresql, redshift, sqlserver et oracle.

  • connection_options – option de connexion à utiliser (facultatif). Pour un connection_type de s3, un chemin Amazon S3 est défini.

    connection_options = {"path": "s3://aws-glue-target/temp"}

    Pour JDBC les connexions, plusieurs propriétés doivent être définies. Notez que le nom de la base de données doit faire partie duURL. Il peut éventuellement être inclus dans les options de connexion.

    Avertissement

    Il n'est pas recommandé de stocker des mots de passe dans votre script. Envisagez de les utiliser boto3 pour les récupérer depuis AWS Secrets Manager le catalogue de données AWS Glue.

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}
  • format – spécification de format (facultatif). Il est utilisé pour un Amazon Simple Storage Service (Amazon S3) ou un AWS Glue connexion qui prend en charge plusieurs formats. Consultez Options de format pour les entrées et sorties dans AWS Glue pour Spark pour connaître les formats pris en charge.

  • format_options – options de format pour le format spécifié. Consultez Options de format pour les entrées et sorties dans AWS Glue pour Spark pour connaître les formats pris en charge.

  • accumulator_size : la taille accumulable à utiliser, en octets (facultatif).

 – erreurs –

assertErrorThreshold

assertErrorThreshold( ) – Assertion pour les erreurs dans les transformations ayant créé ce DynamicFrame. Renvoie un Exception du DataFrame sous-jacent.

errorsAsDynamicChâssis

errorsAsDynamicFrame( ) – Renvoie un DynamicFrame dans lequel des enregistrements d'erreur sont imbriqués.

Exemple : utilisez errorsAsDynamic Frame pour afficher les enregistrements d'erreurs

L'exemple de code suivant illustre comment utiliser errorsAsDynamicFrame méthode pour afficher un enregistrement d'erreur pour un DynamicFrame.

Exemple de jeu de données

L'exemple utilise le jeu de données suivant que vous pouvez télécharger sur Amazon S3 sous le nom de fichierJSON. Notez que le deuxième enregistrement est mal formé. Les données mal formées interrompent généralement l'analyse des fichiers lorsque vous utilisez Spark. SQL Cependant, DynamicFrame reconnaît les problèmes de malformation et transforme les lignes mal formées en enregistrements d'erreurs que vous pouvez gérer individuellement.

{"id": 1, "name": "george", "surname": "washington", "height": 178} {"id": 2, "name": "benjamin", "surname": "franklin", {"id": 3, "name": "alexander", "surname": "hamilton", "height": 171} {"id": 4, "name": "john", "surname": "jay", "height": 190}

Exemple de code

# Example: Use errorsAsDynamicFrame to view error records. # Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create errors DynamicFrame, view schema errors = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://DOC-EXAMPLE-S3-BUCKET/error_data.json"]}, "json" ) print("Schema of errors DynamicFrame:") errors.printSchema() # Show that errors only contains valid entries from the dataset print("errors contains only valid records from the input dataset (2 of 4 records)") errors.toDF().show() # View errors print("Errors count:", str(errors.errorsCount())) print("Errors:") errors.errorsAsDynamicFrame().toDF().show() # View error fields and error data error_record = errors.errorsAsDynamicFrame().toDF().head() error_fields = error_record["error"] print("Error fields: ") print(error_fields.asDict().keys()) print("\nError record data:") for key in error_fields.asDict().keys(): print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame: root |-- id: int |-- name: string |-- surname: string |-- height: int errors contains only valid records from the input dataset (2 of 4 records) +---+------+----------+------+ | id| name| surname|height| +---+------+----------+------+ | 1|george|washington| 178| | 4| john| jay| 190| +---+------+----------+------+ Errors count: 1 Errors: +--------------------+ | error| +--------------------+ |[[ File "/tmp/20...| +--------------------+ Error fields: dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord']) Error record data: callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='') msg : error in jackson reader stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at scala.collection.Iterator$$anon$9.next(Iterator.scala:162) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202) at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116) at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109) at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247) at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) input : bytesread : 252 source : dynamicRecord : Row(id=2, name='benjamin', surname='franklin')

errorsCount

errorsCount( ) – Renvoie le nombre total d'erreurs dans un DynamicFrame.

stageErrorsCount

stageErrorsCount – Renvoie le nombre d'erreurs survenues lors du processus de génération de ce DynamicFrame.