Clase DynamicFrame
Una de las principales abstracciones de Apache Spark es el DataFrame
de SparkSQL, que es similar a la construcción DataFrame
que se encuentra en R y en Pandas. Un elemento DataFrame
es similar a una tabla y admite operaciones de estilo funcional (map/reduce/filter/etc.) y operaciones SQL (select, project, aggregate).
DataFrames
son eficaces y de uso general, pero tienen limitaciones en las operaciones de extracción, transformación y carga (ETL). Más aún, es preciso especificar un esquema antes de cargar cualquier dato. SparkSQL aborda esta cuestión al repasar dos veces los datos: la primera para deducir el esquema y la segunda para cargar los datos. Sin embargo, esta deducción es limitada y no se ocupa de la realidad de los datos confusos. Por ejemplo, el mismo campo puede ser de un tipo diferente en diferentes registros. Apache Spark a menudo renuncia e informa del tipo como si fuera una string
mediante el texto del campo original. Esto podría no ser correcto y puede que desee controlar mejor cómo se resuelven las discrepancias en el esquema. Y en cuanto a grandes conjuntos de datos, un paso adicional por los datos de origen podría tener un precio prohibitivo.
Para abordar estas limitaciones, AWS Glue introduce DynamicFrame
. DynamicFrame
es similar a DataFrame
, salvo que cada registro se describe a sí mismo, por lo que no es preciso tener un esquema inicial. En su lugar, AWS Glue procesa un esquema sobre la marcha cuando es necesario y codifica explícitamente las incoherencias de esquema mediante un tipo de elección (o unión). Puede resolver estas incoherencias para que los conjuntos de datos pasen a ser compatibles con almacenes de datos que requieren un esquema fijo.
Asimismo, un DynamicRecord
representa un registro lógico dentro de un DynamicFrame
. Es como una fila en un DataFrame
de Spark, salvo que es autodescriptivo y se puede utilizar para datos que no se ajustan a un esquema fijo. Cuando se utiliza Glue AWS con PySpark, no suele manipular de forma independiente DynamicRecords
. Más bien, transformará el conjunto de datos en conjunto a través de DynamicFrame
.
Puede convertir DynamicFrames
en DataFrames
y viceversa, una vez que haya resuelto las incoherencias de esquema.
— construcción —
__init__
__init__(jdf, glue_ctx, name)
-
jdf
: referencia a la trama de datos de la máquina virtual de Java (JVM). -
glue_ctx
: un objeto Clase GlueContext. -
name
: cadena de nombre opcional, vacía de forma predeterminada.
fromDF
fromDF(dataframe, glue_ctx, name)
Convierte un DataFrame
en un DynamicFrame
al convertir campos de DataFrame
en campos de DynamicRecord
. Devuelve el nuevo DynamicFrame
.
Un DynamicRecord
representa un registro lógico en un DynamicFrame
. Es similar a una fila en un DataFrame
de Spark, salvo que es autodescriptivo y se puede utilizar para datos que no se ajustan a un esquema fijo.
Esta función espera que las columnas con nombres duplicados en el DataFrame
ya se hayan resuelto.
-
dataframe
:DataFrame
de Apache Spark SQL que debe convertirse (obligatorio). -
glue_ctx
: objeto Clase GlueContext que especifica el contexto de esta transformación (obligatorio). -
name
: el nombre deDynamicFrame
que resulte (opcional desde AWS Glue 3.0).
toDF
toDF(options)
Convierte un DynamicFrame
en un DataFrame
de Apache Spark convirtiendo campos de DynamicRecords
en campos de DataFrame
. Devuelve el nuevo DataFrame
.
Un DynamicRecord
representa un registro lógico en un DynamicFrame
. Es similar a una fila en un DataFrame
de Spark, salvo que es autodescriptivo y se puede utilizar para datos que no se ajustan a un esquema fijo.
-
options
: lista de opciones. Permite que especifique opciones adicionales para el proceso de conversión. Algunas opciones válidas que puede utilizar con el parámetro `options`:-
format
: especifica el formato de los datos, como json, csv, parquet. -
separater or sep
: en el caso de los archivos CSV, especifica el delimitador. -
header
: en el caso de los archivos CSV, indica si la primera fila es un encabezado (verdadero/falso). -
inferSchema
: indica a Spark que deduzca el esquema automáticamente (verdadero/falso).
A continuación, se muestra un ejemplo del uso del parámetro `options` con el método `toDF`:
from awsglue.context import GlueContext from awsglue.dynamicframe import DynamicFrame from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) csv_dyf = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={"paths": ["s3://my-bucket/path/to/csv/"]}, format="csv" ) csv_cf = csv_dyf.toDF(options={ "separator": ",", "header": "true", "ïnferSchema": "true" })
Especifique el tipo de destino si ha elegido el tipo de acción
Project
yCast
. Algunos ejemplos son los siguientes:>>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])
-
— información —
count
count( )
: devuelve el número de filas del DataFrame
subyacente.
Esquema
schema( )
- Muestra el esquema de este DynamicFrame
o en caso de que no esté disponible, el esquema del elemento DataFrame
subyacente.
Para obtener más información acerca de los tipos de DynamicFrame
que componen este esquema, consulte Tipos de extensión PySpark.
printSchema
printSchema( )
: imprime el esquema del DataFrame
subyacente.
show
show(num_rows)
: imprime un número especificado de filas del DataFrame
subyacente.
repartition
repartition(numPartitions)
: devuelve un DynamicFrame
nuevo con particiones numPartitions
.
coalesce
coalesce(numPartitions)
: devuelve un DynamicFrame
nuevo con particiones numPartitions
.
— transformaciones —
apply_mapping
apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Aplica una asignación declarativa a un DynamicFrame
y devuelve un DynamicFrame
nuevo con esas asignaciones aplicadas a los campos que se especifican. Los campos no especificados se omiten en el DynamicFrame
nuevo.
-
mappings
: una lista de tuplas de asignación (obligatorio). Cada una consta de (columna de origen, tipo de origen, columna de destino, tipo de destino).Si la columna de origen tiene un punto “
.
” en el nombre, se deben colocar comillas simples “``
” antes y después. Por ejemplo, para asignarthis.old.name
(cadena) athisNewName
, debe utilizar las siguientes tuplas:("`this.old.name`", "string", "thisNewName", "string")
-
transformation_ctx
: cadena única que se utiliza para identificar la información del estado (opcional). -
info
: cadena que se asociará a la notificación de errores para esta transformación (opcional). -
stageThreshold
: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar. -
totalThreshold
: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.
Ejemplo: uso de apply_mapping para cambiar el nombre de los campos y cambiar los tipos de campos
En el siguiente ejemplo de código, se muestra cómo utilizar el método apply_mapping
para cambiar el nombre de los campos seleccionados y cambiar los tipos de campos.
nota
Para acceder al conjunto de datos utilizado en este ejemplo, consulte Código de ejemplo: unión de los datos y establecimiento de relaciones entre ellos y siga las instrucciones en Paso 1: Rastrear los datos del bucket de Amazon S3.
# Example: Use apply_mapping to reshape source data into # the desired column names and types as a new DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Select and rename fields, change field type print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:") persons_mapped = persons.apply_mapping( [ ("family_name", "String", "last_name", "String"), ("name", "String", "first_name", "String"), ("birth_date", "String", "date_of_birth", "Date"), ] ) persons_mapped.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the persons_mapped DynamicFrame, created with apply_mapping: root |-- last_name: string |-- first_name: string |-- date_of_birth: date
drop_fields
drop_fields(paths, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Llama a la transformación Clase FlatMap para eliminar campos de un elemento DynamicFrame
. Devuelve un DynamicFrame
nuevo con los campos especificados anulados.
-
paths
: una lista de cadenas. Cada una contiene la ruta completa a un nodo de campo que desea descartar. Puede utilizar la notación de puntos para especificar campos anidados. Por ejemplo, si el campofirst
es un elemento secundario del camponame
en el árbol, especificará"name.first"
para la ruta.Si el nombre de un nodo de campo tiene un literal
.
, debe escribir el nombre entre tildes graves (`
). -
transformation_ctx
: cadena única que se utiliza para identificar la información del estado (opcional). -
info
: cadena que se asociará a la notificación de errores para esta transformación (opcional). -
stageThreshold
: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar. -
totalThreshold
: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.
Ejemplo: uso de drop_fields para descartar campos de un DynamicFrame
En este ejemplo de código se utiliza el método drop_fields
para eliminar los campos anidados y de nivel superior seleccionados de un DynamicFrame
.
Conjunto de datos de ejemplo
El ejemplo utiliza el siguiente conjunto de datos que está representado por la tabla EXAMPLE-FRIENDS-DATA
en el código:
{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []} {"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]} {"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]} {"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}} {"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}
Código de ejemplo
# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame. # Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name. # Replace EXAMPLE-FRIENDS-DATA with your table name. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame from Glue Data Catalog glue_source_database = "
MY-EXAMPLE-DATABASE
" glue_source_table = "EXAMPLE-FRIENDS-DATA
" friends = glueContext.create_dynamic_frame.from_catalog( database=glue_source_database, table_name=glue_source_table ) print("Schema for friends DynamicFrame before calling drop_fields:") friends.printSchema() # Remove location.county, remove friends.age, remove age friends = friends.drop_fields(paths=["age", "location.county", "friends.age"]) print("Schema for friends DynamicFrame after removing age, county, and friend age:") friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields: root |-- name: string |-- age: int |-- location: struct | |-- state: string | |-- county: string |-- friends: array | |-- element: struct | | |-- name: string | | |-- age: int Schema for friends DynamicFrame after removing age, county, and friend age: root |-- name: string |-- location: struct | |-- state: string |-- friends: array | |-- element: struct | | |-- name: string
filtro
filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Devuelve un DynamicFrame
nuevo que contiene todos los DynamicRecords
dentro de la entrada DynamicFrame
que cumplen la función f
de predicado especificada.
-
f
: función de predicado que debe aplicarse al elementoDynamicFrame
. La función debe tomar unDynamicRecord
como argumento y devolver True siDynamicRecord
cumple los requisitos de filtro o False si no los cumple (obligatorio).Un
DynamicRecord
representa un registro lógico en unDynamicFrame
. Es similar a una fila en unDataFrame
de Spark, salvo que es autodescriptivo y se puede utilizar para datos que no se ajustan a un esquema fijo. -
transformation_ctx
: cadena única que se utiliza para identificar la información del estado (opcional). -
info
: cadena que se asociará a la notificación de errores para esta transformación (opcional). -
stageThreshold
: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar. -
totalThreshold
: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.
Ejemplo: uso de filter para obtener una selección filtrada de campos
En este ejemplo se utiliza el método filter
para crear un DynamicFrame
nuevo que incluye una selección filtrada de campos de otro DynamicFrame
.
Al igual que el método map
, filter
toma una función como un argumento que se aplica a cada registro en el DynamicFrame
original. La función toma un registro como una entrada y devuelve un valor booleano. Si el valor que se devuelve es verdadero, el registro se incluye en el DynamicFrame
resultante. Si es falso, el registro se omite.
nota
Para acceder al conjunto de datos utilizado en este ejemplo, consulte Ejemplo de código: Preparación de datos con ResolveChoice, Lambda y ApplyMapping y siga las instrucciones en Paso 1: Rastrear los datos del bucket de Amazon S3.
# Example: Use filter to create a new DynamicFrame # with a filtered selection of records from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create DynamicFrame from Glue Data Catalog medicare = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv" ] }, "csv", {"withHeader": True}, ) # Create filtered DynamicFrame with custom lambda # to filter records by Provider State and Provider City sac_or_mon = medicare.filter( f=lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"] ) # Compare record counts print("Unfiltered record count: ", medicare.count()) print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065 Filtered record count: 564
unirse
join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Realiza una unión de igualdad con otro elemento DynamicFrame
y devuelve el elemento DynamicFrame
resultante.
-
paths1
: lista de las claves de esta trama que deben unirse. -
paths2
: lista de las claves de la otra trama que deben unirse. -
frame2
: el otro elementoDynamicFrame
que debe unirse. -
transformation_ctx
: cadena única que se utiliza para identificar la información del estado (opcional). -
info
: cadena que se asociará a la notificación de errores para esta transformación (opcional). -
stageThreshold
: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar. -
totalThreshold
: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.
Ejemplo: uso de join para combinar DynamicFrames
En este ejemplo se utiliza el método join
para realizar una unión en tres DynamicFrames
. AWS Glue realiza la unión en función de las claves de campos que se proporcionan. El DynamicFrame
resultante contiene filas de los dos marcos originales donde coinciden las claves especificadas.
Tenga en cuenta que la transformación join
mantiene todos los campos intactos. Esto significa que los campos que se especifican para que coincidan aparecen en el DynamicFrame resultante, incluso si son redundantes y contienen las mismas claves. En este ejemplo, utilizamos drop_fields
para eliminar estas claves redundantes después de la unión.
nota
Para acceder al conjunto de datos utilizado en este ejemplo, consulte Código de ejemplo: unión de los datos y establecimiento de relaciones entre ellos y siga las instrucciones en Paso 1: Rastrear los datos del bucket de Amazon S3.
# Example: Use join to combine data from three DynamicFrames from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load DynamicFrames from Glue Data Catalog persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json" ) orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() print("Schema for the memberships DynamicFrame:") memberships.printSchema() print("Schema for the orgs DynamicFrame:") orgs.printSchema() # Join persons and memberships by ID persons_memberships = persons.join( paths1=["id"], paths2=["person_id"], frame2=memberships ) # Rename and drop fields from orgs # to prevent field name collisions with persons_memberships orgs = ( orgs.drop_fields(["other_names", "identifiers"]) .rename_field("id", "org_id") .rename_field("name", "org_name") ) # Create final join of all three DynamicFrames legislators_combined = orgs.join( paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships ).drop_fields(["person_id", "org_id"]) # Inspect the schema for the joined data print("Schema for the new legislators_combined DynamicFrame:") legislators_combined.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the memberships DynamicFrame: root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string Schema for the orgs DynamicFrame: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string Schema for the new legislators_combined DynamicFrame: root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string
map
map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Devuelve un elemento DynamicFrame
nuevo que se obtiene al aplicar la función de mapeo especificada a todos los registros del elemento DynamicFrame
original.
-
f
: función de asignación que debe aplicarse a todos los registros del elementoDynamicFrame
. La función debe tomar un elementoDynamicRecord
como argumento y devolver unDynamicRecord
nuevo (obligatorio).Un
DynamicRecord
representa un registro lógico en unDynamicFrame
. Es similar a una fila en unDataFrame
de Apache Spark, salvo que es autodescriptivo y se puede utilizar para datos que no se ajustan a un esquema fijo. transformation_ctx
: cadena única que se utiliza para identificar la información del estado (opcional).info
: cadena que está asociada con errores en la transformación (opcional).stageThreshold
: cantidad máxima de errores que se puede producir en la transformación antes de que se determine que es errónea (opcional). El rol predeterminado es cero.totalThreshold
: cantidad máxima de errores que se pueden producir en total antes de que se determine que el proceso es erróneo (opcional). El rol predeterminado es cero.
Ejemplo: uso de map para aplicar una función a cada registro en un DynamicFrame
En este ejemplo se muestra cómo utilizar el método map
para aplicar una función a cada registro de un DynamicFrame
. En concreto, este ejemplo aplica una función llamada MergeAddress
a cada registro para combinar varios campos de dirección en un solo tipo de struct
.
nota
Para acceder al conjunto de datos utilizado en este ejemplo, consulte Ejemplo de código: Preparación de datos con ResolveChoice, Lambda y ApplyMapping y siga las instrucciones en Paso 1: Rastrear los datos del bucket de Amazon S3.
# Example: Use map to combine fields in all records # of a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema medicare = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]}, "csv", {"withHeader": True}) print("Schema for medicare DynamicFrame:") medicare.printSchema() # Define a function to supply to the map transform # that merges address fields into a single field def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec # Use map to apply MergeAddress to every record mapped_medicare = medicare.map(f = MergeAddress) print("Schema for mapped_medicare DynamicFrame:") mapped_medicare.printSchema()
Schema for medicare DynamicFrame: root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string Schema for mapped_medicare DynamicFrame: root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
mergeDynamicFrame
mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "",
options = {}, info = "", stageThreshold = 0, totalThreshold = 0)
Combina este objeto DynamicFrame
con una instancia provisional de DynamicFrame
en función de las claves principales especificadas para identificar registros. Los registros duplicados (registros con las mismas claves principales) no se desduplican. Si no hay ningún registro que coincida en el marco provisional, se retienen todos los registros del origen (incluidos los duplicados). Si el marco provisional tiene registros coincidentes, estos sobrescriben a los registros del origen en AWS Glue.
-
stage_dynamic_frame
:DynamicFrame
provisional que se fusionará. -
primary_keys
: la lista de campos de clave principal para hacer coincidir los registros de los marcos dinámicos de origen y provisionales. -
transformation_ctx
: cadena única que se utiliza para recuperar metadatos sobre la transformación actual (opcional). -
options
: una cadena de pares de nombre-valor de JSON que proporcionan información adicional para esta transformación. Este argumento no se utiliza actualmente. -
info
: un elementoString
. Cualquier cadena que se va a asociar con los errores de esta transformación. -
stageThreshold
: un elementoLong
. Número de errores de la transformación especificada que provocarán que el proceso se termine. -
totalThreshold
: un elementoLong
. Número total de errores hasta esta transformación (incluida) que provocarán que el proceso se termine.
Este método devuelve un nuevo objeto DynamicFrame
que se obtiene combinando esta instancia de DynamicFrame
con la instancia de DynamicFrame
provisional.
El objeto DynamicFrame
devuelto contiene el registro A en estos casos:
-
Si
A
existe tanto en el marco de origen como en el marco provisional, se devuelveA
en el marco provisional. -
Si
A
está en la tabla de origen yA.primaryKeys
no está enstagingDynamicFrame
A
no se actualiza en la tabla provisional.
No es necesario que el marco de origen y el marco provisional tengan el mismo esquema.
Ejemplo: utilice mergeDynamicFrame para combinar dos DynamicFrames
en función de una clave principal
El siguiente ejemplo de código muestra cómo utilizar el método mergeDynamicFrame
para combinar un DynamicFrame
con un DynamicFrame
de “puesta en escena”, en función del id
de la clave principal.
Conjunto de datos de ejemplo
El ejemplo usa dos DynamicFrames
de una DynamicFrameCollection
llamada split_rows_collection
. A continuación se ofrece una lista de las claves de split_rows_collection
.
dict_keys(['high', 'low'])
Código de ejemplo
# Example: Use mergeDynamicFrame to merge DynamicFrames # based on a set of specified primary keys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import SelectFromCollection # Inspect the original DynamicFrames frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low") print("Inspect the DynamicFrame that contains rows where ID < 10") frame_low.toDF().show() frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high") print("Inspect the DynamicFrame that contains rows where ID > 10") frame_high.toDF().show() # Merge the DynamicFrames based on the "id" primary key merged_high_low = frame_high.mergeDynamicFrame( stage_dynamic_frame=frame_low, primary_keys=["id"] ) # View the results where the ID is 1 or 20 print("Inspect the merged DynamicFrame that contains the combined rows") merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 2| 0| fax| 202-225-3307| | 2| 1| phone| 202-225-5731| | 3| 0| fax| 202-225-3307| | 3| 1| phone| 202-225-5731| | 4| 0| fax| 202-225-3307| | 4| 1| phone| 202-225-5731| | 5| 0| fax| 202-225-3307| | 5| 1| phone| 202-225-5731| | 6| 0| fax| 202-225-3307| | 6| 1| phone| 202-225-5731| | 7| 0| fax| 202-225-3307| | 7| 1| phone| 202-225-5731| | 8| 0| fax| 202-225-3307| | 8| 1| phone| 202-225-5731| | 9| 0| fax| 202-225-3307| | 9| 1| phone| 202-225-5731| | 10| 0| fax| 202-225-6328| | 10| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains rows where ID > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| fax| 202-225-6328| | 11| 1| phone| 202-225-4576| | 11| 2| twitter| RepTrentFranks| | 12| 0| fax| 202-225-6328| | 12| 1| phone| 202-225-4576| | 12| 2| twitter| RepTrentFranks| | 13| 0| fax| 202-225-6328| | 13| 1| phone| 202-225-4576| | 13| 2| twitter| RepTrentFranks| | 14| 0| fax| 202-225-6328| | 14| 1| phone| 202-225-4576| | 14| 2| twitter| RepTrentFranks| | 15| 0| fax| 202-225-6328| | 15| 1| phone| 202-225-4576| | 15| 2| twitter| RepTrentFranks| | 16| 0| fax| 202-225-6328| | 16| 1| phone| 202-225-4576| | 16| 2| twitter| RepTrentFranks| | 17| 0| fax| 202-225-6328| | 17| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the merged DynamicFrame that contains the combined rows +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 20| 0| fax| 202-225-5604| | 20| 1| phone| 202-225-6536| | 20| 2| twitter| USRepLong| +---+-----+------------------------+-------------------------+
relationalize
relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Convierte un DynamicFrame
en un formulario que cabe en una base de datos relacional. Relacionalizar un DynamicFrame
es especialmente útil cuando se desean mover datos de un entorno NoSQL como DynamoDB a una base de datos relacional como MySQL.
La transformación genera una lista de marcos al aplanar columnas anidadas y dinamizar las columnas de la matriz. Las columnas de matriz dinamizada se pueden unir a la tabla raíz con la clave de combinación generada durante la fase de desanidado.
root_table_name
: nombre de la tabla raíz.staging_path
: ruta en la que el método puede almacenar las particiones de las tablas dinamizadas en formato CSV (opcional). Las tablas dinamizadas se leen desde esta ruta.options
: diccionario de parámetros opcionales.-
transformation_ctx
: cadena única que se utiliza para identificar la información del estado (opcional). -
info
: cadena que se asociará a la notificación de errores para esta transformación (opcional). -
stageThreshold
: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar. -
totalThreshold
: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.
Ejemplo: utilice relacionalizar para aplanar un esquema anidado en un DynamicFrame
Este ejemplo de código utiliza el método relationalize
para aplanar un esquema anidado en un formulario que encaje en una base de datos relacional.
Conjunto de datos de ejemplo
El ejemplo usa un DynamicFrame
llamado legislators_combined
con el siguiente esquema. legislators_combined
tiene varios campos anidados como links
, images
y contact_details
, que se aplanarán con la transformación relationalize
.
root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string
Código de ejemplo
# Example: Use relationalize to flatten # a nested schema into a format that fits # into a relational database. # Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Apply relationalize and inspect new tables legislators_relationalized = legislators_combined.relationalize( "l_root", "
s3://DOC-EXAMPLE-BUCKET/tmpDir
" ) legislators_relationalized.keys() # Compare the schema of the contact_details # nested field to the new relationalized table that # represents it legislators_combined.select_fields("contact_details").printSchema() legislators_relationalized.select("l_root_contact_details").toDF().where( "id = 10 or id = 75" ).orderBy(["id", "index"]).show()
El siguiente resultado permite comparar el esquema del campo anidado llamado contact_details
a la tabla que creó la transformación relationalize
. Observe que los registros de la tabla se vinculan a la tabla principal mediante una clave externa llamada id
y una columna index
que representa las posiciones de la matriz.
dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers']) root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| 202-225-4160| | 10| 1| phone| 202-225-3436| | 75| 0| fax| 202-225-6791| | 75| 1| phone| 202-225-2861| | 75| 2| twitter| RepSamFarr| +---+-----+------------------------+-------------------------+
rename_field
rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Cambia el nombre de un campo en este DynamicFrame
y devuelve un DynamicFrame
nuevo con el campo con el nombre nuevo.
-
oldName
: la ruta completa al nodo cuyo nombre desea cambiar.Si el nombre antiguo tiene puntos,
RenameField
no funcionará a menos que lo ponga entre acentos graves (`
). Por ejemplo, para reemplazarthis.old.name
porthisNewName
, debería llamar a rename_field tal y como se indica a continuación:newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
-
newName
: el nombre nuevo, indicado como una ruta completa. -
transformation_ctx
: cadena única que se utiliza para identificar la información del estado (opcional). -
info
: cadena que se asociará a la notificación de errores para esta transformación (opcional). -
stageThreshold
: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar. -
totalThreshold
: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.
Ejemplo: utilice rename_field para cambiar el nombre de los campos de un DynamicFrame
Este ejemplo de código utiliza el método rename_field
para cambiar el nombre de los campos de un DynamicFrame
. Observe que en el ejemplo se utiliza el encadenamiento de métodos para renombrar varios campos al mismo tiempo.
nota
Para acceder al conjunto de datos utilizado en este ejemplo, consulte Código de ejemplo: unión de los datos y establecimiento de relaciones entre ellos y siga las instrucciones en Paso 1: Rastrear los datos del bucket de Amazon S3.
Código de ejemplo
# Example: Use rename_field to rename fields # in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Inspect the original orgs schema orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Original orgs schema: ") orgs.printSchema() # Rename fields and view the new schema orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name") print("New orgs schema with renamed fields: ") orgs.printSchema()
Original orgs schema: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string New orgs schema with renamed fields: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- classification: string |-- org_id: string |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string
resolveChoice
resolveChoice(specs = None, choice = "" , database = None , table_name = None ,
transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id =
None)
Resuelve un tipo de elección dentro de este elemento DynamicFrame
y devuelve el DynamicFrame
nuevo.
-
specs
: lista de ambigüedades concretas que deben resolverse, cada una en forma de tupla:(field_path, action)
.Hay dos formas de usar
resolveChoice
. La primera consiste en utilizar el argumentospecs
para especificar una secuencia de campos específicos y cómo resolverlos. El otro modo deresolveChoice
es utilizar el argumentochoice
para especificar una resolución única para todos losChoiceTypes
.Valores de
specs
se especifican como tuplas compuestas de pares(field_path, action)
. El valor defield_path
identifica un elemento ambiguo concreto, mientras que el valor deaction
identifica la resolución correspondiente. Los posibles valores son los siguientes:-
cast:
: intenta convertir todos los valores al tipo especificado. Por ejemplo:type
cast:int
. -
make_cols
: convierte cada tipo diferenciado en una columna con el nombre
. Resuelve una posible ambigüedad al aplanar los datos. Por ejemplo, sicolumnName
_type
columnA
puede ser tanto unint
como unastring
, la resolución puede consistir en producir dos columnas llamadascolumnA_int
ycolumnA_string
en el elementoDynamicFrame
resultante. -
make_struct
: resuelve una posible ambigüedad al utilizar unastruct
para representar los datos. Por ejemplo, si los datos de una columna pueden ser tanto unint
como unastring
, la acciónmake_struct
generará una columna de estructuras en el elementoDynamicFrame
resultante. Cada estructura contiene tanto unint
como unastring
. -
project:
: resuelve una posible ambigüedad al proyectar todos los datos a uno de los tipos de datos posibles. Por ejemplo, si los datos de una columna pueden ser tanto untype
int
como unastring
, ejecutar la acciónproject:string
genera una columna en el elementoDynamicFrame
resultante, en la que todos los valoresint
se han convertido en cadenas.
Si en
field_path
se identifica una matriz, incluya corchetes vacíos después del nombre de la matriz para evitar ambigüedades. Por ejemplo, suponga que está trabajando con datos estructurados tal y como se indica a continuación:"myList": [ { "price": 100.00 }, { "price": "$100.00" } ]
Puede seleccionar la versión numérica en vez de la de cadena del precio si configura
field_path
en"myList[].price"
yaction
en"cast:double"
.nota
Solo se puede utilizar uno, el parámetro
specs
o el parámetrochoice
. Si el parámetrospecs
no esNone
, el parámetrochoice
tiene que ser una cadena vacía. Y viceversa, si el parámetrochoice
no es una cadena vacía, el parámetrospecs
tiene que serNone
. -
choice
: especifica una resolución única para todos losChoiceTypes
. Se puede utilizar en los casos en los que se desconozca la lista completa deChoiceTypes
antes del tiempo de ejecución. Además de las acciones enumeradas anteriormente paraspecs
, este argumento también soporta la siguiente acción:-
match_catalog
: intenta convertir cadaChoiceType
al tipo correspondiente en la tabla de Data Catalog especificada.
-
-
database
: la base de datos de Data Catalog que se usará con la acciónmatch_catalog
. -
table_name
: la tabla de Data Catalog que se usará con la acciónmatch_catalog
. -
transformation_ctx
: cadena única que se utiliza para identificar la información del estado (opcional). -
info
: cadena que se asociará a la notificación de errores para esta transformación (opcional). -
stageThreshold
: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar. -
totalThreshold
: número de errores detectados hasta este proceso de transformación incluido que provocarán que el proceso se termine (opcional). Es cero de forma predeterminada, lo que indica que el proceso no debería terminarse por error. -
catalog_id
: el ID de catálogo de Data Catalog al que se accede (el ID de cuenta de Data Catalog). Cuando se establece enNone
(valor predeterminado), utiliza el ID de catálogo de la cuenta que hace la llamada.
Ejemplo: utilice resolveChoice para gestionar una columna que contiene varios tipos
Este ejemplo de código utiliza el método resolveChoice
para especificar cómo gestionar una columna DynamicFrame
que contiene valores de varios tipos. El ejemplo muestra dos formas comunes de gestionar una columna con diferentes tipos:
Convierta la columna en un solo tipo de datos.
Conserve todos los tipos en columnas separadas.
Conjunto de datos de ejemplo
nota
Para acceder al conjunto de datos utilizado en este ejemplo, consulte Ejemplo de código: Preparación de datos con ResolveChoice, Lambda y ApplyMapping y siga las instrucciones en Paso 1: Rastrear los datos del bucket de Amazon S3.
El ejemplo usa un DynamicFrame
llamado medicare
con el siguiente esquema:
root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string
Código de ejemplo
# Example: Use resolveChoice to handle # a column that contains multiple types from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input data and inspect the "provider id" column medicare = glueContext.create_dynamic_frame.from_catalog( database="payments", table_name="medicare_hospital_provider_csv" ) print("Inspect the provider id column:") medicare.toDF().select("provider id").show() # Cast provider id to type long medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")]) print("Schema after casting provider id to type long:") medicare_resolved_long.printSchema() medicare_resolved_long.toDF().select("provider id").show() # Create separate columns # for each provider id type medicare_resolved_cols = medicare.resolveChoice(choice="make_cols") print("Schema after creating separate columns for each type:") medicare_resolved_cols.printSchema() medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column: +-----------+ |provider id| +-----------+ | [10001,]| | [10005,]| | [10006,]| | [10011,]| | [10016,]| | [10023,]| | [10029,]| | [10033,]| | [10039,]| | [10040,]| | [10046,]| | [10055,]| | [10056,]| | [10078,]| | [10083,]| | [10085,]| | [10090,]| | [10092,]| | [10100,]| | [10103,]| +-----------+ only showing top 20 rows Schema after casting 'provider id' to type long: root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +-----------+ |provider id| +-----------+ | 10001| | 10005| | 10006| | 10011| | 10016| | 10023| | 10029| | 10033| | 10039| | 10040| | 10046| | 10055| | 10056| | 10078| | 10083| | 10085| | 10090| | 10092| | 10100| | 10103| +-----------+ only showing top 20 rows Schema after creating separate columns for each type: root |-- drg definition: string |-- provider id_string: string |-- provider id_long: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +----------------+------------------+ |provider id_long|provider id_string| +----------------+------------------+ | 10001| null| | 10005| null| | 10006| null| | 10011| null| | 10016| null| | 10023| null| | 10029| null| | 10033| null| | 10039| null| | 10040| null| | 10046| null| | 10055| null| | 10056| null| | 10078| null| | 10083| null| | 10085| null| | 10090| null| | 10092| null| | 10100| null| | 10103| null| +----------------+------------------+ only showing top 20 rows
select_fields
select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Devuelve un DynamicFrame
nuevo que contiene los campos seleccionados.
-
paths
: una lista de cadenas. Cada cadena es una ruta a un nodo de nivel superior que desea seleccionar. -
transformation_ctx
: cadena única que se utiliza para identificar la información del estado (opcional). -
info
: cadena que se asociará a la notificación de errores para esta transformación (opcional). -
stageThreshold
: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar. -
totalThreshold
: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.
Ejemplo: uso de select_fields para crear un DynamicFrame
nuevo con los campos elegidos
En el siguiente ejemplo de código, se muestra cómo utilizar el método select_fields
para crear un DynamicFrame
nuevo con una lista seleccionada de campos de un DynamicFrame
existente.
nota
Para acceder al conjunto de datos utilizado en este ejemplo, consulte Código de ejemplo: unión de los datos y establecimiento de relaciones entre ellos y siga las instrucciones en Paso 1: Rastrear los datos del bucket de Amazon S3.
# Example: Use select_fields to select specific fields from a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Create a new DynamicFrame with chosen fields names = persons.select_fields(paths=["family_name", "given_name"]) print("Schema for the names DynamicFrame, created with select_fields:") names.printSchema() names.toDF().show()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the names DynamicFrame: root |-- family_name: string |-- given_name: string +-----------+----------+ |family_name|given_name| +-----------+----------+ | Collins| Michael| | Huizenga| Bill| | Clawson| Curtis| | Solomon| Gerald| | Rigell| Edward| | Crapo| Michael| | Hutto| Earl| | Ertel| Allen| | Minish| Joseph| | Andrews| Robert| | Walden| Greg| | Kazen| Abraham| | Turner| Michael| | Kolbe| James| | Lowenthal| Alan| | Capuano| Michael| | Schrader| Kurt| | Nadler| Jerrold| | Graves| Tom| | McMillan| John| +-----------+----------+ only showing top 20 rows
simplify_ddb_json
simplify_ddb_json(): DynamicFrame
Simplifica las columnas anidadas en un DynamicFrame
que se encuentran de manera específica en la estructura de JSON de DynamoDB y devuelve un nuevo DynamicFrame
simplificado. Si hay varios tipos y tipos de mapas una lista de tipos, no se simplificarán los elementos de la lista. Tenga en cuenta que este es un tipo específico de transformación que se comporta de manera distinta a la transformación unnest
regular y que necesita que los datos ya se encuentren en la estructura JSON de DynamoDB. Para obtener más información, consulte JSON de DynamoDB.
Por ejemplo, el esquema de una lectura de una exportación con la estructura JSON de DynamoDB puede parecerse al siguiente:
root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean
La transformación simplify_ddb_json()
convertiría esto en:
root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
Por ejemplo: utilice simplify_ddb_json para invocar una simplificación JSON de DynamoDB.
Este ejemplo de código utiliza el método simplify_ddb_json
para utilizar el conecto de exportación de DynamoDB de AWS Glue, invocar una simplificación JSON de DynamoDB e imprimir el número de pariciones.
Código de ejemplo
from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "dynamodb", connection_options = { 'dynamodb.export': 'ddb', 'dynamodb.tableArn': '<table arn>', 'dynamodb.s3.bucket': '<bucket name>', 'dynamodb.s3.prefix': '<bucket prefix>', 'dynamodb.s3.bucketOwner': '<account_id of bucket>' } ) simplified = dynamicFrame.simplify_ddb_json() print(simplified.getNumPartitions())
spigot
spigot(path, options={})
Escribe registros de ejemplo en un destino específico para ayudarlo a verificar las transformaciones realizadas por su trabajo.
-
path
: ruta del destino en el que se escribirá (obligatorio). -
options
: pares valor-clave que especifican opciones (opcional). La opción"topk"
especifica que deben escribirse los primeros registrosk
. La opción"prob"
especifica la probabilidad de que se elija un registro determinado (como decimal). Puede usarlo para seleccionar registros para escribir. transformation_ctx
: cadena única que se utiliza para identificar la información del estado (opcional).
Ejemplo: utilice spigot para escribir campos de ejemplo desde un DynamicFrame
en Amazon S3
Este ejemplo de código utiliza el método spigot
para escribir registros de ejemplo en un bucket de Amazon S3 después de aplicar la transformación select_fields
.
Conjunto de datos de ejemplo
nota
Para acceder al conjunto de datos utilizado en este ejemplo, consulte Código de ejemplo: unión de los datos y establecimiento de relaciones entre ellos y siga las instrucciones en Paso 1: Rastrear los datos del bucket de Amazon S3.
El ejemplo usa un DynamicFrame
llamado persons
con el siguiente esquema:
root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string
Código de ejemplo
# Example: Use spigot to write sample records # to a destination during a transformation # from pyspark.context import SparkContext. # Replace DOC-EXAMPLE-BUCKET with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load table data into a DynamicFrame persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) # Perform the select_fields on the DynamicFrame persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"]) # Use spigot to write a sample of the transformed data # (the first 10 records) spigot_output = persons.spigot( path="
s3://DOC-EXAMPLE-BUCKET
", options={"topk": 10} )
A continuación, se muestra un ejemplo de los datos que spigot
escribe en Amazon S3. Como se especifica el código de ejemplo options={"topk": 10}
, los datos de la muestra contienen los 10 primeros registros.
{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"} {"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"} {"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"} {"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"} {"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"} {"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"} {"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"} {"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"} {"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"} {"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}
split_fields
split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Devuelve una nueva DynamicFrameCollection
que contiene dos DynamicFrames
. El primer DynamicFrame
contiene todos los nodos que se han dividido y el segundo contiene los nodos que quedan.
-
paths
: lista de cadenas, cada una de las cuales es una ruta completa a un nodo que se desea dividir en un elementoDynamicFrame
nuevo. -
name1
: cadena de nombre para elDynamicFrame
que se divide. -
name2
: cadena de nombre para el elementoDynamicFrame
que queda después de que se hayan dividido los nodos especificados. -
transformation_ctx
: cadena única que se utiliza para identificar la información del estado (opcional). -
info
: cadena que se asociará a la notificación de errores para esta transformación (opcional). -
stageThreshold
: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar. -
totalThreshold
: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.
Ejemplo: utilice split_fields para dividir los campos seleccionados en un campo DynamicFrame
independiente
Este ejemplo de código usa el método split_fields
para dividir una lista de campos especificados en una lista DynamicFrame
independiente.
Conjunto de datos de ejemplo
El ejemplo usa un DynamicFrame
llamado l_root_contact_details
que proviene de una colección denominada legislators_relationalized
.
l_root_contact_details
tiene el siguiente esquema y entradas.
root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| ...
Código de ejemplo
# Example: Use split_fields to split selected # fields into a separate DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input DynamicFrame and inspect its schema frame_to_split = legislators_relationalized.select("l_root_contact_details") print("Inspect the input DynamicFrame schema:") frame_to_split.printSchema() # Split id and index fields into a separate DynamicFrame split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right") # Inspect the resulting DynamicFrames print("Inspect the schemas of the DynamicFrames created with split_fields:") split_fields_collection.select("left").printSchema() split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema: root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string Inspect the schemas of the DynamicFrames created with split_fields: root |-- id: long |-- index: int root |-- contact_details.val.type: string |-- contact_details.val.value: string
split_rows
split_rows(comparison_dict, name1, name2, transformation_ctx="", info="",
stageThreshold=0, totalThreshold=0)
Divide una o más filas de un DynamicFrame
en un nuevo DynamicFrame
.
El método devuelve un nuevo DynamicFrameCollection
que contiene dos DynamicFrames
. El primer DynamicFrame
contiene todas las filas que se han dividido y el segundo contiene las filas que quedan.
-
comparison_dict
: diccionario donde la clave es una ruta a una columna y el valor es otro diccionario que mapea comparadores con valores con los que se compara el valor de columna. Por ejemplo,{"age": {">": 10, "<": 20}}
divide todas las filas cuyo valor en la columna antigüedad sea superior a 10 e inferior a 20. -
name1
: cadena de nombre para elDynamicFrame
que se divide. -
name2
: cadena de nombre para el elementoDynamicFrame
que queda después de que se hayan dividido los nodos especificados. -
transformation_ctx
: cadena única que se utiliza para identificar la información del estado (opcional). -
info
: cadena que se asociará a la notificación de errores para esta transformación (opcional). -
stageThreshold
: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar. -
totalThreshold
: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.
Ejemplo: utilice split_rows para dividir filas en un DynamicFrame
En este ejemplo de código se utiliza el método split_rows
para dividir las filas en un DynamicFrame
en función del valor del campo id
.
Conjunto de datos de ejemplo
El ejemplo usa un DynamicFrame
llamado l_root_contact_details
que se selecciona de una colección denominada legislators_relationalized
.
l_root_contact_details
tiene el siguiente esquema y entradas.
root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+
Código de ejemplo
# Example: Use split_rows to split up # rows in a DynamicFrame based on value from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Retrieve the DynamicFrame to split frame_to_split = legislators_relationalized.select("l_root_contact_details") # Split up rows by ID split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low") # Inspect the resulting DynamicFrames print("Inspect the DynamicFrame that contains IDs < 10") split_rows_collection.select("low").toDF().show() print("Inspect the DynamicFrame that contains IDs > 10") split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains IDs > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| phone| 202-225-5476| | 11| 1| twitter| RepDavidYoung| | 12| 0| phone| 202-225-4035| | 12| 1| twitter| RepStephMurphy| | 13| 0| fax| 202-226-0774| | 13| 1| phone| 202-225-6335| | 14| 0| fax| 202-226-0774| | 14| 1| phone| 202-225-6335| | 15| 0| fax| 202-226-0774| | 15| 1| phone| 202-225-6335| | 16| 0| fax| 202-226-0774| | 16| 1| phone| 202-225-6335| | 17| 0| fax| 202-226-0774| | 17| 1| phone| 202-225-6335| | 18| 0| fax| 202-226-0774| | 18| 1| phone| 202-225-6335| | 19| 0| fax| 202-226-0774| | 19| 1| phone| 202-225-6335| | 20| 0| fax| 202-226-0774| | 20| 1| phone| 202-225-6335| +---+-----+------------------------+-------------------------+ only showing top 20 rows
unbox
unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)
Realiza la conversión unboxing (vuelve a formatear) de un campo de cadena en un elemento DynamicFrame
y devuelve un elemento DynamicFrame
nuevo que contiene el elemento DynamicRecords
de conversión unboxing.
Un DynamicRecord
representa un registro lógico en un DynamicFrame
. Es similar a una fila en un DataFrame
de Apache Spark, salvo que es autodescriptivo y se puede utilizar para datos que no se ajustan a un esquema fijo.
-
path
: ruta completa al nodo de cadena a cuyo nombre se desea aplicar la conversión unboxing. format
: una especificación de formato (opcional). La utiliza con una conexión de Amazon S3 o AWS Glue que admite diversos formatos. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.-
transformation_ctx
: cadena única que se utiliza para identificar la información del estado (opcional). -
info
: cadena que se asociará a la notificación de errores para esta transformación (opcional). -
stageThreshold
: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar. -
totalThreshold
: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar. -
options
: una o varias de las siguientes:separator
: cadena que contiene el carácter separador.escaper
: cadena que contiene el carácter de escape.skipFirst
: valor booleano que indica si debe omitirse la primera instancia.-
withSchema
: una cadena que contiene una representación JSON del esquema del nodo. El formato de la representación JSON de un esquema se define mediante el resultado deStructType.json()
. withHeader
: valor booleano que indica si se incluye un encabezado.
Ejemplo: utilice la conversión unboxing para convertir un campo de cadena en una estructura
Este ejemplo de código utiliza el método unbox
para la conversión unboxing o volver a formatear un campo de cadena en un DynamicFrame
en un campo de tipo estructura.
Conjunto de datos de ejemplo
El ejemplo usa un DynamicFrame
llamado mapped_with_string
con el siguiente esquema y entradas.
Observe el campo denominado AddressString
. Este es el campo que el ejemplo convierte en una estructura.
root |-- Average Total Payments: string |-- AddressString: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| ...
Código de ejemplo
# Example: Use unbox to unbox a string field # into a struct in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) unboxed = mapped_with_string.unbox("AddressString", "json") unboxed.printSchema() unboxed.toDF().show()
root |-- Average Total Payments: string |-- AddressString: struct | |-- Street: string | |-- City: string | |-- State: string | |-- Zip.Code: string | |-- Array: array | | |-- element: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| | $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...| | $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...| | $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...| | $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...| | $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL| | $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...| | $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...| | $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL| | $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...| | $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...| | $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...| | $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...| | $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL| | $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...| | $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL| | $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ only showing top 20 rows
unión
union(frame1, frame2, transformation_ctx = "",
info = "", stageThreshold = 0, totalThreshold = 0)
Unir dos DynamicFrames. Devuelve un DynamicFrame que contiene todos los registros de ambos DynamicFrames de entrada. Esta transformación puede devolver resultados diferentes de la unión de dos DataFrames con datos equivalentes. Si necesita el comportamiento de unión del DataFrame de Spark, considere usar toDF
.
-
frame1
: primer DynamicFrame en unirse. -
frame2
: segundo DynamicFrame en unirse. -
transformation_ctx
: (opcional) una cadena única que se utiliza para identificar información de estadísticas/estado -
info
: (opcional) una cadena que está asociada a errores en la transformación -
stageThreshold
: (opcional) un número máximo de errores en la transformación hasta que se produzca un error en el procesamiento -
totalThreshold
: (opcional) un número máximo de errores en total hasta que se produzca un error en el procesamiento.
unnest
unnest(transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Desanida los objetos anidados de un elemento DynamicFrame
, que los convierte en objetos de nivel superior y devuelve un elemento DynamicFrame
desanidado nuevo.
-
transformation_ctx
: cadena única que se utiliza para identificar la información del estado (opcional). -
info
: cadena que se asociará a la notificación de errores para esta transformación (opcional). -
stageThreshold
: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar. -
totalThreshold
: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.
Ejemplo: utilice desanidar para convertir los campos anidados en campos de nivel superior
Este ejemplo de código utiliza el método unnest
para desanidar todos los campos anidados en un DynamicFrame
en campos de nivel superior.
Conjunto de datos de ejemplo
El ejemplo usa un DynamicFrame
llamado mapped_medicare
con el siguiente esquema. Observe que el campo Address
es el único campo que contiene datos anidados.
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
Código de ejemplo
# Example: Use unnest to unnest nested # objects in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Unnest all nested fields unnested = mapped_medicare.unnest() unnested.printSchema()
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address.Zip.Code: string |-- Address.City: string |-- Address.Array: array | |-- element: string |-- Address.State: string |-- Address.Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
unnest_ddb_json
Desanida las columnas anidadas de un elemento DynamicFrame
que se encuentren específicamente en la estructura JSON de DynamoDB y devuelve un nuevo elemento DynamicFrame
no anidado. Las columnas que sean de una matriz de tipos de estructuras no se desanidarán. Tenga en cuenta que esta transformación de desanidamiento es un tipo específico que se comporta de modo diferente a la transformación unnest
normal y requiere que los datos ya estén en la estructura JSON de DynamoDB. Para obtener más información, consulte JSON de DynamoDB.
unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
-
transformation_ctx
: cadena única que se utiliza para identificar la información del estado (opcional). -
info
: cadena que se asociará a la notificación de errores para esta transformación (opcional). -
stageThreshold
: número de errores detectados durante este proceso de transformación que provocarán que el proceso se termine (opcional: cero de forma predeterminada, lo que indica que el proceso no debería terminarse por error). -
totalThreshold
: número de errores detectados hasta este proceso de transformación incluido que provocarán que el proceso se termine (opcional: cero de forma predeterminada, lo que indica que el proceso no debería terminarse por error).
Por ejemplo, el esquema de una lectura de una exportación con la estructura JSON de DynamoDB puede parecerse al siguiente:
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
La transformación unnest_ddb_json()
convertiría esto en:
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
En el siguiente ejemplo de código, se muestra cómo utilizar el conector de exportación de DynamoDB de AWS Glue, invocar un desanidamiento JSON de DynamoDB e imprimir el número de particiones:
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()
write
write(connection_type, connection_options, format, format_options, accumulator_size)
Obtiene un DataSink(object) del tipo de conexión especificado del Clase GlueContext de este elemento DynamicFrame
y lo utiliza para escribir y dar formato al contenido de este DynamicFrame
. Devuelve el nuevo DynamicFrame
con formato y escrito tal y como se especifica.
-
connection_type
: tipo de conexión que se utilizará. Entre los valores válidos se incluyen:s3
,mysql
,postgresql
,redshift
,sqlserver
yoracle
. -
connection_options
: opción de conexión que se utilizará (opcional). Para unconnection_type
des3
, se define una ruta de Amazon S3.connection_options = {"path": "
s3://aws-glue-target/temp
"}Para conexiones JDBC, deben definirse varias propiedades. Tenga en cuenta que el nombre de la base de datos debe ser parte de la URL. Opcionalmente, se puede incluir en las opciones de conexión.
aviso
No se recomienda almacenar las contraseñas en el script. Considere utilizar
boto3
para recuperarlas de AWS Secrets Manager o del catálogo de datos de Glue AWS.connection_options = {"url": "
jdbc-url/database
", "user": "username
", "password":passwordVariable
,"dbtable": "table-name
", "redshiftTmpDir": "s3-tempdir-path
"} format
: una especificación de formato (opcional). Se utiliza para Amazon Simple Storage Service (Amazon S3) o una conexión de AWS Glue que admite diversos formatos. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.format_options
: opciones del formato especificado. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.accumulator_size
: el tamaño acumulable que se utilizará, en bytes (opcional).
— errores —
assertErrorThreshold
assertErrorThreshold( )
: confirmación utilizada para los errores de las transformaciones que crearon este elemento DynamicFrame
. Muestra un valor de Exception
desde el DataFrame
subyacente.
errorsAsDynamicFrame
errorsAsDynamicFrame( )
: devuelve un DynamicFrame
con registros de error anidados dentro.
Ejemplo: uso de errorsAsDynamicFrame para ver los registros de errores
En el siguiente ejemplo de código se muestra cómo utilizar el método errorsAsDynamicFrame
para ver un registro de error de un DynamicFrame
.
Conjunto de datos de ejemplo
El ejemplo utiliza el siguiente conjunto de datos que se puede cargar en Amazon S3 como JSON. Tenga en cuenta que el segundo registro tiene un formato incorrecto. Los datos con formato incorrecto generalmente interrumpen el análisis de archivos cuando se utiliza SparkSQL. Sin embargo, DynamicFrame
reconoce los problemas de formato incorrecto y convierte las líneas con formato incorrecto en registros de errores que se pueden gestionar de forma individual.
{"id": 1, "name": "george", "surname": "washington", "height": 178} {"id": 2, "name": "benjamin", "surname": "franklin", {"id": 3, "name": "alexander", "surname": "hamilton", "height": 171} {"id": 4, "name": "john", "surname": "jay", "height": 190}
Código de ejemplo
# Example: Use errorsAsDynamicFrame to view error records. # Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create errors DynamicFrame, view schema errors = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["
s3://DOC-EXAMPLE-S3-BUCKET/error_data.json
"]}, "json" ) print("Schema of errors DynamicFrame:") errors.printSchema() # Show that errors only contains valid entries from the dataset print("errors contains only valid records from the input dataset (2 of 4 records)") errors.toDF().show() # View errors print("Errors count:", str(errors.errorsCount())) print("Errors:") errors.errorsAsDynamicFrame().toDF().show() # View error fields and error data error_record = errors.errorsAsDynamicFrame().toDF().head() error_fields = error_record["error"] print("Error fields: ") print(error_fields.asDict().keys()) print("\nError record data:") for key in error_fields.asDict().keys(): print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame: root |-- id: int |-- name: string |-- surname: string |-- height: int errors contains only valid records from the input dataset (2 of 4 records) +---+------+----------+------+ | id| name| surname|height| +---+------+----------+------+ | 1|george|washington| 178| | 4| john| jay| 190| +---+------+----------+------+ Errors count: 1 Errors: +--------------------+ | error| +--------------------+ |[[ File "/tmp/20...| +--------------------+ Error fields: dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord']) Error record data: callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='') msg : error in jackson reader stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at scala.collection.Iterator$$anon$9.next(Iterator.scala:162) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202) at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116) at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109) at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247) at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) input : bytesread : 252 source : dynamicRecord : Row(id=2, name='benjamin', surname='franklin')
errorsCount
errorsCount( )
: devuelve el número total de errores de un DynamicFrame
.
stageErrorsCount
stageErrorsCount
: devuelve el número de errores que se produjo en el proceso de generar este DynamicFrame
.