Clase DynamicFrame - AWS Glue

Clase DynamicFrame

Una de las principales abstracciones de Apache Spark es el DataFrame de SparkSQL, que es similar a la construcción DataFrame que se encuentra en R y en Pandas. Un elemento DataFrame es similar a una tabla y admite operaciones de estilo funcional (map/reduce/filter/etc.) y operaciones SQL (select, project, aggregate).

DataFrames son eficaces y de uso general, pero tienen limitaciones en las operaciones de extracción, transformación y carga (ETL). Más aún, es preciso especificar un esquema antes de cargar cualquier dato. SparkSQL aborda esta cuestión al repasar dos veces los datos: la primera para deducir el esquema y la segunda para cargar los datos. Sin embargo, esta deducción es limitada y no se ocupa de la realidad de los datos confusos. Por ejemplo, el mismo campo puede ser de un tipo diferente en diferentes registros. Apache Spark a menudo renuncia e informa del tipo como si fuera una string mediante el texto del campo original. Esto podría no ser correcto y puede que desee controlar mejor cómo se resuelven las discrepancias en el esquema. Y en cuanto a grandes conjuntos de datos, un paso adicional por los datos de origen podría tener un precio prohibitivo.

Para abordar estas limitaciones, AWS Glue introduce DynamicFrame. DynamicFrame es similar a DataFrame, salvo que cada registro se describe a sí mismo, por lo que no es preciso tener un esquema inicial. En su lugar, AWS Glue procesa un esquema sobre la marcha cuando es necesario y codifica explícitamente las incoherencias de esquema mediante un tipo de elección (o unión). Puede resolver estas incoherencias para que los conjuntos de datos pasen a ser compatibles con almacenes de datos que requieren un esquema fijo.

Asimismo, un DynamicRecord representa un registro lógico dentro de un DynamicFrame. Es como una fila en un DataFrame de Spark, salvo que es autodescriptivo y se puede utilizar para datos que no se ajustan a un esquema fijo. Cuando se utiliza Glue AWS con PySpark, no suele manipular de forma independiente DynamicRecords. Más bien, transformará el conjunto de datos en conjunto a través de DynamicFrame.

Puede convertir DynamicFrames en DataFrames y viceversa, una vez que haya resuelto las incoherencias de esquema.

 — construcción —

__init__

__init__(jdf, glue_ctx, name)
  • jdf: referencia a la trama de datos de la máquina virtual de Java (JVM).

  • glue_ctx: un objeto Clase GlueContext.

  • name: cadena de nombre opcional, vacía de forma predeterminada.

fromDF

fromDF(dataframe, glue_ctx, name)

Convierte un DataFrame en un DynamicFrame al convertir campos de DataFrame en campos de DynamicRecord. Devuelve el nuevo DynamicFrame.

Un DynamicRecord representa un registro lógico en un DynamicFrame. Es similar a una fila en un DataFrame de Spark, salvo que es autodescriptivo y se puede utilizar para datos que no se ajustan a un esquema fijo.

Esta función espera que las columnas con nombres duplicados en el DataFrame ya se hayan resuelto.

  • dataframe: DataFrame de Apache Spark SQL que debe convertirse (obligatorio).

  • glue_ctx: objeto Clase GlueContext que especifica el contexto de esta transformación (obligatorio).

  • name: el nombre de DynamicFrame que resulte (opcional desde AWS Glue 3.0).

toDF

toDF(options)

Convierte un DynamicFrame en un DataFrame de Apache Spark convirtiendo campos de DynamicRecords en campos de DataFrame. Devuelve el nuevo DataFrame.

Un DynamicRecord representa un registro lógico en un DynamicFrame. Es similar a una fila en un DataFrame de Spark, salvo que es autodescriptivo y se puede utilizar para datos que no se ajustan a un esquema fijo.

  • options: lista de opciones. Permite que especifique opciones adicionales para el proceso de conversión. Algunas opciones válidas que puede utilizar con el parámetro `options`:

    • format: especifica el formato de los datos, como json, csv, parquet.

    • separater or sep: en el caso de los archivos CSV, especifica el delimitador.

    • header: en el caso de los archivos CSV, indica si la primera fila es un encabezado (verdadero/falso).

    • inferSchema: indica a Spark que deduzca el esquema automáticamente (verdadero/falso).

    A continuación, se muestra un ejemplo del uso del parámetro `options` con el método `toDF`:

    from awsglue.context import GlueContext from awsglue.dynamicframe import DynamicFrame from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) csv_dyf = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={"paths": ["s3://my-bucket/path/to/csv/"]}, format="csv" ) csv_cf = csv_dyf.toDF(options={ "separator": ",", "header": "true", "ïnferSchema": "true" })

    Especifique el tipo de destino si ha elegido el tipo de acción Project y Cast. Algunos ejemplos son los siguientes:

    >>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])

 — información —

count

count( ): devuelve el número de filas del DataFrame subyacente.

Esquema

schema( ) - Muestra el esquema de este DynamicFrame o en caso de que no esté disponible, el esquema del elemento DataFrame subyacente.

Para obtener más información acerca de los tipos de DynamicFrame que componen este esquema, consulte Tipos de extensión PySpark.

printSchema

printSchema( ): imprime el esquema del DataFrame subyacente.

show

show(num_rows): imprime un número especificado de filas del DataFrame subyacente.

repartition

repartition(numPartitions): devuelve un DynamicFrame nuevo con particiones numPartitions.

coalesce

coalesce(numPartitions): devuelve un DynamicFrame nuevo con particiones numPartitions.

 — transformaciones —

apply_mapping

apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Aplica una asignación declarativa a un DynamicFrame y devuelve un DynamicFrame nuevo con esas asignaciones aplicadas a los campos que se especifican. Los campos no especificados se omiten en el DynamicFrame nuevo.

  • mappings: una lista de tuplas de asignación (obligatorio). Cada una consta de (columna de origen, tipo de origen, columna de destino, tipo de destino).

    Si la columna de origen tiene un punto “.” en el nombre, se deben colocar comillas simples “``” antes y después. Por ejemplo, para asignar this.old.name (cadena) a thisNewName, debe utilizar las siguientes tuplas:

    ("`this.old.name`", "string", "thisNewName", "string")
  • transformation_ctx: cadena única que se utiliza para identificar la información del estado (opcional).

  • info: cadena que se asociará a la notificación de errores para esta transformación (opcional).

  • stageThreshold: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

  • totalThreshold: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

Ejemplo: uso de apply_mapping para cambiar el nombre de los campos y cambiar los tipos de campos

