Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
DynamicFrame classe
L'une des principales abstractions d'Apache Spark est le Spark SQLDataFrame
, qui est similaire à la DataFrame
construction trouvée dans R et Pandas. A DataFrame
est similaire à une table et prend en charge les opérations de style fonctionnel (carte/réduire/filtre/etc.) et SQL les opérations (sélection, projet, agrégation).
DataFrames
sont puissants et largement utilisés, mais ils présentent des limites en ce qui concerne les opérations d'extraction, de transformation et de chargement (ETL). Cela signifie notamment qu'un schéma doit être spécifié avant tout chargement de données. Spark SQL résout ce problème en effectuant deux passages sur les données : le premier pour déduire le schéma et le second pour charger les données. Cependant, cette inférence est limitée et ne prend pas en compte la désorganisation des données. Par exemple, un même champ peut avoir différents types dans différents enregistrements. L'opération est souvent abandonnée par Apache Spark, qui indique que le type est string
en tenant compte du texte du champ d'origine. Cette information peut être erronée et vous pouvez souhaiter exercer un meilleur contrôle sur la résolution des écarts entre les schémas. Et pour les ensembles de données volumineux, un passage supplémentaire sur les données source peut s'avérer hors de prix.
Pour remédier à ces limites, AWS Glue introduit leDynamicFrame
. Un DynamicFrame
est similaire à un DataFrame
, mais chaque enregistrement y est auto-descriptif : initialement, aucun schéma n'est donc requis. Au lieu de cela, AWS Glue calcule un schéma on-the-fly lorsque cela est nécessaire et code explicitement les incohérences du schéma à l'aide d'un type de choix (ou d'union). Vous pouvez résoudre ces incohérences afin de rendre vos ensembles de données compatibles avec les magasins de données qui nécessitent un schéma fixe.
De même, un DynamicRecord
représente un enregistrement logique au sein dans un DynamicFrame
. Il est comparable à une ligne dans un DataFrame
Spark, à la différence qu'il est auto-descriptif et peut être utilisé pour les données qui ne sont pas conformes à un schéma fixe. Lorsque vous utilisez AWS Glue with PySpark, vous ne manipulez généralement pas de manière indépendanteDynamicRecords
. Vous allez plutôt transformer le jeu de données par le biais de son DynamicFrame
.
Vous pouvez convertir DynamicFrames
vers et depuis DataFrames
après la résolution des incohérences de schémas.
– construction –
__init__
__init__(jdf, glue_ctx, name)
-
jdf
— Une référence au bloc de données dans la machine virtuelle Java (JVM). -
glue_ctx
– Un objet GlueContext classe. -
name
– Nom de chaîne facultatif, vide par défaut.
fromDF
fromDF(dataframe, glue_ctx, name)
Convertit un DataFrame
en DynamicFrame
via la conversion de champs DataFrame
en champs DynamicRecord
. Renvoie un nouveau DynamicFrame
.
Un DynamicRecord
représente un enregistrement logique dans un DynamicFrame
. Il est comparable à une ligne dans un DataFrame
Spark, à la différence qu'il est auto-descriptif et peut être utilisé pour les données qui ne sont pas conformes à un schéma fixe.
Cette fonction s'attend à ce que les colonnes dont les noms sont dupliqués dans votre DataFrame
aient déjà été résolues.
-
dataframe
— L'Apache Spark SQLDataFrame
à convertir (obligatoire). -
glue_ctx
– Objet GlueContext classe qui spécifie le contexte pour cette transformation (obligatoire). -
name
— Le nom du résultatDynamicFrame
(facultatif depuis AWS Glue 3.0).
toDF
toDF(options)
Convertit un DynamicFrame
en DataFrame
Apache Spark via la conversion de DynamicRecords
en champs DataFrame
. Renvoie un nouveau DataFrame
.
Un DynamicRecord
représente un enregistrement logique dans un DynamicFrame
. Il est comparable à une ligne dans un DataFrame
Spark, à la différence qu'il est auto-descriptif et peut être utilisé pour les données qui ne sont pas conformes à un schéma fixe.
-
options
– liste d'options. Vous permet de définir des options supplémentaires pour le processus de conversion. Voici quelques options valides que vous pouvez utiliser avec le paramètre « options » :-
format
— spécifie le format des données, tel que json, csv, parquet). -
separater or sep
— pour les CSV fichiers, indique le délimiteur. -
header
— pour CSV les fichiers, indique si la première ligne est un en-tête (vrai/faux). -
inferSchema
— demande à Spark de déduire automatiquement le schéma (vrai/faux).
Voici un exemple d'utilisation du paramètre `options` avec la méthode `TODF` :
from awsglue.context import GlueContext from awsglue.dynamicframe import DynamicFrame from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) csv_dyf = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={"paths": ["s3://my-bucket/path/to/csv/"]}, format="csv" ) csv_cf = csv_dyf.toDF(options={ "separator": ",", "header": "true", "ïnferSchema": "true" })
Spécifiez le type de cible si vous choisissez le type d'action
Project
etCast
. Voici quelques exemples :>>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])
-
– informations –
count
count( )
– Renvoie le nombre de lignes dans le DataFrame
sous-jacent.
schéma
schema( )
– Renvoie le schéma de ce DynamicFrame
ou, s'il n'est pas disponible, le schéma du DataFrame
sous-jacent.
Pour plus d'informations sur les types de DynamicFrame
qui composent ce schéma, consultez Types d'extension PySpark.
printSchema
printSchema( )
– Imprime le schéma du DataFrame
sous-jacent.
show
show(num_rows)
– Imprime un nombre de lignes spécifié du DataFrame
sous-jacent.
repartition
repartition(numPartitions)
– renvoie un nouvel objet DynamicFrame
avec des partitions numPartitions
.
coalesce
coalesce(numPartitions)
– renvoie un nouvel objet DynamicFrame
avec des partitions numPartitions
.
– transformations –
apply_mapping
apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Applique un mappage déclaratif à une image DynamicFrame
et renvoie une nouvelle image DynamicFrame
sur laquelle ces mappages sont appliqués. Les champs non spécifiés sont omis dans le nouveauDynamicFrame
.
-
mappings
— Une liste de tuples de mappage (obligatoire). Chacun étant composé comme suit : (colonne source, type source, colonne cible, type cible).Si la colonne source comporte un point «
.
» dans le nom, vous devez l'entourer d'accents graves «``
». Par exemple, pour mapperthis.old.name
(chaîne) àthisNewName
, vous devez utiliser le tuple suivant :("`this.old.name`", "string", "thisNewName", "string")
-
transformation_ctx
– Chaîne unique utilisée pour identifier les informations sur l'état (facultatif). -
info
– Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif). -
stageThreshold
– Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur. -
totalThreshold
– Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.
Exemple : Utilisez apply_mapping afin de renommer des champs ainsi que de modifier ses types
L'exemple de code suivant montre la apply_mapping
méthode d’utilisation relative au renommage des champs sélectionnés ainsi qu’ à la modification des types de champs.
Note
Pour accéder au jeu de données utilisé dans cet exemple, voir Exemple de code : Données de jonction et de mise en relation et suivez les instructions de la sectionÉtape 1 : analyser les données dans le compartiment Amazon S3.
# Example: Use apply_mapping to reshape source data into # the desired column names and types as a new DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Select and rename fields, change field type print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:") persons_mapped = persons.apply_mapping( [ ("family_name", "String", "last_name", "String"), ("name", "String", "first_name", "String"), ("birth_date", "String", "date_of_birth", "Date"), ] ) persons_mapped.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the persons_mapped DynamicFrame, created with apply_mapping: root |-- last_name: string |-- first_name: string |-- date_of_birth: date
drop_fields
drop_fields(paths, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Appelle la transformation Classe FlatMap afin de supprimer des champs d'un DynamicFrame
. Renvoie un nouveau DynamicFrame
avec les champs supprimés spécifiés.
-
paths
- Une liste de chaînes. Chacune contient le chemin d'accès complet à un nœud de champ que vous souhaitez supprimer. Vous pouvez utiliser la notation par points pour spécifier des champs imbriqués. Par exemple, si le champfirst
est un enfant du champname
dans l'arborescence, vous spécifiez"name.first"
pour le chemin d'accès.Si le nom d'un nœud de champ contient un
.
littéral, vous devez l'entourer d'accents graves (`
). -
transformation_ctx
– Chaîne unique utilisée pour identifier les informations sur l'état (facultatif). -
info
– Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif). -
stageThreshold
– Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur. -
totalThreshold
– Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.
Exemple : utilisez supprimer les chams pour supprimer les champs d'unDynamicFrame
Cet exemple de code utilise ledrop_fields
méthode pour supprimer les champs de niveau supérieur et imbriqués sélectionnés d'unDynamicFrame
.
Exemple de jeu de données
L'exemple utilise l'ensemble de données suivant qui est représenté parEXAMPLE-FRIENDS-DATA
table dans le code :
{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []} {"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]} {"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]} {"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}} {"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}
Exemple de code
# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame. # Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name. # Replace EXAMPLE-FRIENDS-DATA with your table name. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame from Glue Data Catalog glue_source_database = "
MY-EXAMPLE-DATABASE
" glue_source_table = "EXAMPLE-FRIENDS-DATA
" friends = glueContext.create_dynamic_frame.from_catalog( database=glue_source_database, table_name=glue_source_table ) print("Schema for friends DynamicFrame before calling drop_fields:") friends.printSchema() # Remove location.county, remove friends.age, remove age friends = friends.drop_fields(paths=["age", "location.county", "friends.age"]) print("Schema for friends DynamicFrame after removing age, county, and friend age:") friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields: root |-- name: string |-- age: int |-- location: struct | |-- state: string | |-- county: string |-- friends: array | |-- element: struct | | |-- name: string | | |-- age: int Schema for friends DynamicFrame after removing age, county, and friend age: root |-- name: string |-- location: struct | |-- state: string |-- friends: array | |-- element: struct | | |-- name: string
filtre
filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Renvoie un nouveau DynamicFrame
contien tout DynamicRecords
de l'entrée DynamicFrame
qui correspondent à la fonction de prédicat f
.
-
f
– Fonction de prédicat à appliquer au paramètreDynamicFrame
. La fonction doit prendre unDynamicRecord
en tant qu'argument et renvoyer la valeur True si leDynamicRecord
répond aux critères du filtre, ou la valeur False si ce n'est pas le cas (obligatoire).Un
DynamicRecord
représente un enregistrement logique dans unDynamicFrame
. Il est comparable à une ligne dans un SparkDataFrame
, à la différence qu'il est auto-descriptif et peut être utilisé pour les données qui ne sont pas conformes à un schéma fixe. -
transformation_ctx
– Chaîne unique utilisée pour identifier les informations sur l'état (facultatif). -
info
– Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif). -
stageThreshold
– Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur. -
totalThreshold
– Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.
Exemple : Utiliser un filtre pour obtenir une sélection de champs filtrée
Cet exemple utilise lefilter
Méthode de création d'un nouveauDynamicFrame
qui inclut une sélection filtrée d'un autre DynamicFrame
des champs.
Comme le map
Méthode, filter
prend une fonction comme argument qui est appliquée à chaque enregistrement de l'originalDynamicFrame
. La fonction prend un enregistrement en entrée et renvoie une valeur booléenne. Si la valeur de retour est vraie, l'enregistrement est inclus dans le résultatDynamicFrame
. Si c'est faux, le dossier est omis.
Note
Pour accéder au jeu de données utilisé dans cet exemple, voir Exemple de code : préparation des données à l'aide ResolveChoice de Lambda et ApplyMapping et suivez les instructions de la sectionÉtape 1 : analyser les données dans le compartiment Amazon S3.
# Example: Use filter to create a new DynamicFrame # with a filtered selection of records from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create DynamicFrame from Glue Data Catalog medicare = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv" ] }, "csv", {"withHeader": True}, ) # Create filtered DynamicFrame with custom lambda # to filter records by Provider State and Provider City sac_or_mon = medicare.filter( f=lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"] ) # Compare record counts print("Unfiltered record count: ", medicare.count()) print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065 Filtered record count: 564
join
join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Effectue une jointure d'égalité avec un autre DynamicFrame
et renvoie le DynamicFrame
obtenu.
-
paths1
– Liste des clés dans cette trame à joindre. -
paths2
– Liste des clés dans l'autre trame à joindre. -
frame2
– AutreDynamicFrame
à joindre. -
transformation_ctx
– Chaîne unique utilisée pour identifier les informations sur l'état (facultatif). -
info
– Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif). -
stageThreshold
– Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur. -
totalThreshold
– Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.
Exemple : Utiliser la jointure pour combinerDynamicFrames
Cet exemple utilise la join
méthode pour effectuer une jointure sur troisDynamicFrames
. AWS Glue effectue la jointure en fonction des clés de champ que vous fournissez. Le résultat DynamicFrame
contient les lignes des deux cadres d'origine où les clés spécifiées correspondent.
Notez que lejoin
transform conserve tous les champs intacts. Cela signifie que les champs que vous spécifiez comme correspondants apparaissent dans le résultat DynamicFrame, même s'ils sont redondants et contiennent les mêmes clés. Dans cet exemple, nous utilisonsdrop_fields
pour supprimer ces clés redondantes après la jointure.
Note
Pour accéder au jeu de données utilisé dans cet exemple, voir Exemple de code : Données de jonction et de mise en relation et suivez les instructions de la sectionÉtape 1 : analyser les données dans le compartiment Amazon S3.
# Example: Use join to combine data from three DynamicFrames from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load DynamicFrames from Glue Data Catalog persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json" ) orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() print("Schema for the memberships DynamicFrame:") memberships.printSchema() print("Schema for the orgs DynamicFrame:") orgs.printSchema() # Join persons and memberships by ID persons_memberships = persons.join( paths1=["id"], paths2=["person_id"], frame2=memberships ) # Rename and drop fields from orgs # to prevent field name collisions with persons_memberships orgs = ( orgs.drop_fields(["other_names", "identifiers"]) .rename_field("id", "org_id") .rename_field("name", "org_name") ) # Create final join of all three DynamicFrames legislators_combined = orgs.join( paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships ).drop_fields(["person_id", "org_id"]) # Inspect the schema for the joined data print("Schema for the new legislators_combined DynamicFrame:") legislators_combined.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the memberships DynamicFrame: root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string Schema for the orgs DynamicFrame: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string Schema for the new legislators_combined DynamicFrame: root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string
map
map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Renvoie un nouveau DynamicFrame
qui résulte de l'application de la fonction de mappage spécifiée à tous les enregistrements dans le DynamicFrame
d'origine.
-
f
– Fonction de mappage à appliquer à tous les enregistrements dans le paramètreDynamicFrame
. La fonction doit prendre unDynamicRecord
comme argument et renvoyer un nouveauDynamicRecord
(obligatoire).Un
DynamicRecord
représente un enregistrement logique dans unDynamicFrame
. Il est comparable à une ligne dans unDataFrame
Apache Spark, à la différence qu'il est auto-descriptif et peut être utilisé pour les données qui ne sont pas conformes à un schéma fixe. transformation_ctx
– Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).info
– Chaîne associée à des erreurs dans la transformation (facultatif).stageThreshold
– nombre maximal d'erreurs qui peuvent avoir lieu dans la transformation avant qu'elle ne soit arrêtée.(facultatif) La valeur par défaut est zéro.totalThreshold
– nombre maximal d'erreurs pouvant se produire globalement avant que le processus de traitement des erreurs ne soit arrêté.(facultatif) La valeur par défaut est zéro.
Exemple : utilisez la carte pour appliquer une fonction à chaque enregistrement dans un DynamicFrame
Cet exemple illustre comment utiliser map
méthode pour appliquer une fonction à chaque enregistrement d'un DynamicFrame
. Plus précisément, cet exemple applique une fonction appelée MergeAddress
à chaque enregistrement afin de fusionner plusieurs champs d'adresse en un seul struct
type.
Note
Pour accéder au jeu de données utilisé dans cet exemple, voir Exemple de code : préparation des données à l'aide ResolveChoice de Lambda et ApplyMapping et suivez les instructions de la sectionÉtape 1 : analyser les données dans le compartiment Amazon S3.
# Example: Use map to combine fields in all records # of a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema medicare = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]}, "csv", {"withHeader": True}) print("Schema for medicare DynamicFrame:") medicare.printSchema() # Define a function to supply to the map transform # that merges address fields into a single field def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec # Use map to apply MergeAddress to every record mapped_medicare = medicare.map(f = MergeAddress) print("Schema for mapped_medicare DynamicFrame:") mapped_medicare.printSchema()
Schema for medicare DynamicFrame: root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string Schema for mapped_medicare DynamicFrame: root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
mergeDynamicFrame
mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "",
options = {}, info = "", stageThreshold = 0, totalThreshold = 0)
Fusionne cette trame DynamicFrame
avec une trame DynamicFrame
intermédiaire basée sur les clés primaires spécifiées pour identifier les enregistrements. Les registres en double (registres avec les mêmes clés primaires) ne sont pas dédupliqués. Si aucun enregistrement ne correspond dans la trame intermédiaire, tous les enregistrements (y compris les doublons) sont conservés dans la source. Si la trame intermédiaire contient des enregistrements correspondants, les enregistrements de la trame intermédiaire remplacent les enregistrements de la source dans AWS Glue.
-
stage_dynamic_frame
– trameDynamicFrame
intermédiaire à fusionner. -
primary_keys
– liste des champs de clé primaire permettant de faire correspondre les enregistrements des trames dynamiques source et intermédiaire. -
transformation_ctx
– chaîne unique utilisée pour récupérer les métadonnées relatives à la transformation en cours (facultatif). -
options
— Chaîne de paires JSON nom-valeur fournissant des informations supplémentaires pour cette transformation. Cet argument n'est actuellement pas utilisé. -
info
– AString
. Toute chaîne à associer à des erreurs dans cette transformation. -
stageThreshold
– ALong
. Nombre d'erreurs identifiées dans la transformation donnée et à corriger lors du traitement. -
totalThreshold
– ALong
. Nombre total d'erreurs identifiées dans la transformation donnée et qui doivent être corrigées lors du traitement.
La méthode renvoie une nouvelle image DynamicFrame
obtenue en fusionnant DynamicFrame
avec l'image DynamicFrame
intermédiaire.
La trame DynamicFrame
renvoyée contient l'enregistrement A dans les cas suivants :
-
Si
A
se trouve à la fois dans la trame source et la trame intermédiaire, c'est la valeurA
de la trame intermédiaire qui est renvoyée. -
Si
A
se trouve dans la table source et siA.primaryKeys
ne se trouve pas dans l'imagestagingDynamicFrame
,A
n'est pas mis à jour dans la table intermédiaire.
L'image source et l'image intermédiaire n'ont pas besoin d'avoir le même schéma.
Exemple : mergeDynamicFrame à utiliser pour fusionner deux DynamicFrames
en fonction d'une clé primaire
L'exemple de code suivant montre comment utiliser la méthode mergeDynamicFrame
pour fusionner une image DynamicFrame
avec une image DynamicFrame
« intermédiaire », sur la base de la clé primaire id
.
Exemple de jeu de données
L'exemple utilise deux images DynamicFrames
d'une collection DynamicFrameCollection
appelée split_rows_collection
. Vous trouverez ci-dessous la liste des clés de la collection split_rows_collection
.
dict_keys(['high', 'low'])
Exemple de code
# Example: Use mergeDynamicFrame to merge DynamicFrames # based on a set of specified primary keys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import SelectFromCollection # Inspect the original DynamicFrames frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low") print("Inspect the DynamicFrame that contains rows where ID < 10") frame_low.toDF().show() frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high") print("Inspect the DynamicFrame that contains rows where ID > 10") frame_high.toDF().show() # Merge the DynamicFrames based on the "id" primary key merged_high_low = frame_high.mergeDynamicFrame( stage_dynamic_frame=frame_low, primary_keys=["id"] ) # View the results where the ID is 1 or 20 print("Inspect the merged DynamicFrame that contains the combined rows") merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 2| 0| fax| 202-225-3307| | 2| 1| phone| 202-225-5731| | 3| 0| fax| 202-225-3307| | 3| 1| phone| 202-225-5731| | 4| 0| fax| 202-225-3307| | 4| 1| phone| 202-225-5731| | 5| 0| fax| 202-225-3307| | 5| 1| phone| 202-225-5731| | 6| 0| fax| 202-225-3307| | 6| 1| phone| 202-225-5731| | 7| 0| fax| 202-225-3307| | 7| 1| phone| 202-225-5731| | 8| 0| fax| 202-225-3307| | 8| 1| phone| 202-225-5731| | 9| 0| fax| 202-225-3307| | 9| 1| phone| 202-225-5731| | 10| 0| fax| 202-225-6328| | 10| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains rows where ID > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| fax| 202-225-6328| | 11| 1| phone| 202-225-4576| | 11| 2| twitter| RepTrentFranks| | 12| 0| fax| 202-225-6328| | 12| 1| phone| 202-225-4576| | 12| 2| twitter| RepTrentFranks| | 13| 0| fax| 202-225-6328| | 13| 1| phone| 202-225-4576| | 13| 2| twitter| RepTrentFranks| | 14| 0| fax| 202-225-6328| | 14| 1| phone| 202-225-4576| | 14| 2| twitter| RepTrentFranks| | 15| 0| fax| 202-225-6328| | 15| 1| phone| 202-225-4576| | 15| 2| twitter| RepTrentFranks| | 16| 0| fax| 202-225-6328| | 16| 1| phone| 202-225-4576| | 16| 2| twitter| RepTrentFranks| | 17| 0| fax| 202-225-6328| | 17| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the merged DynamicFrame that contains the combined rows +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 20| 0| fax| 202-225-5604| | 20| 1| phone| 202-225-6536| | 20| 2| twitter| USRepLong| +---+-----+------------------------+-------------------------+
relationalize
relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Convertit une image DynamicFrame
sous une forme adaptée à une base de données relationnelle. La relationnalisation de a DynamicFrame
est particulièrement utile lorsque vous souhaitez déplacer des données d'un SQL environnement sans environnement tel que DynamoDB vers une base de données relationnelle telle que My. SQL
La transformation génère une liste des images en désimbriquant les colonnes imbriquées et en faisant pivoter les colonnes de tableau. Vous pouvez joindre les colonnes du tableau ayant été pivotées à l'aide de la clé générée au cours de la phase de désimbrication.
root_table_name
– Nom de la table racine.staging_path
— Le chemin dans lequel la méthode peut stocker les partitions de tables pivotantes au CSV format (facultatif). Les tables dynamiques sont lues à partir de ce chemin.options
– Dictionnaire des paramètres facultatifs.-
transformation_ctx
– Chaîne unique utilisée pour identifier les informations sur l'état (facultatif). -
info
– Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif). -
stageThreshold
– Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur. -
totalThreshold
– Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.
Exemple : utiliser la méthode relationalize pour aplatir un schéma imbriqué dans une image DynamicFrame
Cet exemple de code utilise la méthode relationalize
pour aplatir un schéma imbriqué sous une forme adaptée à une base de données relationnelle.
Exemple de jeu de données
L'exemple utilise une image DynamicFrame
appelée legislators_combined
avec le schéma suivant. legislators_combined
possède plusieurs champs imbriqués tels que links
, images
et contact_details
, qui seront aplatis par la transformation relationalize
.
root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string
Exemple de code
# Example: Use relationalize to flatten # a nested schema into a format that fits # into a relational database. # Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Apply relationalize and inspect new tables legislators_relationalized = legislators_combined.relationalize( "l_root", "
s3://DOC-EXAMPLE-BUCKET/tmpDir
" ) legislators_relationalized.keys() # Compare the schema of the contact_details # nested field to the new relationalized table that # represents it legislators_combined.select_fields("contact_details").printSchema() legislators_relationalized.select("l_root_contact_details").toDF().where( "id = 10 or id = 75" ).orderBy(["id", "index"]).show()
La sortie suivante vous permet de comparer le schéma du champ imbriqué appelé contact_details
à la table créée par la transformation relationalize
. Notez que les enregistrements de la table renvoient vers la table principale à l'aide d'une clé étrangère appelée id
et d'une colonne index
représentant les positions du tableau.
dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers']) root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| 202-225-4160| | 10| 1| phone| 202-225-3436| | 75| 0| fax| 202-225-6791| | 75| 1| phone| 202-225-2861| | 75| 2| twitter| RepSamFarr| +---+-----+------------------------+-------------------------+
rename_field
rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Renomme un champ dans ce DynamicFrame
et renvoie un nouveau DynamicFrame
avec le champ renommé.
-
oldName
– Chemin d'accès complet au nœud que vous souhaitez renommer.Si l'ancien nom contient des points,
RenameField
ne fonctionne pas à moins que vous ne le délimitiez avec des apostrophes inverses (`
). Par exemple, pour remplacerthis.old.name
parthisNewName
, vous devez appeler rename_field comme suit.newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
-
newName
– Nouveau nom, sous forme de chemin d'accès complet. -
transformation_ctx
– Chaîne unique utilisée pour identifier les informations sur l'état (facultatif). -
info
– Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif). -
stageThreshold
– Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur. -
totalThreshold
– Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.
Exemple : utiliser la méthode rename_field pour renommer des champs dans une image DynamicFrame
Cet exemple de code utilise la méthode rename_field
pour renommer les champs dans une image DynamicFrame
. Notez que l'exemple utilise le chaînage de méthode pour renommer plusieurs champs en même temps.
Note
Pour accéder au jeu de données utilisé dans cet exemple, voir Exemple de code : Données de jonction et de mise en relation et suivez les instructions de la sectionÉtape 1 : analyser les données dans le compartiment Amazon S3.
Exemple de code
# Example: Use rename_field to rename fields # in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Inspect the original orgs schema orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Original orgs schema: ") orgs.printSchema() # Rename fields and view the new schema orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name") print("New orgs schema with renamed fields: ") orgs.printSchema()
Original orgs schema: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string New orgs schema with renamed fields: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- classification: string |-- org_id: string |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string
resolveChoice
resolveChoice(specs = None, choice = "" , database = None , table_name = None ,
transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id =
None)
Résout un type de choix au sein de ce DynamicFrame
et renvoie le nouveau DynamicFrame
.
-
specs
– Liste d'ambiguïtés spécifiques à résoudre, apparaissant sous forme de tuple:(field_path, action)
.Il existe deux façons d'utiliser
resolveChoice
. La première consiste à utiliser l'argumentspecs
pour spécifier une séquence de champs spécifiques et la façon de les résoudre. L'autre mode pourresolveChoice
consiste à utiliser l'argumentchoice
afin de spécifier une seule résolution pour tous lesChoiceTypes
.Les valeurs pour
specs
sont spécifiées en tant que tuples composés de paires(field_path, action)
. La valeurfield_path
identifie un élément ambigu spécifique, et la valeuraction
identifie la résolution correspondante. Les actions possibles sont les suivantes :-
cast:
– tente de convertir toutes les valeurs vers le type spécifié. olpPar exemple :type
cast:int
. -
make_cols
– Convertit chaque type distinct en une colonne avec le nom
. Résout une ambiguïté potentielle en aplatissant les données. Par exemple, sicolumnName
_type
columnA
peut être unint
ou unstring
, la résolution consisterait à produire deux colonnes nomméescolumnA_int
etcolumnA_string
dans leDynamicFrame
obtenu. -
make_struct
– résout une ambigüité potentielle en utilisant unstruct
pour représenter les données. Par exemple, si des données d'une colonne peuvent être de typeint
oustring
, l'actionmake_struct
produit une colonne de structures dans l'imageDynamicFrame
obtenue. Chaque structure contient à la fois des données de typeint
et des données de typestring
. -
project:
– résout une ambiguïté potentielle en projetant toutes les données sur l'un des types de données possibles. Par exemple, si des données d'une colonne peuvent être untype
int
ou unstring
, une actionproject:string
produit une colonne dans leDynamicFrame
obtenu, où toutes les valeursint
ont été converties en chaînes.
Si le
field_path
identifie un tableau, placez des crochets vides après le nom du tableau pour éviter toute ambiguïté. Par exemple, supposons que vous travailliez avec les données structurées comme suit :"myList": [ { "price": 100.00 }, { "price": "$100.00" } ]
Vous pouvez sélectionner la version numérique plutôt que la version chaîne du prix en définissant
field_path
sur"myList[].price"
etaction
sur"cast:double"
.Note
Vous ne pouvez utiliser qu'un seul des paramètres
specs
etchoice
. Si le paramètrespecs
n'est pasNone
, alors le paramètrechoice
doit être une chaîne vide. Inversement, si le paramètrechoice
n'est pas une chaîne vide, alors le paramètrespecs
doit êtreNone
. -
choice
– spécifie une résolution unique pour tous lesChoiceTypes
. Vous pouvez utiliser cela lorsque la liste complète desChoiceTypes
est inconnue avant l'exécution. En plus des actions répertoriées précédemment pourspecs
, cet argument prend également en charge l'action suivante :-
match_catalog
– tente de convertir chaqueChoiceType
dans le type correspondant de table Data Catalog spécifiée.
-
-
database
– base de données Data Catalog à utiliser avec l'actionmatch_catalog
. -
table_name
– table Data Catalog à utiliser avec l'actionmatch_catalog
. -
transformation_ctx
– Chaîne unique utilisée pour identifier les informations sur l'état (facultatif). -
info
– Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif). -
stageThreshold
– Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur. -
totalThreshold
– Nombre d'erreurs rencontrées jusqu'à cette transformation incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas être arrêté dans ce cas. -
catalog_id
– ID du catalogue Data Catalog auquel vous accédez (ID du compte Data Catalog). Lorsque cette option est définie surNone
(valeur par défaut), l'ID de catalogue du compte appelant est utilisé.
Exemple : resolveChoice à utiliser pour gérer une colonne contenant plusieurs types
Cet exemple de code utilise la méthode resolveChoice
pour spécifier comment gérer une colonne DynamicFrame
contenant des valeurs de plusieurs types. L'exemple montre deux méthodes courantes pour gérer une colonne de types différents :
Convertir la colonne en un seul type de données.
Conserver tous les types dans des colonnes distinctes.
Exemple de jeu de données
Note
Pour accéder au jeu de données utilisé dans cet exemple, voir Exemple de code : préparation des données à l'aide ResolveChoice de Lambda et ApplyMapping et suivez les instructions de la sectionÉtape 1 : analyser les données dans le compartiment Amazon S3.
L'exemple utilise une image DynamicFrame
appelée medicare
avec le schéma suivant :
root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string
Exemple de code
# Example: Use resolveChoice to handle # a column that contains multiple types from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input data and inspect the "provider id" column medicare = glueContext.create_dynamic_frame.from_catalog( database="payments", table_name="medicare_hospital_provider_csv" ) print("Inspect the provider id column:") medicare.toDF().select("provider id").show() # Cast provider id to type long medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")]) print("Schema after casting provider id to type long:") medicare_resolved_long.printSchema() medicare_resolved_long.toDF().select("provider id").show() # Create separate columns # for each provider id type medicare_resolved_cols = medicare.resolveChoice(choice="make_cols") print("Schema after creating separate columns for each type:") medicare_resolved_cols.printSchema() medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column: +-----------+ |provider id| +-----------+ | [10001,]| | [10005,]| | [10006,]| | [10011,]| | [10016,]| | [10023,]| | [10029,]| | [10033,]| | [10039,]| | [10040,]| | [10046,]| | [10055,]| | [10056,]| | [10078,]| | [10083,]| | [10085,]| | [10090,]| | [10092,]| | [10100,]| | [10103,]| +-----------+ only showing top 20 rows Schema after casting 'provider id' to type long: root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +-----------+ |provider id| +-----------+ | 10001| | 10005| | 10006| | 10011| | 10016| | 10023| | 10029| | 10033| | 10039| | 10040| | 10046| | 10055| | 10056| | 10078| | 10083| | 10085| | 10090| | 10092| | 10100| | 10103| +-----------+ only showing top 20 rows Schema after creating separate columns for each type: root |-- drg definition: string |-- provider id_string: string |-- provider id_long: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +----------------+------------------+ |provider id_long|provider id_string| +----------------+------------------+ | 10001| null| | 10005| null| | 10006| null| | 10011| null| | 10016| null| | 10023| null| | 10029| null| | 10033| null| | 10039| null| | 10040| null| | 10046| null| | 10055| null| | 10056| null| | 10078| null| | 10083| null| | 10085| null| | 10090| null| | 10092| null| | 10100| null| | 10103| null| +----------------+------------------+ only showing top 20 rows
select_fields
select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Renvoie un nouveau DynamicFrame
contenant les champs sélectionnés.
-
paths
- Une liste de chaînes. Chaque chaîne est un chemin d'accès au nœud de niveau supérieur que vous souhaitez sélectionner. -
transformation_ctx
– Chaîne unique utilisée pour identifier les informations sur l'état (facultatif). -
info
– Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif). -
stageThreshold
– Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur. -
totalThreshold
– Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.
Exemple : utilisez select_fields pour créer un nouveau DynamicFrame
avec les champs sélectionnés
L'exemple de code suivant illustre comment utiliser select_fields
Méthode de création d'un nouveau DynamicFrame
avec une liste de champs sélectionnés parmi un DynamicFrame
.
Note
Pour accéder au jeu de données utilisé dans cet exemple, voir Exemple de code : Données de jonction et de mise en relation et suivez les instructions de la sectionÉtape 1 : analyser les données dans le compartiment Amazon S3.
# Example: Use select_fields to select specific fields from a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Create a new DynamicFrame with chosen fields names = persons.select_fields(paths=["family_name", "given_name"]) print("Schema for the names DynamicFrame, created with select_fields:") names.printSchema() names.toDF().show()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the names DynamicFrame: root |-- family_name: string |-- given_name: string +-----------+----------+ |family_name|given_name| +-----------+----------+ | Collins| Michael| | Huizenga| Bill| | Clawson| Curtis| | Solomon| Gerald| | Rigell| Edward| | Crapo| Michael| | Hutto| Earl| | Ertel| Allen| | Minish| Joseph| | Andrews| Robert| | Walden| Greg| | Kazen| Abraham| | Turner| Michael| | Kolbe| James| | Lowenthal| Alan| | Capuano| Michael| | Schrader| Kurt| | Nadler| Jerrold| | Graves| Tom| | McMillan| John| +-----------+----------+ only showing top 20 rows
simplify_ddb_json
simplify_ddb_json(): DynamicFrame
Simplifie les colonnes imbriquées DynamicFrame
qui se trouvent spécifiquement dans la structure JSON DynamoDB et renvoie une nouvelle colonne simplifiée. DynamicFrame
S'il existe plusieurs types ou types de carte dans un type de liste, les éléments de la liste ne seront pas simplifiés. Notez qu'il s'agit d'un type de transformation spécifique qui se comporte différemment de la unnest
transformation normale et qui nécessite que les données figurent déjà dans la structure DynamoDBJSON. Pour plus d'informations, consultez DynamoDB JSON.
Par exemple, le schéma d'une lecture d'une exportation avec la structure JSON DynamoDB peut ressembler à ce qui suit :
root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean
La transformation simplify_ddb_json()
convertirait ceci en :
root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
Exemple : utilisez simplify_ddb_json pour appeler un DynamoDB simplify JSON
Cet exemple de code utilise la simplify_ddb_json
méthode permettant d'utiliser le connecteur d'exportation AWS Glue DynamoDB, d'invoquer un JSON DynamoDB Simplify et d'imprimer le nombre de partitions.
Exemple de code
from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "dynamodb", connection_options = { 'dynamodb.export': 'ddb', 'dynamodb.tableArn': '<table arn>', 'dynamodb.s3.bucket': '<bucket name>', 'dynamodb.s3.prefix': '<bucket prefix>', 'dynamodb.s3.bucketOwner': '<account_id of bucket>' } ) simplified = dynamicFrame.simplify_ddb_json() print(simplified.getNumPartitions())
spigot
spigot(path, options={})
Écrit des exemples d'enregistrement sur une destination spécifiée pour vous aider à vérifier les transformations effectuées par votre tâche.
-
path
– Chemin d'accès de la destination sur laquelle écrire (obligatoire). -
options
– Paires clé-valeur spécifiant des options (facultatif). L'option"topk"
indique que les premiers enregistrementsk
doivent être écrits. L'option"prob"
indique la probabilité (sous forme de décimale) de choisir un enregistrement donné. Vous pouvez l'utiliser pour sélectionner les enregistrements à écrire. transformation_ctx
– Chaîne unique utilisée pour identifier les informations sur l'état (facultatif).
Exemple : utiliser la méthode spigot pour écrire des exemples de champs d'une image DynamicFrame
vers Amazon S3
Cet exemple de code utilise la méthode spigot
pour écrire des exemples d'enregistrements dans un compartiment Amazon S3 après avoir appliqué la transformation select_fields
.
Exemple de jeu de données
Note
Pour accéder au jeu de données utilisé dans cet exemple, voir Exemple de code : Données de jonction et de mise en relation et suivez les instructions de la sectionÉtape 1 : analyser les données dans le compartiment Amazon S3.
L'exemple utilise une image DynamicFrame
appelée persons
avec le schéma suivant :
root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string
Exemple de code
# Example: Use spigot to write sample records # to a destination during a transformation # from pyspark.context import SparkContext. # Replace DOC-EXAMPLE-BUCKET with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load table data into a DynamicFrame persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) # Perform the select_fields on the DynamicFrame persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"]) # Use spigot to write a sample of the transformed data # (the first 10 records) spigot_output = persons.spigot( path="
s3://DOC-EXAMPLE-BUCKET
", options={"topk": 10} )
Voici un exemple des données que spigot
écrit dans Amazon S3. Étant donné que l'exemple de code spécifie options={"topk": 10}
, les exemples de données contiennent les 10 premiers enregistrements.
{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"} {"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"} {"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"} {"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"} {"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"} {"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"} {"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"} {"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"} {"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"} {"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}
split_fields
split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Renvoie une nouvelle collection DynamicFrameCollection
qui contient deux images DynamicFrames
. La première image DynamicFrame
contient tous les nœuds qui ont été fractionnés, et la seconde contient les nœuds qui restent.
-
paths
– Liste de chaînes, chacune étant un chemin d'accès complet à un nœud que vous souhaitez fractionner en un nouveauDynamicFrame
. -
name1
– Chaîne de nom pour leDynamicFrame
fractionné. -
name2
– Chaîne de nom pour leDynamicFrame
qui reste après le fractionnement des nœuds spécifiés. -
transformation_ctx
– Chaîne unique utilisée pour identifier les informations sur l'état (facultatif). -
info
– Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif). -
stageThreshold
– Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur. -
totalThreshold
– Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.
Exemple : utiliser la méthode split_fields pour fractionner les champs sélectionnés en une image DynamicFrame
distincte
Cet exemple de code utilise la méthode split_fields
pour fractionner une liste de champs spécifiés en une image DynamicFrame
distincte.
Exemple de jeu de données
L'exemple utilise une image DynamicFrame
appelée l_root_contact_details
qui provient d'une collection nommée legislators_relationalized
.
l_root_contact_details
contient le schéma et les entrées suivantes.
root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| ...
Exemple de code
# Example: Use split_fields to split selected # fields into a separate DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input DynamicFrame and inspect its schema frame_to_split = legislators_relationalized.select("l_root_contact_details") print("Inspect the input DynamicFrame schema:") frame_to_split.printSchema() # Split id and index fields into a separate DynamicFrame split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right") # Inspect the resulting DynamicFrames print("Inspect the schemas of the DynamicFrames created with split_fields:") split_fields_collection.select("left").printSchema() split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema: root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string Inspect the schemas of the DynamicFrames created with split_fields: root |-- id: long |-- index: int root |-- contact_details.val.type: string |-- contact_details.val.value: string
split_rows
split_rows(comparison_dict, name1, name2, transformation_ctx="", info="",
stageThreshold=0, totalThreshold=0)
Scinde une ou plusieurs lignes d'un objet DynamicFrame
en un nouveau DynamicFrame
.
La méthode renvoie une nouvelle collection DynamicFrameCollection
qui contient deux images DynamicFrames
. La première image DynamicFrame
contient toutes les lignes qui ont été fractionnées, et la seconde contient les lignes qui restent.
-
comparison_dict
– Dictionnaire dans lequel la clé est un chemin d'accès à une colonne et la valeur est un autre dictionnaire permettant de mapper les comparateurs aux valeurs auxquelles les valeurs de la colonne sont comparées. Par exemple,{"age": {">": 10, "<": 20}}
fractionne toutes les lignes dont la valeur de la colonne d'âge est supérieure à 10 et inférieure à 20. -
name1
– Chaîne de nom pour leDynamicFrame
fractionné. -
name2
– Chaîne de nom pour leDynamicFrame
qui reste après le fractionnement des nœuds spécifiés. -
transformation_ctx
– Chaîne unique utilisée pour identifier les informations sur l'état (facultatif). -
info
– Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif). -
stageThreshold
– Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur. -
totalThreshold
– Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.
Exemple : utiliser la méthode split_rows pour fractionner des lignes en une image DynamicFrame
Cet exemple de code utilise la méthode split_rows
pour fractionner des lignes en une image DynamicFrame
en se basant sur la valeur du champ id
.
Exemple de jeu de données
L'exemple utilise une image DynamicFrame
appelée l_root_contact_details
qui est sélectionnée à partir d'une collection nommée legislators_relationalized
.
l_root_contact_details
contient le schéma et les entrées suivantes.
root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+
Exemple de code
# Example: Use split_rows to split up # rows in a DynamicFrame based on value from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Retrieve the DynamicFrame to split frame_to_split = legislators_relationalized.select("l_root_contact_details") # Split up rows by ID split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low") # Inspect the resulting DynamicFrames print("Inspect the DynamicFrame that contains IDs < 10") split_rows_collection.select("low").toDF().show() print("Inspect the DynamicFrame that contains IDs > 10") split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains IDs > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| phone| 202-225-5476| | 11| 1| twitter| RepDavidYoung| | 12| 0| phone| 202-225-4035| | 12| 1| twitter| RepStephMurphy| | 13| 0| fax| 202-226-0774| | 13| 1| phone| 202-225-6335| | 14| 0| fax| 202-226-0774| | 14| 1| phone| 202-225-6335| | 15| 0| fax| 202-226-0774| | 15| 1| phone| 202-225-6335| | 16| 0| fax| 202-226-0774| | 16| 1| phone| 202-225-6335| | 17| 0| fax| 202-226-0774| | 17| 1| phone| 202-225-6335| | 18| 0| fax| 202-226-0774| | 18| 1| phone| 202-225-6335| | 19| 0| fax| 202-226-0774| | 19| 1| phone| 202-225-6335| | 20| 0| fax| 202-226-0774| | 20| 1| phone| 202-225-6335| +---+-----+------------------------+-------------------------+ only showing top 20 rows
unbox
unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)
Effectue une opération unbox sur un champ de chaîne (ou le reformate) dans une image DynamicFrame
et renvoie une nouvelle image DynamicFrame
contenant les enregistrements DynamicRecords
sur lesquels l'opération unbox a été effectuée.
Un DynamicRecord
représente un enregistrement logique dans un DynamicFrame
. Il est comparable à une ligne dans un DataFrame
Apache Spark, à la différence qu'il est auto-descriptif et peut être utilisé pour les données qui ne sont pas conformes à un schéma fixe.
-
path
– Chemin d'accès complet au nœud de chaîne pour lequel vous souhaitez effectuer une opération unbox. format
– spécification de format (facultatif). Vous l'utilisez pour un Amazon S3 ou AWS Glue connexion qui prend en charge plusieurs formats. Consultez Options de format pour les entrées et sorties dans AWS Glue pour Spark pour connaître les formats pris en charge.-
transformation_ctx
– Chaîne unique utilisée pour identifier les informations sur l'état (facultatif). -
info
– Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif). -
stageThreshold
– Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur. -
totalThreshold
– Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur. -
options
– Un ou plusieurs des éléments suivants :separator
– Chaîne contenant le caractère de séparation.escaper
– Chaîne contenant le caractère d'échappement.skipFirst
– Valeur booléenne indiquant s'il faut ignorer la première instance.-
withSchema
— Chaîne contenant une JSON représentation du schéma du nœud. Le format de la JSON représentation d'un schéma est défini par la sortie deStructType.json()
. withHeader
– Valeur booléenne indiquant si un en-tête est inclus.
Exemple : utiliser la méthode unbox pour effectuer une opération unbox sur un champ de chaîne en un champ struct
Cet exemple de code utilise la méthode unbox
pour effectuer une opération unbox sur un champ de chaîne dans une image DynamicFrame
(ou la reformater) en un champ de type struct.
Exemple de jeu de données
L'exemple utilise une image DynamicFrame
appelée mapped_with_string
avec le schéma et les entrées suivantes.
Notez le champ nommé AddressString
. Il s'agit du champ sur lequel l'exemple effectue une opération unbox en un champ struct.
root |-- Average Total Payments: string |-- AddressString: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| ...
Exemple de code
# Example: Use unbox to unbox a string field # into a struct in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) unboxed = mapped_with_string.unbox("AddressString", "json") unboxed.printSchema() unboxed.toDF().show()
root |-- Average Total Payments: string |-- AddressString: struct | |-- Street: string | |-- City: string | |-- State: string | |-- Zip.Code: string | |-- Array: array | | |-- element: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| | $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...| | $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...| | $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...| | $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...| | $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL| | $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...| | $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...| | $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL| | $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...| | $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...| | $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...| | $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...| | $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL| | $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...| | $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL| | $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ only showing top 20 rows
union
union(frame1, frame2, transformation_ctx = "",
info = "", stageThreshold = 0, totalThreshold = 0)
Union 2 DynamicFrames. Renvoie DynamicFrame tous les enregistrements des deux entrées DynamicFrames. Cette transformation peut renvoyer des résultats différents à partir de l'union de deux DataFrames avec des données équivalentes. Si vous avez besoin du comportement DataFrame syndical de Spark, pensez à utilisertoDF
.
-
frame1
— D'abord DynamicFrame à se syndiquer. -
frame2
— Après DynamicFrame l'union. -
transformation_ctx
: (facultatif) chaîne unique utilisée pour identifier les informations sur l'état/les statistiques. -
info
: (facultatif) chaîne à associer à des erreurs dans la transformation. -
stageThreshold
: (facultatif) nombre maximum d'erreurs dans la transformation jusqu'à ce que le traitement entraîne une erreur -
totalThreshold
: (facultatif) nombre maximum d'erreurs au total jusqu'à ce que le traitement entraîne une erreur.
unnest
unnest(transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Désimbrique les objets imbriqués dans une image DynamicFrame
. Ils deviennent des objets de niveau supérieur et l'opération renvoie une nouvelle image DynamicFrame
non imbriquée.
-
transformation_ctx
– Chaîne unique utilisée pour identifier les informations sur l'état (facultatif). -
info
– Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif). -
stageThreshold
– Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l’erreur générée par le processus (facultatif) La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur. -
totalThreshold
– Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif). La valeur par défaut est zéro, ce qui indique que le processus ne doit pas générer d'erreur.
Exemple : utiliser la méthode unnest pour transformer des champs imbriqués en champs de niveau supérieur
Cet exemple de code utilise la méthode unnest
pour aplatir tous les champs imbriqués dans une image DynamicFrame
en champs de niveau supérieur.
Exemple de jeu de données
L'exemple utilise une image DynamicFrame
appelée mapped_medicare
avec le schéma suivant. Notez que le champ Address
est le seul qui contient des données imbriquées.
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
Exemple de code
# Example: Use unnest to unnest nested # objects in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Unnest all nested fields unnested = mapped_medicare.unnest() unnested.printSchema()
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address.Zip.Code: string |-- Address.City: string |-- Address.Array: array | |-- element: string |-- Address.State: string |-- Address.Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
unnest_ddb_json
Désimbrique les colonnes imbriquées DynamicFrame
qui se trouvent spécifiquement dans la JSON structure DynamoDB et renvoie une nouvelle colonne non imbriquée. DynamicFrame
Les colonnes d'un tableau de types de structure ne seront pas non-imbriquées. Notez qu'il s'agit d'un type spécifique de transformation non imbriquée qui se comporte différemment de la unnest
transformation normale et qui nécessite que les données figurent déjà dans la structure DynamoDB. JSON Pour plus d'informations, consultez DynamoDB JSON.
unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
-
transformation_ctx
– Chaîne unique utilisée pour identifier les informations sur l'état (facultatif). -
info
– Chaîne à associer avec le signalement des erreurs pour cette transformation (facultatif). -
stageThreshold
– Nombre d'erreurs rencontrées pendant cette transformation et qui doit entraîner l'arrêt du processus (facultatif : zéro par défaut, qui indique que le processus ne doit pas être arrêté dans ce cas). -
totalThreshold
– Nombre d'erreurs rencontrées jusqu'à cette transformation, incluse, et qui doit entraîner l'arrêt du processus (facultatif : zéro par défaut, qui indique que le processus ne doit pas être arrêté dans ce cas).
Par exemple, le schéma d'une lecture d'une exportation avec la structure JSON DynamoDB peut ressembler à ce qui suit :
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
La transformation unnest_ddb_json()
convertirait ceci en :
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
L'exemple de code suivant montre comment utiliser le connecteur d'exportation AWS Glue DynamoDB, appeler un JSON DynamoDB unnest et imprimer le nombre de partitions :
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()
write
write(connection_type, connection_options, format, format_options, accumulator_size)
Permet d'obtenir un DataSink(object) du type de connexion spécifié à partir du GlueContext classe de ce type DynamicFrame
, et l'utilise pour formater et écrire le contenu de ce DynamicFrame
. Renvoie le nouveau DynamicFrame
formaté et écrit comme spécifié.
-
connection_type
– type de connexion à utiliser. Les valeurs valides sonts3
,mysql
,postgresql
,redshift
,sqlserver
etoracle
. -
connection_options
– option de connexion à utiliser (facultatif). Pour unconnection_type
des3
, un chemin Amazon S3 est défini.connection_options = {"path": "
s3://aws-glue-target/temp
"}Pour JDBC les connexions, plusieurs propriétés doivent être définies. Notez que le nom de la base de données doit faire partie duURL. Il peut éventuellement être inclus dans les options de connexion.
Avertissement
Il n'est pas recommandé de stocker des mots de passe dans votre script. Envisagez de les utiliser
boto3
pour les récupérer depuis AWS Secrets Manager le catalogue de données AWS Glue.connection_options = {"url": "
jdbc-url/database
", "user": "username
", "password":passwordVariable
,"dbtable": "table-name
", "redshiftTmpDir": "s3-tempdir-path
"} format
– spécification de format (facultatif). Il est utilisé pour un Amazon Simple Storage Service (Amazon S3) ou un AWS Glue connexion qui prend en charge plusieurs formats. Consultez Options de format pour les entrées et sorties dans AWS Glue pour Spark pour connaître les formats pris en charge.format_options
– options de format pour le format spécifié. Consultez Options de format pour les entrées et sorties dans AWS Glue pour Spark pour connaître les formats pris en charge.accumulator_size
: la taille accumulable à utiliser, en octets (facultatif).
– erreurs –
assertErrorThreshold
assertErrorThreshold( )
– Assertion pour les erreurs dans les transformations ayant créé ce DynamicFrame
. Renvoie un Exception
du DataFrame
sous-jacent.
errorsAsDynamicChâssis
errorsAsDynamicFrame( )
– Renvoie un DynamicFrame
dans lequel des enregistrements d'erreur sont imbriqués.
Exemple : utilisez errorsAsDynamic Frame pour afficher les enregistrements d'erreurs
L'exemple de code suivant illustre comment utiliser errorsAsDynamicFrame
méthode pour afficher un enregistrement d'erreur pour un DynamicFrame
.
Exemple de jeu de données
L'exemple utilise le jeu de données suivant que vous pouvez télécharger sur Amazon S3 sous le nom de fichierJSON. Notez que le deuxième enregistrement est mal formé. Les données mal formées interrompent généralement l'analyse des fichiers lorsque vous utilisez Spark. SQL Cependant, DynamicFrame
reconnaît les problèmes de malformation et transforme les lignes mal formées en enregistrements d'erreurs que vous pouvez gérer individuellement.
{"id": 1, "name": "george", "surname": "washington", "height": 178} {"id": 2, "name": "benjamin", "surname": "franklin", {"id": 3, "name": "alexander", "surname": "hamilton", "height": 171} {"id": 4, "name": "john", "surname": "jay", "height": 190}
Exemple de code
# Example: Use errorsAsDynamicFrame to view error records. # Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create errors DynamicFrame, view schema errors = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["
s3://DOC-EXAMPLE-S3-BUCKET/error_data.json
"]}, "json" ) print("Schema of errors DynamicFrame:") errors.printSchema() # Show that errors only contains valid entries from the dataset print("errors contains only valid records from the input dataset (2 of 4 records)") errors.toDF().show() # View errors print("Errors count:", str(errors.errorsCount())) print("Errors:") errors.errorsAsDynamicFrame().toDF().show() # View error fields and error data error_record = errors.errorsAsDynamicFrame().toDF().head() error_fields = error_record["error"] print("Error fields: ") print(error_fields.asDict().keys()) print("\nError record data:") for key in error_fields.asDict().keys(): print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame: root |-- id: int |-- name: string |-- surname: string |-- height: int errors contains only valid records from the input dataset (2 of 4 records) +---+------+----------+------+ | id| name| surname|height| +---+------+----------+------+ | 1|george|washington| 178| | 4| john| jay| 190| +---+------+----------+------+ Errors count: 1 Errors: +--------------------+ | error| +--------------------+ |[[ File "/tmp/20...| +--------------------+ Error fields: dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord']) Error record data: callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='') msg : error in jackson reader stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at scala.collection.Iterator$$anon$9.next(Iterator.scala:162) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202) at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116) at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109) at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247) at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) input : bytesread : 252 source : dynamicRecord : Row(id=2, name='benjamin', surname='franklin')
errorsCount
errorsCount( )
– Renvoie le nombre total d'erreurs dans un DynamicFrame
.
stageErrorsCount
stageErrorsCount
– Renvoie le nombre d'erreurs survenues lors du processus de génération de ce DynamicFrame
.