DynamicFrame classe - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

DynamicFrame classe

Una delle principali astrazioni in Apache Spark è SparkSQL DataFrame, che è simile al costrutto DataFrame di R e Pandas. Un oggetto DataFrame è simile a una tabella e supporta operazioni di tipo funzionale (mappatura, riduzione, filtro e così via) e operazioni SQL (selezione, proiezione, aggregazione).

I DataFrames sono potenti e ampiamente utilizzati, ma presentano delle limitazioni riguardo operazioni di estrazione, trasformazione e caricamento (ETL). Principalmente, richiedono che venga specificato uno schema prima di caricare qualsiasi dato. SparkSQL risolve il problema eseguendo due passaggi sui dati: il primo per dedurre lo schema e il secondo per caricare i dati. Tuttavia, l'inferenza è limitata e non gestisce i casi di dati non organizzati. Lo stesso campo, ad esempio, potrebbe essere di tipo diverso in record diversi. Apache Spark spesso si ferma e definisce il tipo come string utilizzando il testo del campo originale. Questo potrebbe non essere corretto e potrebbe essere richiesto un controllo più preciso sulle modalità di risoluzione delle discrepanze dello schema. Inoltre, per i set di dati di grandi dimensioni, un ulteriore passaggio sui dati di origine potrebbe essere proibitivo in termini di costi.

Per ovviare a queste limitazioni, AWS Glue introduce ilDynamicFrame. Un DynamicFrame è simile a un DataFrame, con la differenza che ogni record è autodescrittivo, quindi inizialmente non è richiesto alcuno schema. Al contrario, AWS Glue calcola uno schema on-the-fly quando richiesto e codifica esplicitamente le incongruenze dello schema utilizzando un tipo di scelta (o unione). Puoi risolvere queste incongruenze per rendere i set di dati compatibili con i datastore che richiedono uno schema fisso.

Analogamente, un DynamicRecord rappresenta un record logico all'interno di un DynamicFrame. È come una riga in un DataFrame Spark, con la differenza che è autodescrittivo e può essere utilizzato per dati non conformi a uno schema fisso. Quando si utilizza AWS Glue with PySpark, in genere non si manipola in modo indipendenteDynamicRecords. Viceversa, di solito si trasforma il set di dati nel suo complesso attraverso il rispettivo DynamicFrame.

Dopo aver risolto eventuali incongruenze dello schema, puoi convertire DynamicFrames in e da DataFrames.

 — construction —

__init__

__init__(jdf, glue_ctx, name)
  • jdf: un riferimento al frame di dati nella JVM (Java Virtual Machine).

  • glue_ctx: un oggetto GlueContext classe.

  • name: una stringa nome opzionale, vuota per impostazione predefinita.

fromDF

fromDF(dataframe, glue_ctx, name)

Converte un DataFrame in un DynamicFrame convertendo campi del DataFrame in campi del DynamicRecord. Restituisce il nuovo DynamicFrame.

Un DynamicRecord rappresenta un record logico all'interno di un DynamicFrame. È simile a una riga in un DataFrame Spark, con la differenza che è autodescrittivo e può essere utilizzato per dati non conformi a uno schema fisso.

Questa funzione prevede che le colonne con nomi duplicati nel DataFrame siano già state risolte.

  • dataframe: il DataFrame Apache Spark SQL da convertire (obbligatorio).

  • glue_ctx: l'oggetto GlueContext classe che specifica il contesto di questa trasformazione (richiesto).

  • name— Il nome del risultato DynamicFrame (opzionale a partire da AWS Glue 3.0).

toDF

toDF(options)

Converte un DynamicFrame in un DataFrame Apache Spark, convertendo DynamicRecords in campi di DataFrame. Restituisce il nuovo DataFrame.

Un DynamicRecord rappresenta un record logico all'interno di un DynamicFrame. È simile a una riga in un DataFrame Spark, con la differenza che è autodescrittivo e può essere utilizzato per dati non conformi a uno schema fisso.

  • options: un elenco di opzioni. Se scegli il tipo di operazione Project e Cast, devi specificare il tipo di destinazione. Gli esempi includono quanto segue.

    >>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])

 — information —

count

count( ): restituisce il numero di righe nell'oggetto sottostante DataFrame.

schema

schema( ): restituisce lo schema di questo DynamicFrame oppure, se non è disponibile, lo schema del DataFrame sottostante.

Per ulteriori informazioni sui tipi di DynamicFrame che compongono questo schema, consulta la pagina Tipi di estensione PySpark.

printSchema

printSchema( ): stampa lo schema dell'oggetto sottostante DataFrame.

show

show(num_rows): stampa un numero di righe specificato dall'oggetto sottostante DataFrame.

repartition

repartition(numPartitions): restituisce un nuovo oggetto DynamicFrame con partizioni numPartitions.

coalesce

coalesce(numPartitions) – Restituisce un nuovo oggetto DynamicFrame con partizioni numPartitions.

 — transforms —

apply_mapping

apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Applica una mappatura dichiarativa a DynamicFrame e restituisce un nuovo DynamicFrame con tali mappature applicate ai campi specificati. I campi non specificati vengono omessi dal nuovo DynamicFrame.

  • mappings: un elenco di tuple di mappatura (obbligatorio). Ognuna è costituito da: colonna di origine, tipo di origine, colonna di destinazione, tipo di destinazione.

    Se il nome della colonna di origine include un punto (.), esso deve essere racchiuso tra apici inversi (``). Ad esempio, per mappare this.old.name (stringa) a thisNewName, devi utilizzare la tupla seguente:

    ("`this.old.name`", "string", "thisNewName", "string")
  • transformation_ctx: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).

  • info: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale).

  • stageThreshold: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

  • totalThreshold: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

Esempio: usa apply_mapping per rinominare campi e modificare tipi di campo

L'esempio di codice seguente mostra come utilizzare il metodo apply_mapping per rinominare i campi selezionati e modificare tipi di campo.

Nota

Per accedere al set di dati utilizzato in questo esempio, consulta Esempio di codice: unione e relazioni dei dati e segui le istruzioni in Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3.