En el siguiente ejemplo de código, se muestra cómo utilizar el método apply_mapping para cambiar el nombre de los campos seleccionados y cambiar los tipos de campos.

nota

Para acceder al conjunto de datos utilizado en este ejemplo, consulte Código de ejemplo: unión de los datos y establecimiento de relaciones entre ellos y siga las instrucciones en Paso 1: Rastrear los datos del bucket de Amazon S3.

# Example: Use apply_mapping to reshape source data into # the desired column names and types as a new DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Select and rename fields, change field type print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:") persons_mapped = persons.apply_mapping( [ ("family_name", "String", "last_name", "String"), ("name", "String", "first_name", "String"), ("birth_date", "String", "date_of_birth", "Date"), ] ) persons_mapped.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the persons_mapped DynamicFrame, created with apply_mapping: root |-- last_name: string |-- first_name: string |-- date_of_birth: date

drop_fields

drop_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Llama a la transformación Clase FlatMap para eliminar campos de un elemento DynamicFrame. Devuelve un DynamicFrame nuevo con los campos especificados anulados.

  • paths: una lista de cadenas. Cada una contiene la ruta completa a un nodo de campo que desea descartar. Puede utilizar la notación de puntos para especificar campos anidados. Por ejemplo, si el campo first es un elemento secundario del campo name en el árbol, especificará "name.first" para la ruta.

    Si el nombre de un nodo de campo tiene un literal., debe escribir el nombre entre tildes graves (`).

  • transformation_ctx: cadena única que se utiliza para identificar la información del estado (opcional).

  • info: cadena que se asociará a la notificación de errores para esta transformación (opcional).

  • stageThreshold: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

  • totalThreshold: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

Ejemplo: uso de drop_fields para descartar campos de un DynamicFrame

En este ejemplo de código se utiliza el método drop_fields para eliminar los campos anidados y de nivel superior seleccionados de un DynamicFrame.

Conjunto de datos de ejemplo

El ejemplo utiliza el siguiente conjunto de datos que está representado por la tabla EXAMPLE-FRIENDS-DATA en el código:

{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []} {"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]} {"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]} {"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}} {"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}

Código de ejemplo

# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame. # Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name. # Replace EXAMPLE-FRIENDS-DATA with your table name. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame from Glue Data Catalog glue_source_database = "MY-EXAMPLE-DATABASE" glue_source_table = "EXAMPLE-FRIENDS-DATA" friends = glueContext.create_dynamic_frame.from_catalog( database=glue_source_database, table_name=glue_source_table ) print("Schema for friends DynamicFrame before calling drop_fields:") friends.printSchema() # Remove location.county, remove friends.age, remove age friends = friends.drop_fields(paths=["age", "location.county", "friends.age"]) print("Schema for friends DynamicFrame after removing age, county, and friend age:") friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields: root |-- name: string |-- age: int |-- location: struct | |-- state: string | |-- county: string |-- friends: array | |-- element: struct | | |-- name: string | | |-- age: int Schema for friends DynamicFrame after removing age, county, and friend age: root |-- name: string |-- location: struct | |-- state: string |-- friends: array | |-- element: struct | | |-- name: string

filtro

filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Devuelve un DynamicFrame nuevo que contiene todos los DynamicRecords dentro de la entrada DynamicFrame que cumplen la función f de predicado especificada.

  • f: función de predicado que debe aplicarse al elemento DynamicFrame. La función debe tomar un DynamicRecord como argumento y devolver True si DynamicRecord cumple los requisitos de filtro o False si no los cumple (obligatorio).

    Un DynamicRecord representa un registro lógico en un DynamicFrame. Es similar a una fila en un DataFrame de Spark, salvo que es autodescriptivo y se puede utilizar para datos que no se ajustan a un esquema fijo.

  • transformation_ctx: cadena única que se utiliza para identificar la información del estado (opcional).

  • info: cadena que se asociará a la notificación de errores para esta transformación (opcional).

  • stageThreshold: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

  • totalThreshold: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

Ejemplo: uso de filter para obtener una selección filtrada de campos

En este ejemplo se utiliza el método filter para crear un DynamicFrame nuevo que incluye una selección filtrada de campos de otro DynamicFrame.

Al igual que el método map, filter toma una función como un argumento que se aplica a cada registro en el DynamicFrame original. La función toma un registro como una entrada y devuelve un valor booleano. Si el valor que se devuelve es verdadero, el registro se incluye en el DynamicFrame resultante. Si es falso, el registro se omite.

nota

Para acceder al conjunto de datos utilizado en este ejemplo, consulte Ejemplo de código: Preparación de datos con ResolveChoice, Lambda y ApplyMapping y siga las instrucciones en Paso 1: Rastrear los datos del bucket de Amazon S3.

# Example: Use filter to create a new DynamicFrame # with a filtered selection of records from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create DynamicFrame from Glue Data Catalog medicare = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv" ] }, "csv", {"withHeader": True}, ) # Create filtered DynamicFrame with custom lambda # to filter records by Provider State and Provider City sac_or_mon = medicare.filter( f=lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"] ) # Compare record counts print("Unfiltered record count: ", medicare.count()) print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065 Filtered record count: 564

unirse

join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Realiza una unión de igualdad con otro elemento DynamicFrame y devuelve el elemento DynamicFrame resultante.

  • paths1: lista de las claves de esta trama que deben unirse.

  • paths2: lista de las claves de la otra trama que deben unirse.

  • frame2: el otro elemento DynamicFrame que debe unirse.

  • transformation_ctx: cadena única que se utiliza para identificar la información del estado (opcional).

  • info: cadena que se asociará a la notificación de errores para esta transformación (opcional).

  • stageThreshold: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

  • totalThreshold: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

Ejemplo: uso de join para combinar DynamicFrames

En este ejemplo se utiliza el método join para realizar una unión en tres DynamicFrames. AWS Glue realiza la unión en función de las claves de campos que se proporcionan. El DynamicFrame resultante contiene filas de los dos marcos originales donde coinciden las claves especificadas.

Tenga en cuenta que la transformación join mantiene todos los campos intactos. Esto significa que los campos que se especifican para que coincidan aparecen en el DynamicFrame resultante, incluso si son redundantes y contienen las mismas claves. En este ejemplo, utilizamos drop_fields para eliminar estas claves redundantes después de la unión.

nota

Para acceder al conjunto de datos utilizado en este ejemplo, consulte Código de ejemplo: unión de los datos y establecimiento de relaciones entre ellos y siga las instrucciones en Paso 1: Rastrear los datos del bucket de Amazon S3.

# Example: Use join to combine data from three DynamicFrames from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load DynamicFrames from Glue Data Catalog persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json" ) orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() print("Schema for the memberships DynamicFrame:") memberships.printSchema() print("Schema for the orgs DynamicFrame:") orgs.printSchema() # Join persons and memberships by ID persons_memberships = persons.join( paths1=["id"], paths2=["person_id"], frame2=memberships ) # Rename and drop fields from orgs # to prevent field name collisions with persons_memberships orgs = ( orgs.drop_fields(["other_names", "identifiers"]) .rename_field("id", "org_id") .rename_field("name", "org_name") ) # Create final join of all three DynamicFrames legislators_combined = orgs.join( paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships ).drop_fields(["person_id", "org_id"]) # Inspect the schema for the joined data print("Schema for the new legislators_combined DynamicFrame:") legislators_combined.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the memberships DynamicFrame: root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string Schema for the orgs DynamicFrame: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string Schema for the new legislators_combined DynamicFrame: root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

map

map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Devuelve un elemento DynamicFrame nuevo que se obtiene al aplicar la función de mapeo especificada a todos los registros del elemento DynamicFrame original.

  • f: función de asignación que debe aplicarse a todos los registros del elemento DynamicFrame. La función debe tomar un elemento DynamicRecord como argumento y devolver un DynamicRecord nuevo (obligatorio).

    Un DynamicRecord representa un registro lógico en un DynamicFrame. Es similar a una fila en un DataFrame de Apache Spark, salvo que es autodescriptivo y se puede utilizar para datos que no se ajustan a un esquema fijo.

  • transformation_ctx: cadena única que se utiliza para identificar la información del estado (opcional).

  • info: cadena que está asociada con errores en la transformación (opcional).

  • stageThreshold: cantidad máxima de errores que se puede producir en la transformación antes de que se determine que es errónea (opcional). El rol predeterminado es cero.

  • totalThreshold: cantidad máxima de errores que se pueden producir en total antes de que se determine que el proceso es erróneo (opcional). El rol predeterminado es cero.

Ejemplo: uso de map para aplicar una función a cada registro en un DynamicFrame

En este ejemplo se muestra cómo utilizar el método map para aplicar una función a cada registro de un DynamicFrame. En concreto, este ejemplo aplica una función llamada MergeAddress a cada registro para combinar varios campos de dirección en un solo tipo de struct.

nota

Para acceder al conjunto de datos utilizado en este ejemplo, consulte Ejemplo de código: Preparación de datos con ResolveChoice, Lambda y ApplyMapping y siga las instrucciones en Paso 1: Rastrear los datos del bucket de Amazon S3.

# Example: Use map to combine fields in all records # of a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema medicare = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]}, "csv", {"withHeader": True}) print("Schema for medicare DynamicFrame:") medicare.printSchema() # Define a function to supply to the map transform # that merges address fields into a single field def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec # Use map to apply MergeAddress to every record mapped_medicare = medicare.map(f = MergeAddress) print("Schema for mapped_medicare DynamicFrame:") mapped_medicare.printSchema()
Schema for medicare DynamicFrame: root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string Schema for mapped_medicare DynamicFrame: root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

mergeDynamicFrame

mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)

Combina este objeto DynamicFrame con una instancia provisional de DynamicFrame en función de las claves principales especificadas para identificar registros. Los registros duplicados (registros con las mismas claves principales) no se desduplican. Si no hay ningún registro que coincida en el marco provisional, se retienen todos los registros del origen (incluidos los duplicados). Si el marco provisional tiene registros coincidentes, estos sobrescriben a los registros del origen en AWS Glue.

  • stage_dynamic_frame: DynamicFrame provisional que se fusionará.

  • primary_keys: la lista de campos de clave principal para hacer coincidir los registros de los marcos dinámicos de origen y provisionales.

  • transformation_ctx: cadena única que se utiliza para recuperar metadatos sobre la transformación actual (opcional).

  • options: una cadena de pares de nombre-valor de JSON que proporcionan información adicional para esta transformación. Este argumento no se utiliza actualmente.

  • info: un elemento String. Cualquier cadena que se va a asociar con los errores de esta transformación.

  • stageThreshold: un elemento Long. Número de errores de la transformación especificada que provocarán que el proceso se termine.

  • totalThreshold: un elemento Long. Número total de errores hasta esta transformación (incluida) que provocarán que el proceso se termine.

Este método devuelve un nuevo objeto DynamicFrame que se obtiene combinando esta instancia de DynamicFrame con la instancia de DynamicFrame provisional.

El objeto DynamicFrame devuelto contiene el registro A en estos casos:

  • Si A existe tanto en el marco de origen como en el marco provisional, se devuelve A en el marco provisional.

  • Si A está en la tabla de origen y A.primaryKeys no está en stagingDynamicFrame A no se actualiza en la tabla provisional.

No es necesario que el marco de origen y el marco provisional tengan el mismo esquema.

Ejemplo: utilice mergeDynamicFrame para combinar dos DynamicFrames en función de una clave principal

El siguiente ejemplo de código muestra cómo utilizar el método mergeDynamicFrame para combinar un DynamicFrame con un DynamicFrame de “puesta en escena”, en función del id de la clave principal.

Conjunto de datos de ejemplo

El ejemplo usa dos DynamicFrames de una DynamicFrameCollection llamada split_rows_collection. A continuación se ofrece una lista de las claves de split_rows_collection.

dict_keys(['high', 'low'])

Código de ejemplo

# Example: Use mergeDynamicFrame to merge DynamicFrames # based on a set of specified primary keys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import SelectFromCollection # Inspect the original DynamicFrames frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low") print("Inspect the DynamicFrame that contains rows where ID < 10") frame_low.toDF().show() frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high") print("Inspect the DynamicFrame that contains rows where ID > 10") frame_high.toDF().show() # Merge the DynamicFrames based on the "id" primary key merged_high_low = frame_high.mergeDynamicFrame( stage_dynamic_frame=frame_low, primary_keys=["id"] ) # View the results where the ID is 1 or 20 print("Inspect the merged DynamicFrame that contains the combined rows") merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 2| 0| fax| 202-225-3307| | 2| 1| phone| 202-225-5731| | 3| 0| fax| 202-225-3307| | 3| 1| phone| 202-225-5731| | 4| 0| fax| 202-225-3307| | 4| 1| phone| 202-225-5731| | 5| 0| fax| 202-225-3307| | 5| 1| phone| 202-225-5731| | 6| 0| fax| 202-225-3307| | 6| 1| phone| 202-225-5731| | 7| 0| fax| 202-225-3307| | 7| 1| phone| 202-225-5731| | 8| 0| fax| 202-225-3307| | 8| 1| phone| 202-225-5731| | 9| 0| fax| 202-225-3307| | 9| 1| phone| 202-225-5731| | 10| 0| fax| 202-225-6328| | 10| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains rows where ID > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| fax| 202-225-6328| | 11| 1| phone| 202-225-4576| | 11| 2| twitter| RepTrentFranks| | 12| 0| fax| 202-225-6328| | 12| 1| phone| 202-225-4576| | 12| 2| twitter| RepTrentFranks| | 13| 0| fax| 202-225-6328| | 13| 1| phone| 202-225-4576| | 13| 2| twitter| RepTrentFranks| | 14| 0| fax| 202-225-6328| | 14| 1| phone| 202-225-4576| | 14| 2| twitter| RepTrentFranks| | 15| 0| fax| 202-225-6328| | 15| 1| phone| 202-225-4576| | 15| 2| twitter| RepTrentFranks| | 16| 0| fax| 202-225-6328| | 16| 1| phone| 202-225-4576| | 16| 2| twitter| RepTrentFranks| | 17| 0| fax| 202-225-6328| | 17| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the merged DynamicFrame that contains the combined rows +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 20| 0| fax| 202-225-5604| | 20| 1| phone| 202-225-6536| | 20| 2| twitter| USRepLong| +---+-----+------------------------+-------------------------+

relationalize

relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Convierte un DynamicFrame en un formulario que cabe en una base de datos relacional. Relacionalizar un DynamicFrame es especialmente útil cuando se desean mover datos de un entorno NoSQL como DynamoDB a una base de datos relacional como MySQL.

La transformación genera una lista de marcos al aplanar columnas anidadas y dinamizar las columnas de la matriz. Las columnas de matriz dinamizada se pueden unir a la tabla raíz con la clave de combinación generada durante la fase de desanidado.

  • root_table_name: nombre de la tabla raíz.

  • staging_path: ruta en la que el método puede almacenar las particiones de las tablas dinamizadas en formato CSV (opcional). Las tablas dinamizadas se leen desde esta ruta.

  • options: diccionario de parámetros opcionales.

  • transformation_ctx: cadena única que se utiliza para identificar la información del estado (opcional).

  • info: cadena que se asociará a la notificación de errores para esta transformación (opcional).

  • stageThreshold: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

  • totalThreshold: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

Ejemplo: utilice relacionalizar para aplanar un esquema anidado en un DynamicFrame

Este ejemplo de código utiliza el método relationalize para aplanar un esquema anidado en un formulario que encaje en una base de datos relacional.

Conjunto de datos de ejemplo

El ejemplo usa un DynamicFrame llamado legislators_combined con el siguiente esquema. legislators_combined tiene varios campos anidados como links, images y contact_details, que se aplanarán con la transformación relationalize.

root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

Código de ejemplo

# Example: Use relationalize to flatten # a nested schema into a format that fits # into a relational database. # Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Apply relationalize and inspect new tables legislators_relationalized = legislators_combined.relationalize( "l_root", "s3://DOC-EXAMPLE-BUCKET/tmpDir" ) legislators_relationalized.keys() # Compare the schema of the contact_details # nested field to the new relationalized table that # represents it legislators_combined.select_fields("contact_details").printSchema() legislators_relationalized.select("l_root_contact_details").toDF().where( "id = 10 or id = 75" ).orderBy(["id", "index"]).show()

El siguiente resultado permite comparar el esquema del campo anidado llamado contact_details a la tabla que creó la transformación relationalize. Observe que los registros de la tabla se vinculan a la tabla principal mediante una clave externa llamada id y una columna index que representa las posiciones de la matriz.

dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers']) root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| 202-225-4160| | 10| 1| phone| 202-225-3436| | 75| 0| fax| 202-225-6791| | 75| 1| phone| 202-225-2861| | 75| 2| twitter| RepSamFarr| +---+-----+------------------------+-------------------------+

rename_field

rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Cambia el nombre de un campo en este DynamicFrame y devuelve un DynamicFrame nuevo con el campo con el nombre nuevo.

  • oldName: la ruta completa al nodo cuyo nombre desea cambiar.

    Si el nombre antiguo tiene puntos, RenameField no funcionará a menos que lo ponga entre acentos graves (`). Por ejemplo, para reemplazar this.old.name por thisNewName, debería llamar a rename_field tal y como se indica a continuación:

    newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
  • newName: el nombre nuevo, indicado como una ruta completa.

  • transformation_ctx: cadena única que se utiliza para identificar la información del estado (opcional).

  • info: cadena que se asociará a la notificación de errores para esta transformación (opcional).

  • stageThreshold: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

  • totalThreshold: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

