GlueContext clase - AWS Glue

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

GlueContext clase

Envuelve el SparkContextobjeto Apache Spark y, por lo tanto, proporciona mecanismos para interactuar con la plataforma Apache Spark.

__init__

__init__(sparkContext)
  • sparkContext: el contexto de Apache Spark que se va a utilizar.

Creación

getSource

getSource(connection_type, transformation_ctx = "", **options)

Crea un objeto DataSource que se puede utilizar para leer DynamicFrames desde fuentes externas.

  • connection_type: el tipo de conexión que se va a utilizar, como Amazon Simple Storage Service (Amazon S3), Amazon Redshift y JDBC. Los valores válidos son s3, mysql, postgresql, redshift, sqlserver, oracle y dynamodb.

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional).

  • options: conjunto de pares nombre-valor opcionales. Para obtener más información, consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark.

A continuación, se muestra un ejemplo de cómo se utiliza getSource.

>>> data_source = context.getSource("file", paths=["/in/path"]) >>> data_source.setFormat("json") >>> myFrame = data_source.getFrame()

create_dynamic_frame_from_rdd

create_dynamic_frame_from_rdd(data, name, schema=None, sample_ratio=None, transformation_ctx="")

Muestra un DynamicFrame que se crea a partir de un conjunto de datos distribuido resistente (RDD) de Apache Spark.

  • data: el origen de datos que se va a utilizar.

  • name: el nombre de los datos que se van a utilizar.

  • schema: el esquema que se va a utilizar (opcional).

  • sample_ratio: el ratio de muestra que se va a utilizar (opcional).

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional).

create_dynamic_frame_from_catalog

create_dynamic_frame_from_catalog(database, table_name, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, catalog_id = None)

Muestra un DynamicFrame que se crea mediante una base de datos del Catálogo de datos y un nombre de tabla. Al usar este método, se proporcionan format_options las propiedades de la tabla AWS Glue Data Catalog especificada y otras opciones a través del additional_options argumento.

  • Database: la base de datos de lectura.

  • table_name: nombre de la tabla de lectura.

  • redshift_tmp_dir: directorio provisional de Amazon Redshift que se va a utilizar (opcional).

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional).

  • push_down_predicate: filtra particiones sin tener que enumerar y leer todos los archivos del conjunto de datos. Para conocer las fuentes compatibles y las limitaciones, consulta Optimizar las lecturas con pulsaciones en AWS Glue ETL. Para obtener más información, consulte Filtrado previo con predicados de inserción.

  • additional_options: conjunto de pares nombre-valor opcionales. Las opciones posibles incluyen las enumeradas en Tipos de conexión y opciones para ETL en AWS Glue para Spark, excepto por endpointUrl, streamName, bootstrap.servers, security.protocol, topicName, classification y delimiter. Otra opción soportada es catalogPartitionPredicate:

    catalogPartitionPredicate: puede transferir una expresión de catálogo para filtrar en función de las columnas de índice. Esto inserta el filtrado hacia el lado del servidor. Para obtener más información, consulte Índices de partición de AWS Glue. Tenga en cuenta que push_down_predicate y catalogPartitionPredicate utilizan sintaxis diferentes. El primero utiliza la sintaxis estándar de Spark SQL y el segundo utiliza el analizador JSQL.

  • catalog_id: ID de catálogo (ID de cuenta) del Catálogo de datos al que se accede. Cuando el valor es Ninguno, se utiliza el ID de cuenta predeterminado del intermediario.

create_dynamic_frame_from_options

create_dynamic_frame_from_options(connection_type, connection_options={}, format=None, format_options={}, transformation_ctx = "")

Muestra un DynamicFrame que se crea con la conexión y el formato especificados.

  • connection_type: el tipo de conexión, como Amazon S3, Amazon Redshift y JDBC. Los valores válidos son s3, mysql, postgresql, redshift, sqlserver, oracle y dynamodb.

  • connection_options: las opciones de conexión, como las rutas y la tabla de bases de datos (opcional). Para un connection_type de s3, se define una lista de rutas de Amazon S3.

    connection_options = {"paths": ["s3://aws-glue-target/temp"]}

    Para conexiones JDBC, deben definirse varias propiedades. Tenga en cuenta que el nombre de la base de datos debe ser parte de la URL. Opcionalmente, se puede incluir en las opciones de conexión.

    aviso

    No se recomienda almacenar las contraseñas en el script. Considere utilizarlos boto3 para recuperarlos del AWS Secrets Manager catálogo de datos de AWS Glue.

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}

    La propiedad dbtable es el nombre de la tabla de JDBC. Para almacenes de datos de JDBC que admiten esquemas dentro de una base de datos, especifique schema.table-name. Si no se ha proporcionado un esquema, se usa el esquema "public" predeterminado.

    Para obtener más información, consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark.

  • format— Una especificación de formato. Se utiliza con una conexión de Amazon S3 o AWS Glue que admite diversos formatos. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.

  • format_options: opciones del formato especificado. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional).

  • push_down_predicate: filtra particiones sin tener que enumerar y leer todos los archivos del conjunto de datos. Para conocer las fuentes compatibles y las limitaciones, consulta Optimizar las lecturas con pulsaciones en AWS Glue ETL. Para obtener más información, consulte Filtrado previo con predicados de inserción.