# Example: Use apply_mapping to reshape source data into # the desired column names and types as a new DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Select and rename fields, change field type print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:") persons_mapped = persons.apply_mapping( [ ("family_name", "String", "last_name", "String"), ("name", "String", "first_name", "String"), ("birth_date", "String", "date_of_birth", "Date"), ] ) persons_mapped.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the persons_mapped DynamicFrame, created with apply_mapping: root |-- last_name: string |-- first_name: string |-- date_of_birth: date

drop_fields

drop_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Richiama la trasformazione Classe FlatMap per rimuovere campi da un DynamicFrame. Restituisce un nuovo DynamicFrame con i campi specificati rimossi.

  • paths: un elenco di stringhe. Ognuna contiene il percorso completo di un nodo del campo da rimuovere. Puoi utilizzare la notazione a punti per specificare campi nidificati. Ad esempio, se il campo first è figlio del campo name nell'albero, specifica "name.first" per il percorso.

    Se il nome di un nodo di campo contiene un punto (.) letterale, è necessario racchiudere il nome tra apici inversi (`).

  • transformation_ctx: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).

  • info: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale).

  • stageThreshold: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

  • totalThreshold: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

Esempio: utilizza drop_fields per rimuovere campi da un DynamicFrame

Questo esempio di codice utilizza il metodo drop_fields per rimuovere i campi di primo livello e i campi nidificati selezionati da un DynamicFrame.

Set di dati di esempio

L'esempio utilizza il set di dati seguente rappresentato dalla tabella EXAMPLE-FRIENDS-DATA nel codice:

{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []} {"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]} {"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]} {"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}} {"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}

Esempio di codice

# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame. # Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name. # Replace EXAMPLE-FRIENDS-DATA with your table name. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame from Glue Data Catalog glue_source_database = "MY-EXAMPLE-DATABASE" glue_source_table = "EXAMPLE-FRIENDS-DATA" friends = glueContext.create_dynamic_frame.from_catalog( database=glue_source_database, table_name=glue_source_table ) print("Schema for friends DynamicFrame before calling drop_fields:") friends.printSchema() # Remove location.county, remove friends.age, remove age friends = friends.drop_fields(paths=["age", "location.county", "friends.age"]) print("Schema for friends DynamicFrame after removing age, county, and friend age:") friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields: root |-- name: string |-- age: int |-- location: struct | |-- state: string | |-- county: string |-- friends: array | |-- element: struct | | |-- name: string | | |-- age: int Schema for friends DynamicFrame after removing age, county, and friend age: root |-- name: string |-- location: struct | |-- state: string |-- friends: array | |-- element: struct | | |-- name: string

filter

filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Restituisce un nuovo DynamicFrame contenente tutti i DynamicRecords nel DynamicFrame di input che soddisfano la funzione predicato specificata f.

  • f: funzione predicato da applicare all'oggetto DynamicFrame. La funzione deve richiedere un DynamicRecord come argomento e restituire True se il DynamicRecord soddisfa i requisiti del filtro o False in caso contrario (obbligatorio).

    Un DynamicRecord rappresenta un record logico all'interno di un DynamicFrame. È simile a una riga in un DataFrame Spark, con la differenza che è autodescrittivo e può essere utilizzato per dati non conformi a uno schema fisso.

  • transformation_ctx: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).

  • info: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale).

  • stageThreshold: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

  • totalThreshold: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

Esempio: usa il filtro per ottenere una selezione filtrata di campi

In questo esempio viene utilizzato il metodo filter per creare un nuovo DynamicFrame che include una selezione filtrata di campi di un altro DynamicFrame.

Come il metodo map, filter assume una funzione come argomento che viene applicato a ogni record nel DynamicFrame originario. La funzione accetta un record come input e restituisce un valore booleano. Se il valore restituito è vero, il record viene incluso nel DynamicFrame risultante. Se è falso, il record viene escluso.

# Example: Use filter to create a new DynamicFrame # with a filtered selection of records from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create DynamicFrame from Glue Data Catalog medicare = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv" ] }, "csv", {"withHeader": True}, ) # Create filtered DynamicFrame with custom lambda # to filter records by Provider State and Provider City sac_or_mon = medicare.filter( f=lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"] ) # Compare record counts print("Unfiltered record count: ", medicare.count()) print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065 Filtered record count: 564

join

join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Esegue un equi join con un altro DynamicFrame e restituisce il DynamicFrame risultante.

  • paths1: un elenco delle chiavi di questo frame da unire.

  • paths2: un elenco delle chiavi dell'altro frame da unire.

  • frame2: l'altro DynamicFrame da unire.

  • transformation_ctx: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).

  • info: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale).

  • stageThreshold: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

  • totalThreshold: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

Esempio: usa join per combinare DynamicFrames

Questo esempio utilizza il join metodo per eseguire un'unione su treDynamicFrames. AWS Glue esegue l'unione in base alle chiavi di campo fornite. Il DynamicFrame risultante contiene le righe dei due frame originali in cui le chiavi specificate corrispondono.

Tieni presente che la trasformazione join mantiene intatti tutti i campi. Ciò significa che i campi specificati per la corrispondenza vengono visualizzati nel risultato DynamicFrame, anche se sono ridondanti e contengono le stesse chiavi. In questo esempio viene utilizzato drop_fields per rimuovere tali chiavi ridondanti dopo l'unione.

Nota

Per accedere al set di dati utilizzato in questo esempio, consulta Esempio di codice: unione e relazioni dei dati e segui le istruzioni in Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3.

# Example: Use join to combine data from three DynamicFrames from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load DynamicFrames from Glue Data Catalog persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json" ) orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() print("Schema for the memberships DynamicFrame:") memberships.printSchema() print("Schema for the orgs DynamicFrame:") orgs.printSchema() # Join persons and memberships by ID persons_memberships = persons.join( paths1=["id"], paths2=["person_id"], frame2=memberships ) # Rename and drop fields from orgs # to prevent field name collisions with persons_memberships orgs = ( orgs.drop_fields(["other_names", "identifiers"]) .rename_field("id", "org_id") .rename_field("name", "org_name") ) # Create final join of all three DynamicFrames legislators_combined = orgs.join( paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships ).drop_fields(["person_id", "org_id"]) # Inspect the schema for the joined data print("Schema for the new legislators_combined DynamicFrame:") legislators_combined.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the memberships DynamicFrame: root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string Schema for the orgs DynamicFrame: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string Schema for the new legislators_combined DynamicFrame: root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

map

map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Restituisce un nuovo DynamicFrame ottenuto applicando la funzione di mappatura specificata a tutti i record nel DynamicFrame originale.

  • f: funzione di mappatura da applicare a tutti i record nell'oggetto DynamicFrame. La funzione deve richiedere un DynamicRecord come argomento e restituire un nuovo DynamicRecord (obbligatorio).

    Un DynamicRecord rappresenta un record logico all'interno di un DynamicFrame. È simile a una riga in un DataFrame Apache Spark, con la differenza che è autodescrittivo e può essere utilizzato per dati non conformi a uno schema fisso.

  • transformation_ctx: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).

  • info: una stringa associata a errori nella trasformazione (facoltativo).

  • stageThreshold: il numero massimo di errori che si possono verificare nella trasformazione prima che venga arrestata (facoltativo). Il valore di default è zero.

  • totalThreshold: il numero massimo di errori che si possono verificare in totale prima che l'elaborazione venga arrestata (facoltativo). Il valore di default è zero.

Esempio: utilizza la mappa per applicare una funzione a ogni record in un DynamicFrame

In questo esempio viene utilizzato il metodo map per applicare una funzione a ogni record di un DynamicFrame. Nello specifico, questo esempio applica una funzione denominata MergeAddress a ogni record per unire diversi campi indirizzo in un singolo tipo struct.

# Example: Use map to combine fields in all records # of a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema medicare = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]}, "csv", {"withHeader": True}) print("Schema for medicare DynamicFrame:") medicare.printSchema() # Define a function to supply to the map transform # that merges address fields into a single field def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec # Use map to apply MergeAddress to every record mapped_medicare = medicare.map(f = MergeAddress) print("Schema for mapped_medicare DynamicFrame:") mapped_medicare.printSchema()
Schema for medicare DynamicFrame: root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string Schema for mapped_medicare DynamicFrame: root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

mergeDynamicFrame

mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)

Unisce questo DynamicFrame con un DynamicFrame temporaneo basato sulle chiavi primarie specificate per identificare i record. I record duplicati (record con le stesse chiavi primarie) non vengono deduplicati. Se non è presente alcun record corrispondente nel frame temporaneo, tutti i record (inclusi i duplicati) vengono mantenuti dall'origine. Se il frame di staging dispone di record corrispondenti, i suoi registri sovrascrivono i registri nell'origine in AWS Glue.

  • stage_dynamic_frame: il DynamicFrame di gestione temporanea da unire.

  • primary_keys: l'elenco dei campi chiave primaria per abbinare i record dall'origine e dai frame dinamici di gestione temporanei.

  • transformation_ctx: una stringa univoca utilizzata per recuperare i metadati relativi alla trasformazione corrente (opzionale).

  • options: una stringa di coppie nome-valore JSON che forniscono informazioni aggiuntive per questa trasformazione. Questo argomento non è attualmente utilizzato.

  • info: un String. Qualsiasi stringa da associare agli errori in questa trasformazione.

  • stageThreshold: un Long. Il numero di errori nella trasformazione specificata per cui l'elaborazione deve restituire un errore.

  • totalThreshold: un Long. Il numero totale di errori fino a questa trasformazione inclusa per i quali l'elaborazione deve restituire un errore.

Questo metodo restituisce un nuovo DynamicFrame ottenuto unendo questo DynamicFrame con il DynamicFrame temporaneo.

Il DynamicFrame restituito contiene il record A in questi casi:

  • Se A esiste sia nel frame di origine che nel frame temporaneo, viene restituito A nel frame temporaneo.

  • Se A si trova nella tabella di origine e A.primaryKeys non si trova nel stagingDynamicFrame, A non viene aggiornato nella tabella temporanea.

Il frame di origine e il frame temporaneo non devono avere lo stesso schema.

Esempio: mergeDynamicFrame da utilizzare per unire due in DynamicFrames base a una chiave primaria

Il seguente esempio di codice mostra come utilizzare il metodo mergeDynamicFrame per unire un DynamicFrame con un DynamicFrame di staging, in base alla chiave primaria id.

Set di dati di esempio

L'esempio utilizza due DynamicFrames da una DynamicFrameCollection chiamata split_rows_collection. Di seguito è riportato un elenco di chiavi in split_rows_collection.

dict_keys(['high', 'low'])

Esempio di codice

# Example: Use mergeDynamicFrame to merge DynamicFrames # based on a set of specified primary keys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import SelectFromCollection # Inspect the original DynamicFrames frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low") print("Inspect the DynamicFrame that contains rows where ID < 10") frame_low.toDF().show() frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high") print("Inspect the DynamicFrame that contains rows where ID > 10") frame_high.toDF().show() # Merge the DynamicFrames based on the "id" primary key merged_high_low = frame_high.mergeDynamicFrame( stage_dynamic_frame=frame_low, primary_keys=["id"] ) # View the results where the ID is 1 or 20 print("Inspect the merged DynamicFrame that contains the combined rows") merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 2| 0| fax| 202-225-3307| | 2| 1| phone| 202-225-5731| | 3| 0| fax| 202-225-3307| | 3| 1| phone| 202-225-5731| | 4| 0| fax| 202-225-3307| | 4| 1| phone| 202-225-5731| | 5| 0| fax| 202-225-3307| | 5| 1| phone| 202-225-5731| | 6| 0| fax| 202-225-3307| | 6| 1| phone| 202-225-5731| | 7| 0| fax| 202-225-3307| | 7| 1| phone| 202-225-5731| | 8| 0| fax| 202-225-3307| | 8| 1| phone| 202-225-5731| | 9| 0| fax| 202-225-3307| | 9| 1| phone| 202-225-5731| | 10| 0| fax| 202-225-6328| | 10| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains rows where ID > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| fax| 202-225-6328| | 11| 1| phone| 202-225-4576| | 11| 2| twitter| RepTrentFranks| | 12| 0| fax| 202-225-6328| | 12| 1| phone| 202-225-4576| | 12| 2| twitter| RepTrentFranks| | 13| 0| fax| 202-225-6328| | 13| 1| phone| 202-225-4576| | 13| 2| twitter| RepTrentFranks| | 14| 0| fax| 202-225-6328| | 14| 1| phone| 202-225-4576| | 14| 2| twitter| RepTrentFranks| | 15| 0| fax| 202-225-6328| | 15| 1| phone| 202-225-4576| | 15| 2| twitter| RepTrentFranks| | 16| 0| fax| 202-225-6328| | 16| 1| phone| 202-225-4576| | 16| 2| twitter| RepTrentFranks| | 17| 0| fax| 202-225-6328| | 17| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the merged DynamicFrame that contains the combined rows +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 20| 0| fax| 202-225-5604| | 20| 1| phone| 202-225-6536| | 20| 2| twitter| USRepLong| +---+-----+------------------------+-------------------------+

relationalize

relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Converte un DynamicFrame in un modulo che si inserisce in un database relazionale. La relazionalizzazione di un DynamicFrame è particolarmente utile quando si desidera spostare dati da un ambiente NoSQL come DynamoDB a un database relazionale come MySQL.

La trasformazione genera un elenco di frame rimuovendo le colonne annidate e ruotando le colonne dell'array. La colonna matrice trasformata mediante pivot può essere unita alla tabella root utilizzando la join-key generata durante la fase di annullamento dell'annidamento.

  • root_table_name: il nome della tabella root.

  • staging_path: il percorso in cui il metodo può archiviare le partizioni di tabelle trasformate mediante pivot in formato CSV (facoltativo). Le tabelle trasformate mediante pivot vengono rilette da questo percorso.

  • options: un dizionario dei parametri opzionali.

  • transformation_ctx: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).

  • info: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale).

  • stageThreshold: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

  • totalThreshold: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

Esempio: usa la relazionalizzazione per livellare uno schema annidato in un DynamicFrame

Questo esempio di codice utilizza il metodo relationalize per livellare uno schema annidato in una forma adatta a un database relazionale.

Set di dati di esempio

L'esempio utilizza un DynamicFrame chiamato legislators_combined con lo schema seguente. legislators_combined ha più campi annidati come links, images, econtact_details, che verranno livellati dalla trasformazione relationalize.

root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

Esempio di codice

# Example: Use relationalize to flatten # a nested schema into a format that fits # into a relational database. # Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Apply relationalize and inspect new tables legislators_relationalized = legislators_combined.relationalize( "l_root", "s3://DOC-EXAMPLE-BUCKET/tmpDir" ) legislators_relationalized.keys() # Compare the schema of the contact_details # nested field to the new relationalized table that # represents it legislators_combined.select_fields("contact_details").printSchema() legislators_relationalized.select("l_root_contact_details").toDF().where( "id = 10 or id = 75" ).orderBy(["id", "index"]).show()

L'output seguente consente di confrontare lo schema del campo annidato chiamato contact_details con la tabella creata dalla trasformazione relationalize. Si noti che i record della tabella rimandano alla tabella principale utilizzando una chiave esterna chiamata id e una colonna index che rappresenta le posizioni dell'array.

dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers']) root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| 202-225-4160| | 10| 1| phone| 202-225-3436| | 75| 0| fax| 202-225-6791| | 75| 1| phone| 202-225-2861| | 75| 2| twitter| RepSamFarr| +---+-----+------------------------+-------------------------+

rename_field

rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Rinomina un campo in questo DynamicFrame e restituisce un nuovo DynamicFrame con il campo rinominato.

  • oldName: il percorso completo al nodo che desideri rinominare.

    Se il vecchio nome contiene dei punti, RenameField non funziona a meno che non venga racchiuso tra virgolette (`). Ad esempio, per sostituire this.old.name con thisNewName, chiamare rename_field come segue:

    newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
  • newName: il nuovo nome, come un percorso completo.

  • transformation_ctx: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).

  • info: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale).

  • stageThreshold: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

  • totalThreshold: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