Ejemplo: utilice rename_field para cambiar el nombre de los campos de un DynamicFrame

Este ejemplo de código utiliza el método rename_field para cambiar el nombre de los campos de un DynamicFrame. Observe que en el ejemplo se utiliza el encadenamiento de métodos para renombrar varios campos al mismo tiempo.

nota

Para acceder al conjunto de datos utilizado en este ejemplo, consulte Código de ejemplo: unión de los datos y establecimiento de relaciones entre ellos y siga las instrucciones en Paso 1: Rastrear los datos del bucket de Amazon S3.

Código de ejemplo

# Example: Use rename_field to rename fields # in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Inspect the original orgs schema orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Original orgs schema: ") orgs.printSchema() # Rename fields and view the new schema orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name") print("New orgs schema with renamed fields: ") orgs.printSchema()
Original orgs schema: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string New orgs schema with renamed fields: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- classification: string |-- org_id: string |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string

resolveChoice

resolveChoice(specs = None, choice = "" , database = None , table_name = None , transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id = None)

Resuelve un tipo de elección dentro de este elemento DynamicFrame y devuelve el DynamicFrame nuevo.

  • specs: lista de ambigüedades concretas que deben resolverse, cada una en forma de tupla: (field_path, action).

    Hay dos formas de usar resolveChoice. La primera consiste en utilizar el argumento specs para especificar una secuencia de campos específicos y cómo resolverlos. El otro modo de resolveChoice es utilizar el argumento choice para especificar una resolución única para todos los ChoiceTypes.

    Valores de specs se especifican como tuplas compuestas de pares (field_path, action). El valor de field_path identifica un elemento ambiguo concreto, mientras que el valor de action identifica la resolución correspondiente. Los posibles valores son los siguientes:

    • cast:type: intenta convertir todos los valores al tipo especificado. Por ejemplo: cast:int.

    • make_cols: convierte cada tipo diferenciado en una columna con el nombre columnName_type. Resuelve una posible ambigüedad al aplanar los datos. Por ejemplo, si columnA puede ser tanto un int como una string, la resolución puede consistir en producir dos columnas llamadas columnA_int y columnA_string en el elemento DynamicFrame resultante.

    • make_struct: resuelve una posible ambigüedad al utilizar una struct para representar los datos. Por ejemplo, si los datos de una columna pueden ser tanto un int como una string, la acción make_struct generará una columna de estructuras en el elemento DynamicFrame resultante. Cada estructura contiene tanto un int como una string.

    • project:type: resuelve una posible ambigüedad al proyectar todos los datos a uno de los tipos de datos posibles. Por ejemplo, si los datos de una columna pueden ser tanto un int como una string, ejecutar la acción project:string genera una columna en el elemento DynamicFrame resultante, en la que todos los valores int se han convertido en cadenas.

    Si en field_path se identifica una matriz, incluya corchetes vacíos después del nombre de la matriz para evitar ambigüedades. Por ejemplo, suponga que está trabajando con datos estructurados tal y como se indica a continuación:

    "myList": [ { "price": 100.00 }, { "price": "$100.00" } ]

    Puede seleccionar la versión numérica en vez de la de cadena del precio si configura field_path en "myList[].price" y action en "cast:double".

    nota

    Solo se puede utilizar uno, el parámetro specs o el parámetro choice. Si el parámetro specs no es None, el parámetro choice tiene que ser una cadena vacía. Y viceversa, si el parámetro choice no es una cadena vacía, el parámetro specs tiene que ser None.

  • choice: especifica una resolución única para todos los ChoiceTypes. Se puede utilizar en los casos en los que se desconozca la lista completa de ChoiceTypes antes del tiempo de ejecución. Además de las acciones enumeradas anteriormente para specs, este argumento también soporta la siguiente acción:

    • match_catalog: intenta convertir cada ChoiceType al tipo correspondiente en la tabla de Data Catalog especificada.

  • database: la base de datos de Data Catalog que se usará con la acción match_catalog.

  • table_name: la tabla de Data Catalog que se usará con la acción match_catalog.

  • transformation_ctx: cadena única que se utiliza para identificar la información del estado (opcional).

  • info: cadena que se asociará a la notificación de errores para esta transformación (opcional).

  • stageThreshold: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

  • totalThreshold: número de errores detectados hasta este proceso de transformación incluido que provocarán que el proceso se termine (opcional). Es cero de forma predeterminada, lo que indica que el proceso no debería terminarse por error.

  • catalog_id: el ID de catálogo de Data Catalog al que se accede (el ID de cuenta de Data Catalog). Cuando se establece en None (valor predeterminado), utiliza el ID de catálogo de la cuenta que hace la llamada.