create_sample_dynamic_frame_from_catalog

create_sample_dynamic_frame_from_catalog(database, table_name, num, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, sample_options = {}, catalog_id = None)

Muestra un DynamicFrame de ejemplo que se crea mediante una base de datos del Catálogo de datos y un nombre de tabla. La DynamicFrame solo contiene los primeros registros de num de un origen de datos.

  • database: la base de datos de lectura.

  • table_name: nombre de la tabla de lectura.

  • num: número máximo de registros del marco dinámico de muestra arrojado.

  • redshift_tmp_dir: directorio provisional de Amazon Redshift que se va a utilizar (opcional).

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional).

  • push_down_predicate: filtra particiones sin tener que enumerar y leer todos los archivos del conjunto de datos. Para obtener más información, consulte Filtrado previo con predicados de inserción.

  • additional_options: conjunto de pares nombre-valor opcionales. Las opciones posibles incluyen las enumeradas en Tipos de conexión y opciones para ETL en AWS Glue para Spark, excepto por endpointUrl, streamName, bootstrap.servers, security.protocol, topicName, classification y delimiter.

  • sample_options: parámetros para controlar el comportamiento del muestreo (opcional). Parámetros disponibles actuales para los orígenes de Simple Storage Service (Amazon S3):

    • maxSamplePartitions: número máximo de particiones que leerá el muestreo. El valor predeterminado es 10

    • maxSampleFilesPerPartition: número máximo de archivos que leerá el muestreo en una partición. El valor predeterminado es 10.

      Estos parámetros ayudan a reducir el tiempo que consume el listado de archivos. Por ejemplo, supongamos que el conjunto de datos tiene 1000 particiones y cada partición tiene 10 archivos. Si se configura maxSamplePartitions = 10 y maxSampleFilesPerPartition = 10, en lugar de enumerar los 10 000 archivos, el muestreo solo mostrará y leerá las 10 primeras particiones con los 10 primeros archivos de cada uno: 10 * 10 = 100 archivos en total.

  • catalog_id: el ID de catálogo del Catálogo de datos al que se accede (el ID de cuenta del Catálogo de datos). De forma predeterminada, se establece en None. None es el valor predeterminado para el ID de catálogo de la cuenta del servicio que hace la llamada.

create_sample_dynamic_frame_from_options

create_sample_dynamic_frame_from_options(connection_type, connection_options={}, num, sample_options={}, format=None, format_options={}, transformation_ctx = "")

Arroja un DynamicFrame de ejemplo que se crea con la conexión y el formato especificados. La DynamicFrame solo contiene los primeros registros de num de un origen de datos.

  • connection_type: el tipo de conexión, como Amazon S3, Amazon Redshift y JDBC. Los valores válidos son s3, mysql, postgresql, redshift, sqlserver, oracle y dynamodb.

  • connection_options: las opciones de conexión, como las rutas y la tabla de bases de datos (opcional). Para obtener más información, consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark.

  • num: número máximo de registros del marco dinámico de muestra arrojado.

  • sample_options: parámetros para controlar el comportamiento del muestreo (opcional). Parámetros disponibles actuales para los orígenes de Simple Storage Service (Amazon S3):

    • maxSamplePartitions: número máximo de particiones que leerá el muestreo. El valor predeterminado es 10

    • maxSampleFilesPerPartition: número máximo de archivos que leerá el muestreo en una partición. El valor predeterminado es 10.

      Estos parámetros ayudan a reducir el tiempo que consume el listado de archivos. Por ejemplo, supongamos que el conjunto de datos tiene 1000 particiones y cada partición tiene 10 archivos. Si se configura maxSamplePartitions = 10 y maxSampleFilesPerPartition = 10, en lugar de enumerar los 10 000 archivos, el muestreo solo mostrará y leerá las 10 primeras particiones con los 10 primeros archivos de cada uno: 10 * 10 = 100 archivos en total.

  • format— Una especificación de formato. Se utiliza con una conexión de Amazon S3 o AWS Glue que admite diversos formatos. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.

  • format_options: opciones del formato especificado. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional).

  • push_down_predicate: filtra particiones sin tener que enumerar y leer todos los archivos del conjunto de datos. Para obtener más información, consulte Filtrado previo con predicados de inserción.

add_ingestion_time_columns

add_ingestion_time_columns(dataFrame, timeGranularity = "")