Esempio: usa rename_field per rinominare i campi in un DynamicFrame

Questo esempio di codice utilizza il metodo rename_field per rinominare i campi in un DynamicFrame. Si noti che l'esempio utilizza il concatenamento di metodi per rinominare più campi contemporaneamente.

Nota

Per accedere al set di dati utilizzato in questo esempio, consulta Esempio di codice: unione e relazioni dei dati e segui le istruzioni in Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3.

Esempio di codice

# Example: Use rename_field to rename fields # in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Inspect the original orgs schema orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Original orgs schema: ") orgs.printSchema() # Rename fields and view the new schema orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name") print("New orgs schema with renamed fields: ") orgs.printSchema()
Original orgs schema: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string New orgs schema with renamed fields: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- classification: string |-- org_id: string |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string

resolveChoice

resolveChoice(specs = None, choice = "" , database = None , table_name = None , transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id = None)

Risolve un tipo di scelta all'interno di questo DynamicFrame e restituisce il nuovo DynamicFrame.

  • specs: elenco di ambiguità specifiche da risolvere, ognuna sotto forma di tupla: (field_path, action).

    Ci sono due modi per utilizzare resolveChoice. Il primo consiste nell'indicare un argomento specs per specificare una sequenza di campi specifici e come risolverli. L'altra modalità per resolveChoice è usare un singolo argomento choice per specificare una singola risoluzione per tutti i ChoiceTypes.

    I valori per specs vengono specificati come tuple costituiti da coppie (field_path, action). Il valore field_path identifica un elemento ambiguo specifico e il valore action identifica la soluzione corrispondente. Sono disponibili le operazioni seguenti:

    • cast:type: tenta di trasmettere tutti i valori al tipo specificato. Ad esempio: cast:int.

    • make_cols: converte ogni tipo distinto in colonna con il nome columnName_type. Risolve una potenziale ambiguità livellando i dati. Ad esempio, se columnA fosse un int o una string, la soluzione consisterebbe nel produrre due colonne denominate columnA_int e columnA_string nel DynamicFrame risultante.

    • make_struct: risolve una potenziale ambiguità utilizzando una struct per rappresentare i dati. Ad esempio, se i dati in una colonna sono un int o una string, con l'utilizzo dell'operazione make_struct viene prodotta una colonna di strutture nel risultante DynamicFrame. Ogni struttura contiene sia un int che un string.

    • project:type: risolve una potenziale ambiguità proiettando tutti i dati su uno dei tipi di dati possibili. Ad esempio, se i dati in una colonna sono un int o una string, utilizzando un'operazione project:string viene prodotta una colonna nel DynamicFrame risultante, dove tutti i valori int sono stati convertiti in stringhe.

    Se il field_path identifica un array, inserisci parentesi quadre vuote dopo il nome dell'array per evitare ambiguità. Ad esempio, supponiamo che tu stia lavorando con dati strutturati nel seguente modo:

    "myList": [ { "price": 100.00 }, { "price": "$100.00" } ]

    Puoi selezionare la versione numerica invece di quella di stringa del prezzo impostando il field_path su "myList[].price" e la action su "cast:double".

    Nota

    Può essere utilizzato solo uno dei parametri specs e choice. Se il parametro specs non è None, allora il parametro choice deve essere una stringa vuota. Viceversa, se choice non è una stringa vuota, allora il parametro specs deve essere None.

  • choice: specifica una singola risoluzione per tutti i ChoiceTypes. Puoi usare questa modalità nei casi in cui l'elenco completo di ChoiceTypes non è noto prima del runtime. Oltre alle operazioni elencate in precedenza per specs, questa modalità supporta anche l'operazione seguente:

    • match_catalog: tenta di trasmettere ogni ChoiceType al tipo corrispondente nella tabella del catalogo specificata.

  • database: il database del catalogo dati da usare con l'operazione match_catalog.

  • table_name: la tabella del catalogo dati da usare con l'operazione match_catalog.

  • transformation_ctx: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).

  • info: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale).

  • stageThreshold: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

  • totalThreshold: il numero di errori riscontrati fino a questa trasformazione compresa, raggiunto il quale il processo dovrebbe interrompersi (opzionale: impostazione predefinita: zero, a indicare che il processo non dovrebbe interrompersi).

  • catalog_id: l'ID catalogo del catalogo dati a cui si accede (l'ID account del catalogo dati). Se impostato su None (valore predefinito), utilizza l'ID catalogo dell'account chiamante.