Ejemplo: utilice resolveChoice para gestionar una columna que contiene varios tipos

Este ejemplo de código utiliza el método resolveChoice para especificar cómo gestionar una columna DynamicFrame que contiene valores de varios tipos. El ejemplo muestra dos formas comunes de gestionar una columna con diferentes tipos:

  • Convierta la columna en un solo tipo de datos.

  • Conserve todos los tipos en columnas separadas.

Conjunto de datos de ejemplo

nota

Para acceder al conjunto de datos utilizado en este ejemplo, consulte Ejemplo de código: Preparación de datos con ResolveChoice, Lambda y ApplyMapping y siga las instrucciones en Paso 1: Rastrear los datos del bucket de Amazon S3.

El ejemplo usa un DynamicFrame llamado medicare con el siguiente esquema:

root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

Código de ejemplo

# Example: Use resolveChoice to handle # a column that contains multiple types from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input data and inspect the "provider id" column medicare = glueContext.create_dynamic_frame.from_catalog( database="payments", table_name="medicare_hospital_provider_csv" ) print("Inspect the provider id column:") medicare.toDF().select("provider id").show() # Cast provider id to type long medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")]) print("Schema after casting provider id to type long:") medicare_resolved_long.printSchema() medicare_resolved_long.toDF().select("provider id").show() # Create separate columns # for each provider id type medicare_resolved_cols = medicare.resolveChoice(choice="make_cols") print("Schema after creating separate columns for each type:") medicare_resolved_cols.printSchema() medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column: +-----------+ |provider id| +-----------+ | [10001,]| | [10005,]| | [10006,]| | [10011,]| | [10016,]| | [10023,]| | [10029,]| | [10033,]| | [10039,]| | [10040,]| | [10046,]| | [10055,]| | [10056,]| | [10078,]| | [10083,]| | [10085,]| | [10090,]| | [10092,]| | [10100,]| | [10103,]| +-----------+ only showing top 20 rows Schema after casting 'provider id' to type long: root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +-----------+ |provider id| +-----------+ | 10001| | 10005| | 10006| | 10011| | 10016| | 10023| | 10029| | 10033| | 10039| | 10040| | 10046| | 10055| | 10056| | 10078| | 10083| | 10085| | 10090| | 10092| | 10100| | 10103| +-----------+ only showing top 20 rows Schema after creating separate columns for each type: root |-- drg definition: string |-- provider id_string: string |-- provider id_long: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +----------------+------------------+ |provider id_long|provider id_string| +----------------+------------------+ | 10001| null| | 10005| null| | 10006| null| | 10011| null| | 10016| null| | 10023| null| | 10029| null| | 10033| null| | 10039| null| | 10040| null| | 10046| null| | 10055| null| | 10056| null| | 10078| null| | 10083| null| | 10085| null| | 10090| null| | 10092| null| | 10100| null| | 10103| null| +----------------+------------------+ only showing top 20 rows