Agrega columnas de tiempo de ingesta como ingest_year, ingest_month, ingest_day, ingest_hour, ingest_minute al DataFrame de entrada. Esta función se genera en forma automática en el script generado por AWS Glue cuando especifique una tabla del Catálogo de datos con Amazon S3 como destino. Esta función actualiza en forma automática la partición con columnas de tiempo de ingesta en la tabla de salida. Esto permite que los datos de salida se dividan automáticamente en el tiempo de ingesta sin requerir columnas de tiempo de ingesta explícitas en los datos de entrada.

  • dataFrame: el dataFrame al que anexar las columnas de tiempo de ingesta.

  • timeGranularity: la granularidad de las columnas de tiempo. Los valores válidos son “day”, “hour” y “minute”. Por ejemplo, si “hour” se transfiere a la función, el dataFrame original tendrá las columnas de tiempo “ingest_year,” “ingest_month,” “ingest_day” y “ingest_hour” anexadas.

Devuelve el marco de datos después de anexar las columnas de granularidad de tiempo.

Ejemplo:

dynamic_frame = DynamicFrame.fromDF(glueContext.add_ingestion_time_columns(dataFrame, "hour"))

create_data_frame_from_catalog

create_data_frame_from_catalog(database, table_name, transformation_ctx = "", additional_options = {})

Devuelve un DataFrame que se crea con información de una tabla del Catálogo de datos.

  • database: la base de datos del Catálogo de datos de la que se va a leer.

  • table_name: el nombre de la tabla del Catálogo de datos de la que se va a leer.

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional).

  • additional_options: conjunto de pares nombre-valor opcionales. Las opciones posibles incluyen las enumeradas en Tipos de conexión y opciones para ETL en AWS Glue para Spark para orígenes de streaming, como startingPosition, maxFetchTimeInMs y startingOffsets.

    • useSparkDataSource— Si se establece en true, AWS Glue debe usar la API nativa de Spark Data Source para leer la tabla. La API de orígenes de datos de Spark admite los siguientes formatos: AVRO, binario, CSV, JSON, ORC, Parquet y texto. En una tabla del Catálogo de datos, especifique el formato mediante la propiedad classification. Para obtener más información sobre la API de orígenes de datos de Spark, consulte la documentación oficial de Apache Spark.

      El uso de create_data_frame_from_catalog con useSparkDataSource presenta las siguientes ventajas:

      • Devuelve directamente un DataFrame y ofrece una alternativa a create_dynamic_frame.from_catalog().toDF().

      • Admite el control AWS Lake Formation de permisos a nivel de tabla para los formatos nativos.

      • Admite la lectura de formatos de lagos de datos sin control de permisos a AWS Lake Formation nivel de tabla. Para obtener más información, consulte Uso de marcos de lagos de datos con trabajos de ETL de AWS Glue.

      Cuando lo habilitasuseSparkDataSource, también puedes añadir cualquiera de las opciones de fuente de datos de Spark additional_options según sea necesario. AWS Glue pasa estas opciones directamente al lector Spark.

    • useCatalogSchema— Cuando se establece en true, AWS Glue aplica el esquema del catálogo de datos al resultadoDataFrame. De lo contrario, el lector deduce el esquema a partir de los datos. Cuando se habilita useCatalogSchema, también se debe establecer useSparkDataSource en true (verdadero).

Limitaciones

Tenga en cuenta las siguientes limitaciones cuando utilice la opción useSparkDataSource:

  • Cuando lo usasuseSparkDataSource, AWS Glue crea una nueva DataFrame sesión de Spark distinta de la sesión original de Spark.

  • El filtrado de DataFrame particiones de Spark no funciona con las siguientes funciones de AWS Glue.

    Para utilizar el filtrado de particiones con estas funciones, puedes usar el predicado desplegable AWS Glue. Para obtener más información, consulte Filtrado previo con predicados de inserción. El filtrado de columnas no particionadas no se ve afectado.

    En el siguiente script de ejemplo, se muestra la forma incorrecta de filtrar particiones con la opción excludeStorageClasses.

    // Incorrect partition filtering using Spark filter with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Suppose year and month are partition keys. // Filtering on year and month won't work, the filtered_df will still // contain data with other year/month values. filtered_df = read_df.filter("year == '2017 and month == '04' and 'state == 'CA'")

    En el siguiente script de ejemplo, se muestra la forma correcta de utilizar un predicado de inserción para filtrar particiones con la opción excludeStorageClasses.

    // Correct partition filtering using the AWS Glue pushdown predicate // with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, // Use AWS Glue pushdown predicate to perform partition filtering push_down_predicate = "(year=='2017' and month=='04')" additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Use Spark filter only on non-partitioned columns filtered_df = read_df.filter("state == 'CA'")

Ejemplo: crear una tabla CSV con el lector de orígenes de datos de Spark