Esempio: utilizzare resolveChoice per gestire una colonna che contiene più tipi

Questo esempio di codice utilizza il metodo resolveChoice per specificare come gestire una colonna DynamicFrame che contiene valori di più tipi. L'esempio mostra due modi comuni per gestire una colonna con tipi diversi:

  • Trasforma la colonna in un singolo tipo di dati.

  • Conserva tutti i tipi in colonne separate.

Set di dati di esempio

L'esempio utilizza un DynamicFrame chiamato medicare con il seguente schema:

root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

Esempio di codice

# Example: Use resolveChoice to handle # a column that contains multiple types from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input data and inspect the "provider id" column medicare = glueContext.create_dynamic_frame.from_catalog( database="payments", table_name="medicare_hospital_provider_csv" ) print("Inspect the provider id column:") medicare.toDF().select("provider id").show() # Cast provider id to type long medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")]) print("Schema after casting provider id to type long:") medicare_resolved_long.printSchema() medicare_resolved_long.toDF().select("provider id").show() # Create separate columns # for each provider id type medicare_resolved_cols = medicare.resolveChoice(choice="make_cols") print("Schema after creating separate columns for each type:") medicare_resolved_cols.printSchema() medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column: +-----------+ |provider id| +-----------+ | [10001,]| | [10005,]| | [10006,]| | [10011,]| | [10016,]| | [10023,]| | [10029,]| | [10033,]| | [10039,]| | [10040,]| | [10046,]| | [10055,]| | [10056,]| | [10078,]| | [10083,]| | [10085,]| | [10090,]| | [10092,]| | [10100,]| | [10103,]| +-----------+ only showing top 20 rows Schema after casting 'provider id' to type long: root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +-----------+ |provider id| +-----------+ | 10001| | 10005| | 10006| | 10011| | 10016| | 10023| | 10029| | 10033| | 10039| | 10040| | 10046| | 10055| | 10056| | 10078| | 10083| | 10085| | 10090| | 10092| | 10100| | 10103| +-----------+ only showing top 20 rows Schema after creating separate columns for each type: root |-- drg definition: string |-- provider id_string: string |-- provider id_long: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +----------------+------------------+ |provider id_long|provider id_string| +----------------+------------------+ | 10001| null| | 10005| null| | 10006| null| | 10011| null| | 10016| null| | 10023| null| | 10029| null| | 10033| null| | 10039| null| | 10040| null| | 10046| null| | 10055| null| | 10056| null| | 10078| null| | 10083| null| | 10085| null| | 10090| null| | 10092| null| | 10100| null| | 10103| null| +----------------+------------------+ only showing top 20 rows