select_fields

select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Devuelve un DynamicFrame nuevo que contiene los campos seleccionados.

  • paths: una lista de cadenas. Cada cadena es una ruta a un nodo de nivel superior que desea seleccionar.

  • transformation_ctx: cadena única que se utiliza para identificar la información del estado (opcional).

  • info: cadena que se asociará a la notificación de errores para esta transformación (opcional).

  • stageThreshold: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

  • totalThreshold: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

Ejemplo: uso de select_fields para crear un DynamicFrame nuevo con los campos elegidos

En el siguiente ejemplo de código, se muestra cómo utilizar el método select_fields para crear un DynamicFrame nuevo con una lista seleccionada de campos de un DynamicFrame existente.

nota

Para acceder al conjunto de datos utilizado en este ejemplo, consulte Código de ejemplo: unión de los datos y establecimiento de relaciones entre ellos y siga las instrucciones en Paso 1: Rastrear los datos del bucket de Amazon S3.

# Example: Use select_fields to select specific fields from a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Create a new DynamicFrame with chosen fields names = persons.select_fields(paths=["family_name", "given_name"]) print("Schema for the names DynamicFrame, created with select_fields:") names.printSchema() names.toDF().show()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the names DynamicFrame: root |-- family_name: string |-- given_name: string +-----------+----------+ |family_name|given_name| +-----------+----------+ | Collins| Michael| | Huizenga| Bill| | Clawson| Curtis| | Solomon| Gerald| | Rigell| Edward| | Crapo| Michael| | Hutto| Earl| | Ertel| Allen| | Minish| Joseph| | Andrews| Robert| | Walden| Greg| | Kazen| Abraham| | Turner| Michael| | Kolbe| James| | Lowenthal| Alan| | Capuano| Michael| | Schrader| Kurt| | Nadler| Jerrold| | Graves| Tom| | McMillan| John| +-----------+----------+ only showing top 20 rows