// Read a CSV table with '\t' as separator read_df = glueContext.create_data_frame.from_catalog( database=<database_name>, table_name=<table_name>, additional_options = {"useSparkDataSource": True, "sep": '\t'} )

create_data_frame_from_options

create_data_frame_from_options(connection_type, connection_options={}, format=None, format_options={}, transformation_ctx = "")

Esta API ha quedado obsoleta. En su lugar, use la API getSource(). Muestra un DataFrame que se crea con la conexión y el formato especificados. Utilice esta función solo con orígenes de streaming de AWS Glue.

  • connection_type: el tipo de conexión de streaming. Los valores válidos son kinesis y kafka.

  • connection_options: opciones de conexión, que son diferentes para Kinesis y Kafka. Puede encontrar la lista de todas las opciones de conexión para cada origen de datos de streaming en Tipos de conexión y opciones para ETL en AWS Glue para Spark. Tenga en cuenta las siguientes diferencias en las opciones de conexión de streaming:

    • Los orígenes de streaming de Kinesis requieren streamARN, startingPosition, inferSchema y classification.

    • Los orígenes de streaming de Kafka requieren connectionName, topicName, startingOffsets, inferSchema y classification.

  • format— Una especificación de formato. Se utiliza con una conexión de Amazon S3 o AWS Glue que admite diversos formatos. Para obtener información acerca de los formatos soportados, consulte Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark.

  • format_options: opciones del formato especificado. Para obtener información acerca de las opciones de formatos soportados, consulte Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark.

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional).

Ejemplo de origen de streaming de Amazon Kinesis:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Ejemplo de origen de streaming de Kafka:

kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)

forEachBatch

forEachBatch(frame, batch_function, options)

Se aplica la batch_function transferida a cada microlote que se lee desde el origen de streaming.

  • frame— El DataFrame que contiene el microlote actual.

  • batch_function: una función que se aplicará para cada microlote.

  • options: una recopilación de pares clave-valor que contiene información sobre cómo procesar microlotes. Se requieren las siguientes opciones:

    • windowSize: cantidad de tiempo que se debe dedicar al procesamiento de cada lote.

    • checkpointLocation: la ubicación donde se almacenan los puntos de verificación para el trabajo de ETL de streaming.

    • batchMaxRetries: número máximo de reintentos permitidos para este lote si se genera un error. El valor predeterminado es 3. Esta opción sólo se puede configurar para Glue versión 2.0 y superior.

Ejemplo:

glueContext.forEachBatch( frame = data_frame_datasource0, batch_function = processBatch, options = { "windowSize": "100 seconds", "checkpointLocation": "s3://kafka-auth-dataplane/confluent-test/output/checkpoint/" } ) def processBatch(data_frame, batchId): if (data_frame.count() > 0): datasource0 = DynamicFrame.fromDF( glueContext.add_ingestion_time_columns(data_frame, "hour"), glueContext, "from_data_frame" ) additionalOptions_datasink1 = {"enableUpdateCatalog": True} additionalOptions_datasink1["partitionKeys"] = ["ingest_yr", "ingest_mo", "ingest_day"] datasink1 = glueContext.write_dynamic_frame.from_catalog( frame = datasource0, database = "tempdb", table_name = "kafka-auth-table-output", transformation_ctx = "datasink1", additional_options = additionalOptions_datasink1 )

Trabajar con conjuntos de datos en Amazon S3

purge_table

purge_table(catalog_id=None, database="", table_name="", options={}, transformation_ctx="")

Elimina archivos de Amazon S3 correspondientes a la base de datos y la tabla del catálogo especificado. Si se eliminan todos los archivos de una partición, esa partición también se eliminará del catálogo.

Si desea poder recuperar los objetos eliminados, puede habilitar control de versiones de objetos en el bucket de Amazon S3. Cuando se elimina un objeto de un bucket que no tiene habilitado el control de versiones de objetos, el objeto no se puede recuperar. Para obtener más información acerca de cómo recuperar objetos eliminados en un bucket habilitado para versiones, consulte ¿Cómo puedo recuperar un objeto de Amazon S3 eliminado? en el Centro de conocimientos de AWS Support .

  • catalog_id: el ID de catálogo del Catálogo de datos al que se accede (el ID de cuenta del Catálogo de datos). De forma predeterminada, se establece en None. None es el valor predeterminado para el ID de catálogo de la cuenta del servicio que hace la llamada.

  • database: la base de datos que se va a utilizar.

  • table_name: nombre de la tabla que se utilizará.

  • options: opciones para filtrar los archivos que se van a eliminar y para la generación de archivos de manifiesto.

    • retentionPeriod: especifica un período para retener los archivos en cantidad de horas. Se mantienen los archivos que son posteriores al período de retención. De forma predeterminada, se establece en 168 horas (7 días).

    • partitionPredicate: se eliminan las particiones que cumplen con este predicado. Los archivos comprendidos en el período de retención de estas particiones no se eliminan. Configurar en "", valor vacío de forma predeterminada.

    • excludeStorageClasses: no se eliminan los archivos con clase de almacenamiento configurada en excludeStorageClasses. El valor predeterminado es Set(), un conjunto vacío.

    • manifestFilePath: una ruta opcional para la generación de archivos de manifiesto. Todos los archivos que se depuraron correctamente se registran en Success.csv, mientras que los que no lo hicieron se registran en Failed.csv

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional). Se utiliza en la ruta del archivo de manifiesto.