select_fields

select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Restituisce un nuovo DynamicFrame contenente i campi selezionati.

  • paths: un elenco di stringhe. Ogni stringa è un percorso completo di un nodo di livello superiore da selezionare.

  • transformation_ctx: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).

  • info: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale).

  • stageThreshold: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

  • totalThreshold: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

Esempio: usa select_fields per creare un nuovo DynamicFrame con i campi scelti

L'esempio di codice seguente mostra come utilizzare il metodo select_fields per creare un nuovo DynamicFrame con un elenco di campi scelto da un DynamicFrame esistente.

Nota

Per accedere al set di dati utilizzato in questo esempio, consulta Esempio di codice: unione e relazioni dei dati e segui le istruzioni in Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3.

# Example: Use select_fields to select specific fields from a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Create a new DynamicFrame with chosen fields names = persons.select_fields(paths=["family_name", "given_name"]) print("Schema for the names DynamicFrame, created with select_fields:") names.printSchema() names.toDF().show()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the names DynamicFrame: root |-- family_name: string |-- given_name: string +-----------+----------+ |family_name|given_name| +-----------+----------+ | Collins| Michael| | Huizenga| Bill| | Clawson| Curtis| | Solomon| Gerald| | Rigell| Edward| | Crapo| Michael| | Hutto| Earl| | Ertel| Allen| | Minish| Joseph| | Andrews| Robert| | Walden| Greg| | Kazen| Abraham| | Turner| Michael| | Kolbe| James| | Lowenthal| Alan| | Capuano| Michael| | Schrader| Kurt| | Nadler| Jerrold| | Graves| Tom| | McMillan| John| +-----------+----------+ only showing top 20 rows