simplify_ddb_json

simplify_ddb_json(): DynamicFrame

Simplifica las columnas anidadas en un DynamicFrame que se encuentran de manera específica en la estructura de JSON de DynamoDB y devuelve un nuevo DynamicFrame simplificado. Si hay varios tipos y tipos de mapas una lista de tipos, no se simplificarán los elementos de la lista. Tenga en cuenta que este es un tipo específico de transformación que se comporta de manera distinta a la transformación unnest regular y que necesita que los datos ya se encuentren en la estructura JSON de DynamoDB. Para obtener más información, consulte JSON de DynamoDB.

Por ejemplo, el esquema de una lectura de una exportación con la estructura JSON de DynamoDB puede parecerse al siguiente:

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

La transformación simplify_ddb_json() convertiría esto en:

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

Por ejemplo: utilice simplify_ddb_json para invocar una simplificación JSON de DynamoDB.

Este ejemplo de código utiliza el método simplify_ddb_json para utilizar el conecto de exportación de DynamoDB de AWS Glue, invocar una simplificación JSON de DynamoDB e imprimir el número de pariciones.

Código de ejemplo

from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "dynamodb", connection_options = { 'dynamodb.export': 'ddb', 'dynamodb.tableArn': '<table arn>', 'dynamodb.s3.bucket': '<bucket name>', 'dynamodb.s3.prefix': '<bucket prefix>', 'dynamodb.s3.bucketOwner': '<account_id of bucket>' } ) simplified = dynamicFrame.simplify_ddb_json() print(simplified.getNumPartitions())

spigot

spigot(path, options={})

Escribe registros de ejemplo en un destino específico para ayudarlo a verificar las transformaciones realizadas por su trabajo.

  • path: ruta del destino en el que se escribirá (obligatorio).

  • options: pares valor-clave que especifican opciones (opcional). La opción "topk" especifica que deben escribirse los primeros registros k. La opción "prob" especifica la probabilidad de que se elija un registro determinado (como decimal). Puede usarlo para seleccionar registros para escribir.

  • transformation_ctx: cadena única que se utiliza para identificar la información del estado (opcional).

Ejemplo: utilice spigot para escribir campos de ejemplo desde un DynamicFrame en Amazon S3

Este ejemplo de código utiliza el método spigot para escribir registros de ejemplo en un bucket de Amazon S3 después de aplicar la transformación select_fields.

Conjunto de datos de ejemplo

nota

Para acceder al conjunto de datos utilizado en este ejemplo, consulte Código de ejemplo: unión de los datos y establecimiento de relaciones entre ellos y siga las instrucciones en Paso 1: Rastrear los datos del bucket de Amazon S3.

El ejemplo usa un DynamicFrame llamado persons con el siguiente esquema:

root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

Código de ejemplo

# Example: Use spigot to write sample records # to a destination during a transformation # from pyspark.context import SparkContext. # Replace DOC-EXAMPLE-BUCKET with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load table data into a DynamicFrame persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) # Perform the select_fields on the DynamicFrame persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"]) # Use spigot to write a sample of the transformed data # (the first 10 records) spigot_output = persons.spigot( path="s3://DOC-EXAMPLE-BUCKET", options={"topk": 10} )

A continuación, se muestra un ejemplo de los datos que spigot escribe en Amazon S3. Como se especifica el código de ejemplo options={"topk": 10}, los datos de la muestra contienen los 10 primeros registros.

{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"} {"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"} {"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"} {"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"} {"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"} {"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"} {"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"} {"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"} {"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"} {"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}

split_fields

split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Devuelve una nueva DynamicFrameCollection que contiene dos DynamicFrames. El primer DynamicFrame contiene todos los nodos que se han dividido y el segundo contiene los nodos que quedan.

  • paths: lista de cadenas, cada una de las cuales es una ruta completa a un nodo que se desea dividir en un elemento DynamicFrame nuevo.

  • name1: cadena de nombre para el DynamicFrame que se divide.

  • name2: cadena de nombre para el elemento DynamicFrame que queda después de que se hayan dividido los nodos especificados.

  • transformation_ctx: cadena única que se utiliza para identificar la información del estado (opcional).

  • info: cadena que se asociará a la notificación de errores para esta transformación (opcional).

  • stageThreshold: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

  • totalThreshold: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

Ejemplo: utilice split_fields para dividir los campos seleccionados en un campo DynamicFrame independiente

Este ejemplo de código usa el método split_fields para dividir una lista de campos especificados en una lista DynamicFrame independiente.

Conjunto de datos de ejemplo

El ejemplo usa un DynamicFrame llamado l_root_contact_details que proviene de una colección denominada legislators_relationalized.

l_root_contact_details tiene el siguiente esquema y entradas.

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| ...

Código de ejemplo

# Example: Use split_fields to split selected # fields into a separate DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input DynamicFrame and inspect its schema frame_to_split = legislators_relationalized.select("l_root_contact_details") print("Inspect the input DynamicFrame schema:") frame_to_split.printSchema() # Split id and index fields into a separate DynamicFrame split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right") # Inspect the resulting DynamicFrames print("Inspect the schemas of the DynamicFrames created with split_fields:") split_fields_collection.select("left").printSchema() split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema: root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string Inspect the schemas of the DynamicFrames created with split_fields: root |-- id: long |-- index: int root |-- contact_details.val.type: string |-- contact_details.val.value: string

split_rows