glueContext.purge_table("database", "table", {"partitionPredicate": "(month=='march')", "retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})

purge_s3_path

purge_s3_path(s3_path, options={}, transformation_ctx="")

Elimina archivos de la ruta de Amazon S3 especificada recursivamente.

Si desea poder recuperar los objetos eliminados, puede habilitar control de versiones de objetos en el bucket de Amazon S3. Cuando se elimina un objeto de un bucket que no tiene habilitado el control de versiones de objetos, el objeto no se puede recuperar. Para obtener más información sobre cómo recuperar objetos eliminados en un bucket con control de versiones, consulte ¿Cómo puedo recuperar un objeto de Amazon S3 que se eliminó? en el Centro de AWS Support conocimiento.

  • s3_path: la ruta de acceso en Amazon S3 de los archivos que se van a eliminar en el formato s3://<bucket>/<prefix>/

  • options: opciones para filtrar los archivos que se van a eliminar y para la generación de archivos de manifiesto.

    • retentionPeriod: especifica un período para retener los archivos en cantidad de horas. Se mantienen los archivos que son posteriores al período de retención. De forma predeterminada, se establece en 168 horas (7 días).

    • excludeStorageClasses: no se eliminan los archivos con clase de almacenamiento configurada en excludeStorageClasses. El valor predeterminado es Set(), un conjunto vacío.

    • manifestFilePath: una ruta opcional para la generación de archivos de manifiesto. Todos los archivos que se depuraron correctamente se registran en Success.csv, mientras que los que no lo hicieron se registran en Failed.csv

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional). Se utiliza en la ruta del archivo de manifiesto.

glueContext.purge_s3_path("s3://bucket/path/", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})

transition_table

transition_table(database, table_name, transition_to, options={}, transformation_ctx="", catalog_id=None)

Inicia la transición de la clase de almacenamiento de los archivos almacenados en Amazon S3 correspondientes a la base de datos y la tabla del catálogo especificado.

Puede realizar la transición entre dos clases de almacenamiento cualquiera. En el caso de las clases de almacenamiento GLACIER y DEEP_ARCHIVE, la transición puede hacerse a estas clases. Sin embargo, debería utilizar S3 RESTORE para realizar la transición de GLACIER y las clases de almacenamiento DEEP_ARCHIVE.

Si ejecuta trabajos de ETL de AWS Glue que leen archivos o particiones de Amazon S3, puede excluir algunos tipos de clases de almacenamiento de Amazon S3. Para obtener más información, consulte Exclusión de clases de almacenamiento de Amazon S3.

  • database: la base de datos que se va a utilizar.

  • table_name: nombre de la tabla que se utilizará.

  • transition_to: la clase de almacenamiento de Amazon S3 hacia la que se hará la transición.

  • options: opciones para filtrar los archivos que se van a eliminar y para la generación de archivos de manifiesto.

    • retentionPeriod: especifica un período para retener los archivos en cantidad de horas. Se mantienen los archivos que son posteriores al período de retención. De forma predeterminada, se establece en 168 horas (7 días).

    • partitionPredicate: se realiza la transición de las particiones que cumplen con este predicado. Los archivos comprendidos en el período de retención de estas particiones no realizan la transición. Configurar en "", valor vacío de forma predeterminada.

    • excludeStorageClasses: no se realiza la transición de los archivos con clase de almacenamiento en excludeStorageClasses. El valor predeterminado es Set(), un conjunto vacío.

    • manifestFilePath: una ruta opcional para la generación de archivos de manifiesto. Todos los archivos que realizaron la transición correctamente se registran en Success.csv, mientras que los que no lo hicieron se registran en Failed.csv

    • accountId: el ID de cuenta de Amazon Web Services para ejecutar la transformación de transición. Es obligatorio para esta transformación.

    • roleArn— La AWS función de ejecutar la transformación de la transición. Es obligatorio para esta transformación.

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional). Se utiliza en la ruta del archivo de manifiesto.

  • catalog_id: el ID de catálogo del Catálogo de datos al que se accede (el ID de cuenta del Catálogo de datos). De forma predeterminada, se establece en None. None es el valor predeterminado para el ID de catálogo de la cuenta del servicio que hace la llamada.

glueContext.transition_table("database", "table", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})