simply_ddb_json

simplify_ddb_json(): DynamicFrame

Semplifica le colonne annidate in un ambiente DynamicFrame che si trova specificamente nella struttura JSON di DynamoDB e ne restituisce una nuova semplificata. DynamicFrame Se ci sono più tipi o tipi di mappa in un tipo di elenco, gli elementi nell'elenco non verranno semplificati. Tieni presente che si tratta di un tipo specifico di trasformazione che si comporta in modo diverso dalla unnest trasformazione normale e richiede che i dati siano già presenti nella struttura JSON di DynamoDB. Per ulteriori informazioni, consulta DynamoDB JSON.

Ad esempio, lo schema di lettura di un'esportazione con la struttura JSON DynamoDB potrebbe apparire come segue:

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

La trasformazione di simplify_ddb_json() lo convertirebbe in:

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

Esempio: utilizza simply_ddb_json per richiamare un DynamoDB JSON simple

Questo esempio di codice utilizza il simplify_ddb_json metodo per utilizzare il connettore di esportazione AWS Glue DynamoDB, richiamare un DynamoDB JSON simple e stampare il numero di partizioni.

Esempio di codice

from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "dynamodb", connection_options = { 'dynamodb.export': 'ddb', 'dynamodb.tableArn': '<table arn>', 'dynamodb.s3.bucket': '<bucket name>', 'dynamodb.s3.prefix': '<bucket prefix>', 'dynamodb.s3.bucketOwner': '<account_id of bucket>' } ) simplified = dynamicFrame.simplify_ddb_json() print(simplified.getNumPartitions())

spigot

spigot(path, options={})

Scrive record di esempio in una destinazione specificata per aiutarti a verificare le trasformazioni eseguite dal tuo lavoro.

  • path: il percorso della destinazione in cui scrivere (obbligatorio).

  • options: coppie chiave-valore che specificano opzioni (opzionale). L'opzione "topk" specifica che devono essere scritti i primi record k. L'opzione "prob" specifica la probabilità (sotto forma di valore decimale) di scelta di un dato record. Puoi usarlo per selezionare i record da scrivere.

  • transformation_ctx: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).

Esempio: usa lo spigot per scrivere campi di esempio da DynamicFrame ad Amazon S3

Questo esempio di codice utilizza il metodo spigot per scrivere record di esempio in un bucket Amazon S3 dopo aver applicato la trasformazione select_fields.

Set di dati di esempio

Nota

Per accedere al set di dati utilizzato in questo esempio, consulta Esempio di codice: unione e relazioni dei dati e segui le istruzioni in Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3.

L'esempio utilizza un DynamicFrame chiamato persons con il seguente schema:

root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

Esempio di codice

# Example: Use spigot to write sample records # to a destination during a transformation # from pyspark.context import SparkContext. # Replace DOC-EXAMPLE-BUCKET with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load table data into a DynamicFrame persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) # Perform the select_fields on the DynamicFrame persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"]) # Use spigot to write a sample of the transformed data # (the first 10 records) spigot_output = persons.spigot( path="s3://DOC-EXAMPLE-BUCKET", options={"topk": 10} )

Di seguito è riportato un esempio di dati che spigot scrive su Amazon S3. Poiché è stato specificato il codice di esempio options={"topk": 10}, i dati di esempio contengono i primi 10 record.

{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"} {"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"} {"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"} {"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"} {"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"} {"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"} {"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"} {"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"} {"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"} {"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}

split_fields

split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Restituisce un nuovo DynamicFrameCollection che ne contiene due DynamicFrames. Il primo DynamicFrame contiene tutti i nodi che sono stati separati e il secondo contiene i nodi rimanenti.

  • paths: elenco di stringhe, ciascuna delle quali è un percorso completo di un nodo da separare in un nuovo oggetto DynamicFrame.

  • name1: una stringa nome per il DynamicFrame separato.

  • name2: una stringa nome per il DynamicFrame che rimane dopo aver separato i nodi specificati.

  • transformation_ctx: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).

  • info: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale).

  • stageThreshold: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

  • totalThreshold: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

Esempio: usare split_fields per dividere i campi selezionati in un campo separato DynamicFrame

Questo esempio di codice utilizza il metodo split_fields per dividere un elenco di campi specificati in un campo separato DynamicFrame.

Set di dati di esempio

L'esempio utilizza un DynamicFrame chiamato l_root_contact_details che proviene da una raccolta denominata legislators_relationalized.

l_root_contact_details ha il seguente schema e le seguenti voci.

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| ...

Esempio di codice

# Example: Use split_fields to split selected # fields into a separate DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input DynamicFrame and inspect its schema frame_to_split = legislators_relationalized.select("l_root_contact_details") print("Inspect the input DynamicFrame schema:") frame_to_split.printSchema() # Split id and index fields into a separate DynamicFrame split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right") # Inspect the resulting DynamicFrames print("Inspect the schemas of the DynamicFrames created with split_fields:") split_fields_collection.select("left").printSchema() split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema: root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string Inspect the schemas of the DynamicFrames created with split_fields: root |-- id: long |-- index: int root |-- contact_details.val.type: string |-- contact_details.val.value: string

split_rows

split_rows(comparison_dict, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Suddivide una o più righe di un DynamicFrame in un nuovo DynamicFrame.

Il metodo restituisce un nuovo DynamicFrameCollection che contiene due DynamicFrames. Il primo DynamicFrame contiene tutti i nodi che sono stati separati e il secondo contiene i nodi rimanenti.

  • comparison_dict: un dizionario in cui la chiave è un percorso verso una colonna e il valore è un altro dizionario per la mappatura di comparatori rispetto a valori con i quali vengono confrontati i valori di colonna. Ad esempio, {"age": {">": 10, "<": 20}} divide tutte le righe il cui valore nella colonna età è superiore a 10 e inferiore a 20.

  • name1: una stringa nome per il DynamicFrame separato.

  • name2: una stringa nome per il DynamicFrame che rimane dopo aver separato i nodi specificati.

  • transformation_ctx: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).

  • info: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale).

  • stageThreshold: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

  • totalThreshold: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