split_rows(comparison_dict, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Divide una o más filas de un DynamicFrame en un nuevo DynamicFrame.

El método devuelve un nuevo DynamicFrameCollection que contiene dos DynamicFrames. El primer DynamicFrame contiene todas las filas que se han dividido y el segundo contiene las filas que quedan.

  • comparison_dict: diccionario donde la clave es una ruta a una columna y el valor es otro diccionario que mapea comparadores con valores con los que se compara el valor de columna. Por ejemplo, {"age": {">": 10, "<": 20}} divide todas las filas cuyo valor en la columna antigüedad sea superior a 10 e inferior a 20.

  • name1: cadena de nombre para el DynamicFrame que se divide.

  • name2: cadena de nombre para el elemento DynamicFrame que queda después de que se hayan dividido los nodos especificados.

  • transformation_ctx: cadena única que se utiliza para identificar la información del estado (opcional).

  • info: cadena que se asociará a la notificación de errores para esta transformación (opcional).

  • stageThreshold: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

  • totalThreshold: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

Ejemplo: utilice split_rows para dividir filas en un DynamicFrame

En este ejemplo de código se utiliza el método split_rows para dividir las filas en un DynamicFrame en función del valor del campo id.

Conjunto de datos de ejemplo

El ejemplo usa un DynamicFrame llamado l_root_contact_details que se selecciona de una colección denominada legislators_relationalized.

l_root_contact_details tiene el siguiente esquema y entradas.

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+

Código de ejemplo

# Example: Use split_rows to split up # rows in a DynamicFrame based on value from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Retrieve the DynamicFrame to split frame_to_split = legislators_relationalized.select("l_root_contact_details") # Split up rows by ID split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low") # Inspect the resulting DynamicFrames print("Inspect the DynamicFrame that contains IDs < 10") split_rows_collection.select("low").toDF().show() print("Inspect the DynamicFrame that contains IDs > 10") split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains IDs > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| phone| 202-225-5476| | 11| 1| twitter| RepDavidYoung| | 12| 0| phone| 202-225-4035| | 12| 1| twitter| RepStephMurphy| | 13| 0| fax| 202-226-0774| | 13| 1| phone| 202-225-6335| | 14| 0| fax| 202-226-0774| | 14| 1| phone| 202-225-6335| | 15| 0| fax| 202-226-0774| | 15| 1| phone| 202-225-6335| | 16| 0| fax| 202-226-0774| | 16| 1| phone| 202-225-6335| | 17| 0| fax| 202-226-0774| | 17| 1| phone| 202-225-6335| | 18| 0| fax| 202-226-0774| | 18| 1| phone| 202-225-6335| | 19| 0| fax| 202-226-0774| | 19| 1| phone| 202-225-6335| | 20| 0| fax| 202-226-0774| | 20| 1| phone| 202-225-6335| +---+-----+------------------------+-------------------------+ only showing top 20 rows

unbox

unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)

Realiza la conversión unboxing (vuelve a formatear) de un campo de cadena en un elemento DynamicFrame y devuelve un elemento DynamicFrame nuevo que contiene el elemento DynamicRecords de conversión unboxing.

Un DynamicRecord representa un registro lógico en un DynamicFrame. Es similar a una fila en un DataFrame de Apache Spark, salvo que es autodescriptivo y se puede utilizar para datos que no se ajustan a un esquema fijo.

  • path: ruta completa al nodo de cadena a cuyo nombre se desea aplicar la conversión unboxing.

  • format: una especificación de formato (opcional). La utiliza con una conexión de Amazon S3 o AWS Glue que admite diversos formatos. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.

  • transformation_ctx: cadena única que se utiliza para identificar la información del estado (opcional).

  • info: cadena que se asociará a la notificación de errores para esta transformación (opcional).

  • stageThreshold: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

  • totalThreshold: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

  • options: una o varias de las siguientes:

    • separator: cadena que contiene el carácter separador.

    • escaper: cadena que contiene el carácter de escape.

    • skipFirst: valor booleano que indica si debe omitirse la primera instancia.

    • withSchema: una cadena que contiene una representación JSON del esquema del nodo. El formato de la representación JSON de un esquema se define mediante el resultado de StructType.json().

    • withHeader: valor booleano que indica si se incluye un encabezado.

Ejemplo: utilice la conversión unboxing para convertir un campo de cadena en una estructura

Este ejemplo de código utiliza el método unbox para la conversión unboxing o volver a formatear un campo de cadena en un DynamicFrame en un campo de tipo estructura.

Conjunto de datos de ejemplo

El ejemplo usa un DynamicFrame llamado mapped_with_string con el siguiente esquema y entradas.

Observe el campo denominado AddressString. Este es el campo que el ejemplo convierte en una estructura.

root |-- Average Total Payments: string |-- AddressString: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| ...

Código de ejemplo

# Example: Use unbox to unbox a string field # into a struct in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) unboxed = mapped_with_string.unbox("AddressString", "json") unboxed.printSchema() unboxed.toDF().show()
root |-- Average Total Payments: string |-- AddressString: struct | |-- Street: string | |-- City: string | |-- State: string | |-- Zip.Code: string | |-- Array: array | | |-- element: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| | $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...| | $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...| | $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...| | $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...| | $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL| | $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...| | $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...| | $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL| | $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...| | $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...| | $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...| | $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...| | $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL| | $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...| | $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL| | $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ only showing top 20 rows

unión

union(frame1, frame2, transformation_ctx = "", info = "", stageThreshold = 0, totalThreshold = 0)

Unir dos DynamicFrames. Devuelve un DynamicFrame que contiene todos los registros de ambos DynamicFrames de entrada. Esta transformación puede devolver resultados diferentes de la unión de dos DataFrames con datos equivalentes. Si necesita el comportamiento de unión del DataFrame de Spark, considere usar toDF.

  • frame1: primer DynamicFrame en unirse.

  • frame2: segundo DynamicFrame en unirse.

  • transformation_ctx: (opcional) una cadena única que se utiliza para identificar información de estadísticas/estado

  • info: (opcional) una cadena que está asociada a errores en la transformación

  • stageThreshold: (opcional) un número máximo de errores en la transformación hasta que se produzca un error en el procesamiento

  • totalThreshold: (opcional) un número máximo de errores en total hasta que se produzca un error en el procesamiento.

unnest