transition_s3_path

transition_s3_path(s3_path, transition_to, options={}, transformation_ctx="")

Realiza transiciones recursivas en la clase de almacenamiento de los archivos de la ruta de Amazon S3 especificada.

Puede realizar la transición entre dos clases de almacenamiento cualquiera. En el caso de las clases de almacenamiento GLACIER y DEEP_ARCHIVE, la transición puede hacerse a estas clases. Sin embargo, debería utilizar S3 RESTORE para realizar la transición de GLACIER y las clases de almacenamiento DEEP_ARCHIVE.

Si ejecuta trabajos de ETL de AWS Glue que leen archivos o particiones de Amazon S3, puede excluir algunos tipos de clases de almacenamiento de Amazon S3. Para obtener más información, consulte Exclusión de clases de almacenamiento de Amazon S3.

  • s3_path: la ruta en Amazon S3 de los archivos sobre los que se realizará la transición en el formato s3://<bucket>/<prefix>/

  • transition_to: la clase de almacenamiento de Amazon S3 hacia la que se hará la transición.

  • options: opciones para filtrar los archivos que se van a eliminar y para la generación de archivos de manifiesto.

    • retentionPeriod: especifica un período para retener los archivos en cantidad de horas. Se mantienen los archivos que son posteriores al período de retención. De forma predeterminada, se establece en 168 horas (7 días).

    • partitionPredicate: se realiza la transición de las particiones que cumplen con este predicado. Los archivos comprendidos en el período de retención de estas particiones no realizan la transición. Configurar en "", valor vacío de forma predeterminada.

    • excludeStorageClasses: no se realiza la transición de los archivos con clase de almacenamiento en excludeStorageClasses. El valor predeterminado es Set(), un conjunto vacío.

    • manifestFilePath: una ruta opcional para la generación de archivos de manifiesto. Todos los archivos que realizaron la transición correctamente se registran en Success.csv, mientras que los que no lo hicieron se registran en Failed.csv

    • accountId: el ID de cuenta de Amazon Web Services para ejecutar la transformación de transición. Es obligatorio para esta transformación.

    • roleArn— La AWS función de ejecutar la transformación de transición. Es obligatorio para esta transformación.

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional). Se utiliza en la ruta del archivo de manifiesto.

glueContext.transition_s3_path("s3://bucket/prefix/", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})

Extracción

extract_jdbc_conf

extract_jdbc_conf(connection_name, catalog_id = None)

Devuelve dict con claves con las propiedades de configuración del objeto de conexión de AWS Glue en el Catálogo de datos.

  • user: nombre de usuario de la base de datos.

  • password: contraseña de la base de datos.

  • vendor: especifica un proveedor (mysql, postgresql, oracle, sqlserver, etc.).

  • enforceSSL: una cadena booleana que indica si se requiere una conexión segura.

  • customJDBCCert: uso de un certificado de cliente específico de la ruta de Amazon S3 indicada.

  • skipCustomJDBCCertValidation: una cadena booleana que indica si customJDBCCert debe ser validado por una CA.

  • customJDBCCertString: información adicional sobre el certificado personalizado, específico para el tipo de controlador.

  • url: URL de JDBC (obsoleta) con solo protocolo, servidor y puerto.

  • fullUrl: URL de JDBC introducida cuando se creó la conexión (Disponible en AWS Glue versión 3.0 o posterior).

Ejemplo de recuperación de configuraciones de JDBC:

jdbc_conf = glueContext.extract_jdbc_conf(connection_name="your_glue_connection_name") print(jdbc_conf) >>> {'enforceSSL': 'false', 'skipCustomJDBCCertValidation': 'false', 'url': 'jdbc:mysql://myserver:3306', 'fullUrl': 'jdbc:mysql://myserver:3306/mydb', 'customJDBCCertString': '', 'user': 'admin', 'customJDBCCert': '', 'password': '1234', 'vendor': 'mysql'}

Transacciones

start_transaction

start_transaction(read_only)

Inicie una nueva transacción. Llama de forma interna a la API startTransaction de Lake Formation.

  • read_only: (booleano) indica si esta transacción debe ser de solo lectura o de lectura y escritura. Se rechazarán las escrituras realizadas con un ID de transacción de solo lectura. No es necesario confirmar las transacciones de solo lectura.

Devuelve el ID de la transacción.

commit_transaction

commit_transaction(transaction_id, wait_for_commit = True)

Intenta confirmar la transacción especificada. Es posible que se devuelva commit_transaction antes de que la transacción haya terminado de confirmarse. Llama de forma interna a la API commitTransaction de Lake Formation.

  • transaction_id : (cadena) la transacción que se confirmará.

  • wait_for_commit: (booleano) determina si se devuelve commit_transaction de inmediato. El valor predeterminado es true. Si es falso, commit_transaction realiza un sondeo y espera hasta que la transacción se haya confirmado. La cantidad de tiempo de espera se limita a un minuto mediante retroceso exponencial con un máximo de seis reintentos.