Esempio: usare split_rows per dividere le righe in un DynamicFrame

Questo esempio di codice utilizza il metodo split_rows per dividere le righe in un DynamicFrame in base al valore del campo id.

Set di dati di esempio

L'esempio utilizza un DynamicFrame chiamato l_root_contact_details che proviene da una raccolta denominata legislators_relationalized.

l_root_contact_details ha il seguente schema e le seguenti voci.

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+

Esempio di codice

# Example: Use split_rows to split up # rows in a DynamicFrame based on value from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Retrieve the DynamicFrame to split frame_to_split = legislators_relationalized.select("l_root_contact_details") # Split up rows by ID split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low") # Inspect the resulting DynamicFrames print("Inspect the DynamicFrame that contains IDs < 10") split_rows_collection.select("low").toDF().show() print("Inspect the DynamicFrame that contains IDs > 10") split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains IDs > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| phone| 202-225-5476| | 11| 1| twitter| RepDavidYoung| | 12| 0| phone| 202-225-4035| | 12| 1| twitter| RepStephMurphy| | 13| 0| fax| 202-226-0774| | 13| 1| phone| 202-225-6335| | 14| 0| fax| 202-226-0774| | 14| 1| phone| 202-225-6335| | 15| 0| fax| 202-226-0774| | 15| 1| phone| 202-225-6335| | 16| 0| fax| 202-226-0774| | 16| 1| phone| 202-225-6335| | 17| 0| fax| 202-226-0774| | 17| 1| phone| 202-225-6335| | 18| 0| fax| 202-226-0774| | 18| 1| phone| 202-225-6335| | 19| 0| fax| 202-226-0774| | 19| 1| phone| 202-225-6335| | 20| 0| fax| 202-226-0774| | 20| 1| phone| 202-225-6335| +---+-----+------------------------+-------------------------+ only showing top 20 rows

unbox

unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)

Esegue la conversione unboxing di un campo stringa in un oggetto DynamicFrame e restituisce un nuovo oggetto DynamicFrame che contiene gli oggetti DynamicRecords sottoposti a conversione unboxing.

Un DynamicRecord rappresenta un record logico all'interno di un DynamicFrame. È simile a una riga in un DataFrame Apache Spark, con la differenza che è autodescrittivo e può essere utilizzato per dati non conformi a uno schema fisso.

  • path: un percorso completo al nodo stringa che desideri cancellare.

  • format: una specifica del formato (facoltativa). Viene usata per una connessione Amazon S3 o AWS Glue che supporta più formati. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.

  • transformation_ctx: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).

  • info: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale).

  • stageThreshold: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

  • totalThreshold: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

  • options: una o più delle seguenti:

    • separator: una stringa che contiene il carattere separatore.

    • escaper: una stringa che contiene il carattere escape.

    • skipFirst: un valore Boolean che indica se saltare la prima istanza.

    • withSchema: una stringa contenente una rappresentazione JSON dello schema del nodo. Il formato della rappresentazione JSON di uno schema è definito dall'output di StructType.json().

    • withHeader: un valore Boolean che indica se è inclusa un'intestazione.

Esempio: usare unbox per decomprimere un campo di stringa in un campo struct

Questo esempio di codice utilizza il metodo unbox per decomprimere o riformattare un campo di tipo stringa DynamicFrame in un campo di tipo struct.

Set di dati di esempio

L'esempio utilizza un DynamicFrame chiamato mapped_with_string con i seguenti schema e voci:

Nota il campo denominato AddressString. Questo è il campo che di cui l'esempio esegue l'unboxing in un campo struct.

root |-- Average Total Payments: string |-- AddressString: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| ...

Esempio di codice

# Example: Use unbox to unbox a string field # into a struct in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) unboxed = mapped_with_string.unbox("AddressString", "json") unboxed.printSchema() unboxed.toDF().show()
root |-- Average Total Payments: string |-- AddressString: struct | |-- Street: string | |-- City: string | |-- State: string | |-- Zip.Code: string | |-- Array: array | | |-- element: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| | $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...| | $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...| | $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...| | $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...| | $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL| | $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...| | $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...| | $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL| | $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...| | $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...| | $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...| | $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...| | $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL| | $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...| | $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL| | $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ only showing top 20 rows

unione

union(frame1, frame2, transformation_ctx = "", info = "", stageThreshold = 0, totalThreshold = 0)

Unione due. DynamicFrames Restituzioni DynamicFrame contenenti tutti i record di entrambi gli input DynamicFrames. Questa trasformazione può restituire risultati diversi dall'unione di due DataFrames con dati equivalenti. Se hai bisogno del comportamento dell' DataFrame unione Spark, prendi in considerazione l'utilizzo toDF di.

  • frame1— I primi DynamicFrame a unirsi.

  • frame2— Seconda dopo DynamicFrame l'unione.

  • transformation_ctx: (facoltativo) una stringa univoca utilizzata per identificare informazioni su statistiche/stato

  • info: (facoltativo) qualsiasi stringa da associare agli errori nella trasformazione

  • stageThreshold: (facoltativo) numero massimo di errori nella trasformazione fino a che l'elaborazione si interrompe a causa di un errore

  • totalThreshold: (facoltativo) numero massimo di errori totali fino a che l'elaborazione si interrompe a causa di un errore.

unnest

unnest(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Annulla l'annidamento di oggetti nidificati in un DynamicFrame rendendoli oggetti di primo livello e restituendo un nuovo DynamicFrame non nidificato.

  • transformation_ctx: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).

  • info: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale).

  • stageThreshold: il numero di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

  • totalThreshold: il numero massimo di errori rilevati durante questa trasformazione raggiunto il quale il processo deve interrompersi (facoltativo). L'impostazione predefinita è zero, indicando che il processo non deve terminare.

Esempio: usare unnest per trasformare i campi annidati in campi di primo livello

Questo esempio di codice utilizza il metodo unnest per raggruppare tutti i campi annidati di aDynamicFrame in campi di primo livello.

Set di dati di esempio

L'esempio utilizza un DynamicFrame chiamato mapped_medicare con il seguente schema. Nota che il campo Address è l'unico campo che contiene dati annidati.

root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

Esempio di codice