unnest(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Desanida los objetos anidados de un elemento DynamicFrame, que los convierte en objetos de nivel superior y devuelve un elemento DynamicFrame desanidado nuevo.

  • transformation_ctx: cadena única que se utiliza para identificar la información del estado (opcional).

  • info: cadena que se asociará a la notificación de errores para esta transformación (opcional).

  • stageThreshold: cantidad de errores detectados durante esta transformación que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

  • totalThreshold: cantidad de errores detectados hasta esta transformación incluida que provocarán que el proceso falle (opcional). El valor predeterminado es cero, lo que indica que el proceso no debería fallar.

Ejemplo: utilice desanidar para convertir los campos anidados en campos de nivel superior

Este ejemplo de código utiliza el método unnest para desanidar todos los campos anidados en un DynamicFrame en campos de nivel superior.

Conjunto de datos de ejemplo

El ejemplo usa un DynamicFrame llamado mapped_medicare con el siguiente esquema. Observe que el campo Address es el único campo que contiene datos anidados.

root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

Código de ejemplo

# Example: Use unnest to unnest nested # objects in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Unnest all nested fields unnested = mapped_medicare.unnest() unnested.printSchema()
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address.Zip.Code: string |-- Address.City: string |-- Address.Array: array | |-- element: string |-- Address.State: string |-- Address.Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

unnest_ddb_json

Desanida las columnas anidadas de un elemento DynamicFrame que se encuentren específicamente en la estructura JSON de DynamoDB y devuelve un nuevo elemento DynamicFrame no anidado. Las columnas que sean de una matriz de tipos de estructuras no se desanidarán. Tenga en cuenta que esta transformación de desanidamiento es un tipo específico que se comporta de modo diferente a la transformación unnest normal y requiere que los datos ya estén en la estructura JSON de DynamoDB. Para obtener más información, consulte JSON de DynamoDB.

unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
  • transformation_ctx: cadena única que se utiliza para identificar la información del estado (opcional).

  • info: cadena que se asociará a la notificación de errores para esta transformación (opcional).

  • stageThreshold: número de errores detectados durante este proceso de transformación que provocarán que el proceso se termine (opcional: cero de forma predeterminada, lo que indica que el proceso no debería terminarse por error).

  • totalThreshold: número de errores detectados hasta este proceso de transformación incluido que provocarán que el proceso se termine (opcional: cero de forma predeterminada, lo que indica que el proceso no debería terminarse por error).

Por ejemplo, el esquema de una lectura de una exportación con la estructura JSON de DynamoDB puede parecerse al siguiente:

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

La transformación unnest_ddb_json() convertiría esto en:

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

En el siguiente ejemplo de código, se muestra cómo utilizar el conector de exportación de DynamoDB de AWS Glue, invocar un desanidamiento JSON de DynamoDB e imprimir el número de particiones:

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()

write

write(connection_type, connection_options, format, format_options, accumulator_size)

Obtiene un DataSink(object) del tipo de conexión especificado del Clase GlueContext de este elemento DynamicFrame y lo utiliza para escribir y dar formato al contenido de este DynamicFrame. Devuelve el nuevo DynamicFrame con formato y escrito tal y como se especifica.

  • connection_type: tipo de conexión que se utilizará. Entre los valores válidos se incluyen: s3, mysql, postgresql, redshift, sqlserver y oracle.

  • connection_options: opción de conexión que se utilizará (opcional). Para un connection_type de s3, se define una ruta de Amazon S3.

    connection_options = {"path": "s3://aws-glue-target/temp"}

    Para conexiones JDBC, deben definirse varias propiedades. Tenga en cuenta que el nombre de la base de datos debe ser parte de la URL. Opcionalmente, se puede incluir en las opciones de conexión.

    aviso

    No se recomienda almacenar las contraseñas en el script. Considere utilizar boto3 para recuperarlas de AWS Secrets Manager o del catálogo de datos de Glue AWS.

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}
  • format: una especificación de formato (opcional). Se utiliza para Amazon Simple Storage Service (Amazon S3) o una conexión de AWS Glue que admite diversos formatos. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.

  • format_options: opciones del formato especificado. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.

  • accumulator_size: el tamaño acumulable que se utilizará, en bytes (opcional).

 — errores —

assertErrorThreshold

assertErrorThreshold( ): confirmación utilizada para los errores de las transformaciones que crearon este elemento DynamicFrame. Muestra un valor de Exception desde el DataFrame subyacente.

errorsAsDynamicFrame

errorsAsDynamicFrame( ): devuelve un DynamicFrame con registros de error anidados dentro.

Ejemplo: uso de errorsAsDynamicFrame para ver los registros de errores

En el siguiente ejemplo de código se muestra cómo utilizar el método errorsAsDynamicFrame para ver un registro de error de un DynamicFrame.

Conjunto de datos de ejemplo

El ejemplo utiliza el siguiente conjunto de datos que se puede cargar en Amazon S3 como JSON. Tenga en cuenta que el segundo registro tiene un formato incorrecto. Los datos con formato incorrecto generalmente interrumpen el análisis de archivos cuando se utiliza SparkSQL. Sin embargo, DynamicFrame reconoce los problemas de formato incorrecto y convierte las líneas con formato incorrecto en registros de errores que se pueden gestionar de forma individual.

{"id": 1, "name": "george", "surname": "washington", "height": 178} {"id": 2, "name": "benjamin", "surname": "franklin", {"id": 3, "name": "alexander", "surname": "hamilton", "height": 171} {"id": 4, "name": "john", "surname": "jay", "height": 190}

Código de ejemplo

# Example: Use errorsAsDynamicFrame to view error records. # Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create errors DynamicFrame, view schema errors = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://DOC-EXAMPLE-S3-BUCKET/error_data.json"]}, "json" ) print("Schema of errors DynamicFrame:") errors.printSchema() # Show that errors only contains valid entries from the dataset print("errors contains only valid records from the input dataset (2 of 4 records)") errors.toDF().show() # View errors print("Errors count:", str(errors.errorsCount())) print("Errors:") errors.errorsAsDynamicFrame().toDF().show() # View error fields and error data error_record = errors.errorsAsDynamicFrame().toDF().head() error_fields = error_record["error"] print("Error fields: ") print(error_fields.asDict().keys()) print("\nError record data:") for key in error_fields.asDict().keys(): print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame: root |-- id: int |-- name: string |-- surname: string |-- height: int errors contains only valid records from the input dataset (2 of 4 records) +---+------+----------+------+ | id| name| surname|height| +---+------+----------+------+ | 1|george|washington| 178| | 4| john| jay| 190| +---+------+----------+------+ Errors count: 1 Errors: +--------------------+ | error| +--------------------+ |[[ File "/tmp/20...| +--------------------+ Error fields: dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord']) Error record data: callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='') msg : error in jackson reader stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at scala.collection.Iterator$$anon$9.next(Iterator.scala:162) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202) at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116) at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109) at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247) at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) input : bytesread : 252 source : dynamicRecord : Row(id=2, name='benjamin', surname='franklin')

errorsCount

errorsCount( ): devuelve el número total de errores de un DynamicFrame.

stageErrorsCount

stageErrorsCount: devuelve el número de errores que se produjo en el proceso de generar este DynamicFrame.