Devuelve un valor booleano para indicar si se realizó o no la confirmación.

cancel_transaction

cancel_transaction(transaction_id)

Intenta cancelar la transacción especificada. Devuelve una excepción de TransactionCommittedException si la transacción se había confirmado anteriormente. Internamente se llama CancelTransactionAPI Lake Formation.

  • transaction_id: (cadena) la transacción que se cancelará.

Escritura

getSink

getSink(connection_type, format = None, transformation_ctx = "", **options)

Obtiene un objeto DataSink que se puede utilizar para escribir DynamicFrames en fuentes externas. Compruebe el valor de format de SparkSQL en primer lugar para asegurarse de que obtiene el receptor esperado.

  • connection_type: tipo de conexión que se va a utilizar, como Amazon S3, Amazon Redshift y JDBC. Los valores válidos son s3, mysql, postgresql, redshift, sqlserver, oracle, kinesis y kafka.

  • format: el formato de SparkSQL que se utilizará (opcional).

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional).

  • options: conjunto de pares de nombre-valor que se utilizan para especificar las opciones de conexión. Algunos de valores posibles son:

    • user y password: para autorización

    • url: punto de conexión del almacén de datos

    • dbtable: nombre de la tabla de destino

    • bulkSize: grado de paralelismo para operaciones de inserción

Las opciones que puede especificar dependen del tipo de conexión. Consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark para obtener más valores y ejemplos.

Ejemplo:

>>> data_sink = context.getSink("s3") >>> data_sink.setFormat("json"), >>> data_sink.writeFrame(myFrame)

write_dynamic_frame_from_options

write_dynamic_frame_from_options(frame, connection_type, connection_options={}, format=None, format_options={}, transformation_ctx = "")

Escribe y devuelve un DynamicFrame mediante la conexión y el formato especificados.

  • frame: el DynamicFrame que se va a escribir.

  • connection_type: el tipo de conexión, como Amazon S3, Amazon Redshift y JDBC. Los valores válidos son s3, mysql, postgresql, redshift, sqlserver, oracle, kinesis y kafka.

  • connection_options: opciones de conexión, como la tabla de rutas y bases de datos (opcional). Para un connection_type de s3, se define una ruta de Amazon S3.

    connection_options = {"path": "s3://aws-glue-target/temp"}

    Para conexiones JDBC, deben definirse varias propiedades. Tenga en cuenta que el nombre de la base de datos debe ser parte de la URL. Opcionalmente, se puede incluir en las opciones de conexión.

    aviso

    No se recomienda almacenar las contraseñas en el script. Considere utilizarlos boto3 para recuperarlos del AWS Secrets Manager catálogo de datos de AWS Glue.

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}

    La propiedad dbtable es el nombre de la tabla de JDBC. Para almacenes de datos de JDBC que admiten esquemas dentro de una base de datos, especifique schema.table-name. Si no se ha proporcionado un esquema, se usa el esquema "public" predeterminado.

    Para obtener más información, consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark.

  • format— Una especificación de formato. Se utiliza con una conexión de Amazon S3 o AWS Glue que admite diversos formatos. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.

  • format_options: opciones del formato especificado. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional).

write_from_options

write_from_options(frame_or_dfc, connection_type, connection_options={}, format={}, format_options={}, transformation_ctx = "")

Escribe y devuelve un DynamicFrame o DynamicFrameCollection que se crea con la información sobre la conexión y el formato especificada.

  • frame_or_dfc: el DynamicFrame o la DynamicFrameCollection que se van a escribir.

  • connection_type: el tipo de conexión, como Amazon S3, Amazon Redshift y JDBC. Entre los valores válidos se incluyen: s3, mysql, postgresql, redshift, sqlserver y oracle.

  • connection_options: opciones de conexión, como la tabla de rutas y bases de datos (opcional). Para un connection_type de s3, se define una ruta de Amazon S3.

    connection_options = {"path": "s3://aws-glue-target/temp"}

    Para conexiones JDBC, deben definirse varias propiedades. Tenga en cuenta que el nombre de la base de datos debe ser parte de la URL. Opcionalmente, se puede incluir en las opciones de conexión.

    aviso

    No se recomienda almacenar las contraseñas en el script. Considere utilizarlos boto3 para recuperarlos del AWS Secrets Manager catálogo de datos de AWS Glue.

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}

    La propiedad dbtable es el nombre de la tabla de JDBC. Para almacenes de datos de JDBC que admiten esquemas dentro de una base de datos, especifique schema.table-name. Si no se ha proporcionado un esquema, se usa el esquema "public" predeterminado.

    Para obtener más información, consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark.

  • format— Una especificación de formato. Se utiliza con una conexión de Amazon S3 o AWS Glue que admite diversos formatos. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.

  • format_options: opciones del formato especificado. Consulte en Opciones de formato de datos para las entradas y las salidas en AWS Glue para Spark los formatos que se admiten.

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional).