# Example: Use unnest to unnest nested # objects in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Unnest all nested fields unnested = mapped_medicare.unnest() unnested.printSchema()
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address.Zip.Code: string |-- Address.City: string |-- Address.Array: array | |-- element: string |-- Address.State: string |-- Address.Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

unnest_ddb_json

Snidifica le colonne nidificate in un DynamicFrame che si trovano specificamente nella struttura JSON di DynamoDB e restituisce un nuovo DynamicFrame non annidato. Le colonne che sono di un array di struct non verranno annidate. Si noti che si tratta di un tipo specifico di trasformazione di snidamento che si comporta in modo diverso dalla normale trasformazione di unnest e richiede che i dati siano già nella struttura JSON di DynamoDB. Per ulteriori informazioni, consulta DynamoDB JSON.

unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
  • transformation_ctx: una stringa univoca utilizzata per identificare informazioni sullo stato (opzionale).

  • info: una stringa da associare alla segnalazione errori per questa trasformazione (opzionale).

  • stageThreshold: il numero di errori riscontrati durante questa trasformazione, raggiunto il quale il processo dovrebbe interrompersi (opzionale impostazione predefinita: zero, a indicare che il processo non dovrebbe interrompersi).

  • totalThreshold: il numero di errori riscontrati fino a questa trasformazione compresa, raggiunto il quale il processo dovrebbe interrompersi (opzionale: impostazione predefinita: zero, a indicare che il processo non dovrebbe interrompersi).

Ad esempio, lo schema di lettura di un'esportazione con la struttura JSON DynamoDB potrebbe apparire come segue:

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

La trasformazione di unnest_ddb_json() lo convertirebbe in:

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

L'esempio di codice seguente mostra come utilizzare il connettore di esportazione AWS Glue DynamoDB, richiamare un unnest JSON di DynamoDB e stampare il numero di partizioni:

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()

write

write(connection_type, connection_options, format, format_options, accumulator_size)

Ottiene un DataSink(object) del tipo di connessione specificata da GlueContext classe di questo DynamicFrame e lo utilizza per formattare e scrivere i contenuti di questo DynamicFrame. Restituisce il nuovo DynamicFrame formattato e scritto come specificato.

  • connection_type: il tipo di connessione da utilizzare. I valori validi sono s3, mysql, postgresql, redshift, sqlserver e oracle.

  • connection_options: l'opzione di connessione da utilizzare (opzionale). Per un connection_type di s3 è definito un percorso Amazon S3.

    connection_options = {"path": "s3://aws-glue-target/temp"}

    Per le connessioni JDBC, diverse proprietà devono essere definite. Il nome del database deve fare parte dell'URL. Puoi opzionalmente essere incluso nelle opzioni di connessione.

    avvertimento

    Si consiglia di non archiviare le password nello script. Valuta la possibilità boto3 di utilizzarli per recuperarli da AWS Secrets Manager o dal AWS Glue Data Catalog.

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}
  • format: una specifica del formato (facoltativa). Viene utilizzato per un servizio Amazon Simple Storage Service (Amazon S3) o una connessione AWS Glue che supporta più formati. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.

  • format_options: opzioni di formato per il formato specificato. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.

  • accumulator_size: la dimensione accumulabile da utilizzare, in byte (facoltativa).

 — errori —

assertErrorThreshold

assertErrorThreshold( ): asserzione per gli errori nelle trasformazioni che hanno creato questo oggetto DynamicFrame. Restituisce una Exception dal DataFrame sottostante.

errorsAsDynamicCornice

errorsAsDynamicFrame( ): restituisce un DynamicFrame che ha record di errore nidificati al suo interno.

Esempio: utilizzare errorsAsDynamic Frame per visualizzare i record di errori

L'esempio di codice seguente mostra come utilizzare il metodo errorsAsDynamicFrame per visualizzare un record degli errori per un DynamicFrame.

Set di dati di esempio

L'esempio utilizza il set di dati seguente che puoi caricare in Amazon S3 come JSON. Tieni presente che il formato del secondo record non è corretto. I dati con formato non corretto generalmente interrompono l'analisi dei file quando utilizzi SparkSQL. DynamicFrame, tuttavia, riconosce i problemi di formato non corretto e trasforma le righe con formato non corretto in record degli errori che puoi gestire singolarmente.

{"id": 1, "name": "george", "surname": "washington", "height": 178} {"id": 2, "name": "benjamin", "surname": "franklin", {"id": 3, "name": "alexander", "surname": "hamilton", "height": 171} {"id": 4, "name": "john", "surname": "jay", "height": 190}

Esempio di codice

# Example: Use errorsAsDynamicFrame to view error records. # Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create errors DynamicFrame, view schema errors = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://DOC-EXAMPLE-S3-BUCKET/error_data.json"]}, "json" ) print("Schema of errors DynamicFrame:") errors.printSchema() # Show that errors only contains valid entries from the dataset print("errors contains only valid records from the input dataset (2 of 4 records)") errors.toDF().show() # View errors print("Errors count:", str(errors.errorsCount())) print("Errors:") errors.errorsAsDynamicFrame().toDF().show() # View error fields and error data error_record = errors.errorsAsDynamicFrame().toDF().head() error_fields = error_record["error"] print("Error fields: ") print(error_fields.asDict().keys()) print("\nError record data:") for key in error_fields.asDict().keys(): print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame: root |-- id: int |-- name: string |-- surname: string |-- height: int errors contains only valid records from the input dataset (2 of 4 records) +---+------+----------+------+ | id| name| surname|height| +---+------+----------+------+ | 1|george|washington| 178| | 4| john| jay| 190| +---+------+----------+------+ Errors count: 1 Errors: +--------------------+ | error| +--------------------+ |[[ File "/tmp/20...| +--------------------+ Error fields: dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord']) Error record data: callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='') msg : error in jackson reader stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at scala.collection.Iterator$$anon$9.next(Iterator.scala:162) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202) at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116) at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109) at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247) at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) input : bytesread : 252 source : dynamicRecord : Row(id=2, name='benjamin', surname='franklin')

errorsCount

errorsCount( ): restituisce il numero totale di errori in un oggetto DynamicFrame.

stageErrorsCount

stageErrorsCount: restituisce il numero di errori che si sono verificati nel processo di generazione di questo oggetto DynamicFrame.