write_dynamic_frame_from_catalog

write_dynamic_frame_from_catalog(frame, database, table_name, redshift_tmp_dir, transformation_ctx = "", additional_options = {}, catalog_id = None)

Escribe y devuelve un DynamicFrame que se crea mediante una base de datos y una tabla del Catálogo de datos.

  • frame: el DynamicFrame que se va a escribir.

  • Database: la base de datos del Catálogo de datos que contiene la tabla.

  • table_name: el nombre de la tabla del Catálogo de datos asociada al destino.

  • redshift_tmp_dir: directorio provisional de Amazon Redshift que se va a utilizar (opcional).

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional).

  • additional_options: conjunto de pares nombre-valor opcionales.

  • catalog_id: ID de catálogo (ID de cuenta) del Catálogo de datos al que se accede. Cuando el valor es Ninguno, se utiliza el ID de cuenta predeterminado del intermediario.

write_data_frame_from_catalog

write_data_frame_from_catalog(frame, database, table_name, redshift_tmp_dir, transformation_ctx = "", additional_options = {}, catalog_id = None)

Escribe y devuelve un DataFrame que se crea mediante una base de datos y una tabla del Catálogo de datos. Este método admite la escritura en formatos de lagos de datos (Hudi, Iceberg y Delta Lake). Para obtener más información, consulte Uso de marcos de lagos de datos con trabajos de ETL de AWS Glue.

  • frame: el DataFrame que se va a escribir.

  • Database: la base de datos del Catálogo de datos que contiene la tabla.

  • table_name: nombre de la tabla del Catálogo de datos que está asociada al destino.

  • redshift_tmp_dir: directorio provisional de Amazon Redshift que se va a utilizar (opcional).

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional).

  • additional_options: conjunto de pares nombre-valor opcionales.

    • useSparkDataSink— Cuando se establece en true, AWS Glue debe usar la API nativa de Spark Data Sink para escribir en la tabla. Al activar esta opción, puedes añadir cualquier opción de fuente de datos de Spark additional_options según sea necesario. AWS Glue pasa estas opciones directamente a la grabadora Spark.

  • catalog_id: el ID de catálogo (ID de cuenta) del Catálogo de datos al que se accede. Si no especifica un valor, se utiliza el ID de cuenta predeterminado del intermediario.

Limitaciones

Tenga en cuenta las siguientes limitaciones cuando utilice la opción useSparkDataSink:

Ejemplo: escribir una tabla de Hudi con el escritor de orígenes de datos de Spark

hudi_options = { 'useSparkDataSink': True, 'hoodie.table.name': <table_name>, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name': <table_name>, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database': <database_name>, 'hoodie.datasource.hive_sync.table': <table_name>, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms'} glueContext.write_data_frame.from_catalog( frame = <df_product_inserts>, database = <database_name>, table_name = <table_name>, additional_options = hudi_options )

write_dynamic_frame_from_jdbc_conf

write_dynamic_frame_from_jdbc_conf(frame, catalog_connection, connection_options={}, redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)

Escribe y devuelve un DynamicFrame mediante la información de la conexión JDBC especificada.

  • frame: el DynamicFrame que se va a escribir.

  • catalog_connection: conexión al catálogo que se va a utilizar.

  • connection_options: opciones de conexión, como la tabla de rutas y bases de datos (opcional). Para obtener más información, consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark.

  • redshift_tmp_dir: directorio provisional de Amazon Redshift que se va a utilizar (opcional).

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional).

  • catalog_id: ID de catálogo (ID de cuenta) del Catálogo de datos al que se accede. Cuando el valor es Ninguno, se utiliza el ID de cuenta predeterminado del intermediario.

write_from_jdbc_conf

write_from_jdbc_conf(frame_or_dfc, catalog_connection, connection_options={}, redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)

Escribe y devuelve un DynamicFrame o DynamicFrameCollection mediante la información de la conexión JDBC especificada.

  • frame_or_dfc: el DynamicFrame o la DynamicFrameCollection que se van a escribir.

  • catalog_connection: conexión al catálogo que se va a utilizar.

  • connection_options: opciones de conexión, como la tabla de rutas y bases de datos (opcional). Para obtener más información, consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark.

  • redshift_tmp_dir: directorio provisional de Amazon Redshift que se va a utilizar (opcional).

  • transformation_ctx: contexto de transformación que se va a utilizar (opcional).

  • catalog_id: ID de catálogo (ID de cuenta) del Catálogo de datos al que se accede. Cuando el valor es Ninguno, se utiliza el ID de cuenta predeterminado del intermediario.