Conexión de Kinesis - AWS Glue

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Conexión de Kinesis

Puede utilizar una conexión de Kafka para leer y escribir en Amazon Kinesis Data Streams mediante información almacenada en una tabla del catálogo de datos o si proporciona información para acceder directamente al flujo de datos. Puede leer la información de Kinesis en un DataFrame de Spark y, a continuación, convertirla en un DynamicFrame de AWS Glue. Puede escribir DynamicFrames en Kinesis en formato JSON. Si accede directamente a la secuencia de datos, utilice estas opciones para proporcionar información sobre cómo acceder a la secuencia de datos.

Si utiliza getCatalogSource o create_data_frame_from_catalog para consumir registros de una fuente de streaming de Kinesis, el trabajo tiene la base de datos de Data Catalog y la información del nombre de la tabla, y puede utilizarla para obtener algunos parámetros básicos para la lectura de la fuente de streaming de Kinesis. Si utiliza getSource, getSourceWithFormat, createDataFrameFromOptions o create_data_frame_from_options debe especificar estos parámetros básicos mediante las opciones de conexión descritas aquí.

Puede especificar las opciones de conexión para Kinesis al utilizar los siguientes argumentos para los métodos especificados en la clase GlueContext.

  • Scala

    • connectionOptions: se debe utilizar con getSource, createDataFrameFromOptions y getSink

    • additionalOptions: se debe utilizar con getCatalogSource, getCatalogSink

    • options: se debe utilizar con getSourceWithFormat, getSinkWithFormat

  • Python

    • connection_options: se debe utilizar con create_data_frame_from_options, write_dynamic_frame_from_options

    • additional_options: se debe utilizar con create_data_frame_from_catalog, write_dynamic_frame_from_catalog

    • options: se debe utilizar con getSource, getSink

Para obtener notas y restricciones sobre los trabajos de ETL de Streaming, consulte Notas y restricciones de ETL de streaming.

Configurar Kinesis

Para conectarse desde un flujo de datos de Kinesis en un trabajo de AWS Glue Spark, necesitará algunos requisitos previos:

  • Si está leyendo, el trabajo de AWS Glue debe tener permisos de IAM de nivel de acceso de lectura para el flujo de datos de Kinesis.

  • Si está escribiendo, el trabajo de AWS Glue debe tener permisos de IAM de nivel de acceso de escritura para el flujo de datos de Kinesis.

En algunos casos, tendrá que configurar requisitos previos adicionales:

  • Si su trabajo de AWS Glue está configurado con conexiones de red adicionales (normalmente para conectarse a otros conjuntos de datos) y una de esas conexiones proporciona opciones de red de Amazon VPC, esto indicará que su trabajo se comunique a través de Amazon VPC. En este caso, también tendrá que configurar el flujo de datos de Kinesis para que se comunique a través de Amazon VPC. Puede hacerlo mediante la creación de un punto de conexión de VPC de tipo interfaz entre la Amazon VPC y el flujo de datos de Kinesis. Para obtener más información, consulte Uso de Kinesis de Amazon Kinesis Data Streams con puntos de conexión de VPC de interfaz.

  • Al especificar Amazon Kinesis Data Streams en otra cuenta, debe configurar los roles y las políticas para permitir el acceso entre cuentas. Para obtener más información, consulte Ejemplo: leer desde un flujo de Kinesis en una cuenta diferente.

Para obtener más información sobre los requisitos previos del trabajo de ETL de Streaming, consulte Trabajos ETL de streaming en AWS Glue.

Ejemplo: lectura de transmisiones desde Kinesis

Ejemplo: lectura de transmisiones desde Kinesis

Se utiliza junto con forEachBatch.

Ejemplo de origen de streaming de Amazon Kinesis:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Ejemplo: escribir en flujos de Kinesis

Ejemplo: lectura de transmisiones desde Kinesis

Se utiliza junto con forEachBatch.

Ejemplo de origen de streaming de Amazon Kinesis:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Referencia de opciones de conexión de Kinesis

Designa opciones de conexión para Amazon Kinesis Data Streams.

Utilice las siguientes opciones de conexión para los orígenes de datos de streaming de Kinesis:

  • "streamARN": (Obligatorio) Se utiliza para leer/escribir. El ARN de flujo de datos de Kinesis.

  • "classification": (Obligatorio para lectura) Se utiliza para leer. El formato de archivo utilizado por los datos del registro. Obligatorio a menos que se proporcione a través del catálogo de datos.

  • "streamName": (Opcional) Se usa para leer. Nombre de un flujo de datos de Kinesis para leer. Utilizado con endpointUrl.

  • "endpointUrl": (Opcional) Se usa para leer. Predeterminado: “https://kinesis.us-east-1.amazonaws.com”. El punto de conexión de AWS del flujo de Kinesis. No es necesario cambiar esto a menos que se conecte a una región especial.

  • "partitionKey": (Opcional) Se usa para escribir. La clave de partición de Kinesis que se utiliza al producir registros.

  • "delimiter": (Opcional) Se usa para leer. El separador de valores que se utiliza cuando classification es CSV. El valor predeterminado es “,”.

  • "startingPosition": (Opcional) Se usa para leer. La posición inicial en el flujo de datos de Kinesis para leer los datos. Los valores posibles son "latest", "trim_horizon", "earliest" o una cadena de marca de tiempo en formato UTC en el patrón yyyy-mm-ddTHH:MM:SSZ (donde Z representa un desplazamiento de zona horaria UTC con un +/-. Por ejemplo, “04-04-2023 T 08:00:00-04:00”). El valor predeterminado es "latest". Nota: La cadena de marca de tiempo en formato UTC para "startingPosition" solo es compatible con la versión 4.0 o posterior de Glue AWS.

  • "failOnDataLoss": (Opcional) No se realizará el trabajo si falta o ha caducado alguna partición activa. El valor predeterminado es "false".

  • "awsSTSRoleARN": (Opcional) Se usa para escribir/leer. El nombre de recurso de Amazon (ARN) del rol de que se asumirá mediante AWS Security Token Service (AWS STS). Este rol debe tener permisos para describir o leer operaciones de registros del flujo de datos de Kinesis. Debe utilizar este parámetro para acceder a un flujo de datos de otra cuenta. Se utiliza junto con "awsSTSSessionName".

  • "awsSTSSessionName": (Opcional) Se usa para escribir/leer. Un identificador para la sesión que asume el rol mediante AWS STS. Debe utilizar este parámetro para acceder a un flujo de datos de otra cuenta. Se utiliza junto con "awsSTSRoleARN".

  • "awsSTSEndpoint": (Opcional) El punto de conexión de AWS STS que se utilizará al conectarse a Kinesis con un rol asumido. Esto permite usar el punto de conexión regional de AWS STS en una VPC, lo que no es posible con el punto de conexión global predeterminado.

  • "maxFetchTimeInMs": (Opcional) Se usa para leer. El tiempo máximo que le tomó al ejecutor del trabajo leer los registros del lote actual en el flujo de datos de Kinesis, especificado en milisegundos (ms). Pueden realizarse varias llamadas a la API de GetRecords durante este tiempo. El valor predeterminado es 1000.

  • "maxFetchRecordsPerShard": (Opcional) Se usa para leer. El número máximo de registros que se recuperará por partición en el flujo de datos de Kinesis por microlote. Nota: El cliente puede exceder este límite si el trabajo de streaming ya leyó registros adicionales de Kinesis (en la misma llamada de obtención de registros). Si maxFetchRecordsPerShard tiene que ser preciso, entonces tiene que ser un múltiplo de maxRecordPerRead. El valor predeterminado es 100000.

  • "maxRecordPerRead": (Opcional) Se usa para leer. El número máximo de registros que se recuperará del flujo de datos de Kinesis en cada operación getRecords. El valor predeterminado es 10000.

  • "addIdleTimeBetweenReads": (Opcional) Se usa para leer. Agrega un retardo de tiempo entre dos operaciones getRecords consecutivas. El valor predeterminado es "False". Esta opción sólo se puede configurar para Glue versión 2.0 y superior.

  • "idleTimeBetweenReadsInMs": (Opcional) Se usa para leer. El tiempo mínimo de retraso entre dos operaciones getRecords consecutivas, especificado en ms. El valor predeterminado es 1000. Esta opción sólo se puede configurar para Glue versión 2.0 y superior.

  • "describeShardInterval": (Opcional) Se usa para leer. El intervalo mínimo de tiempo entre dos llamadas a la API ListShards para que su script considere cambios en los fragmentos. Para obtener más información, consulte Estrategias para cambios en los fragmentos en la Guía para desarrolladores de Amazon Kinesis Data Streams. El valor predeterminado es 1s.

  • "numRetries": (Opcional) Se usa para leer. El número máximo de reintentos para las solicitudes de la API de Kinesis Data Streams. El valor predeterminado es 3.

  • "retryIntervalMs": (Opcional) Se usa para leer. El periodo de enfriamiento (especificado en ms) antes de volver a intentar la llamada a la API de Kinesis Data Streams. El valor predeterminado es 1000.

  • "maxRetryIntervalMs": (Opcional) Se usa para leer. El periodo de enfriamiento máximo (especificado en ms) entre dos intentos de llamada a la API de Kinesis Data Streams. El valor predeterminado es 10000.

  • "avoidEmptyBatches": (Opcional) Se usa para leer. Evita crear un trabajo de microlotes vacío al comprobar si hay datos no leídos en el flujo de datos de Kinesis antes de que se inicie el lote. El valor predeterminado es "False".

  • "schema": (Obligatorio cuando inferSchema se establece en false) Se utiliza para leer. El esquema que se utilizará para procesar la carga útil. Si la clasificación es avro, el esquema proporcionado debe estar en el formato de esquema Avro. Si la clasificación no es avro, el esquema proporcionado debe estar en el formato de esquema DDL.

    A continuación, se muestran algunos ejemplos de esquemas.

    Example in DDL schema format
    `column1` INT, `column2` STRING , `column3` FLOAT
    Example in Avro schema format
    { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
  • "inferSchema": (Opcional) Se usa para leer. El valor predeterminado es "false". Si se establece en “true”, el esquema se detectará durante el tiempo de ejecución desde la carga dentro de foreachbatch.

  • "avroSchema": (Obsoleto) Se usa para leer. Parámetro utilizado para especificar un esquema de datos Avro cuando se utiliza el formato Avro. Este parámetro se ha quedado obsoleto. Utilice el parámetro schema.

  • "addRecordTimestamp": (Opcional) Se usa para leer. Cuando esta opción se establece en “true”, la salida de datos contendrá una columna adicional denominada “__src_timestamp” que indica la hora en la que el flujo recibió el registro correspondiente. El valor predeterminado es "false". Esta opción es compatible con la versión 4.0 o posterior de AWS Glue.

  • "emitConsumerLagMetrics": (Opcional) Se usa para leer. Cuando la opción se establece en “verdadera”, para cada lote, emitirá las métricas correspondientes al periodo comprendido entre el registro más antiguo recibido por el flujo y el momento en que llegue a AWS Glue en CloudWatch. El nombre de la métrica es “glue.driver.streaming.maxConsumerLagInMs”. El valor predeterminado es "false". Esta opción es compatible con la versión 4.0 o posterior de AWS Glue.

  • "fanoutConsumerARN": (Opcional) Se usa para leer. El ARN de un consumidor de un flujo de Kinesis para el flujo especificado en streamARN. Se utiliza para habilitar el modo de distribución mejorada para la conexión de Kinesis. Para obtener más información sobre cómo consumir una transmisión de Kinesis con una distribución mejorada, consulte Uso de una distribución mejorada en los trabajos de streaming de Kinesis.

  • "recordMaxBufferedTime": (Opcional) Se usa para escribir. Predeterminado: 1000 (ms). Tiempo máximo que un registro permanece almacenado en búfer mientras espera a ser escrito.

  • "aggregationEnabled": (Opcional) Se usa para escribir. Valor predeterminado: verdadero. Especifica si los registros deben agregarse antes de enviarlos a Kinesis.

  • "aggregationMaxSize": (Opcional) Se usa para escribir. Predeterminado: 51 200 (bytes). Si un registro supera este límite, omitirá el agregador. Nota: Kinesis impone un límite de 50 KB en el tamaño del registro. Si lo establece por encima de 50 KB, Kinesis rechazará los registros de gran tamaño.

  • "aggregationMaxCount": (Opcional) Se usa para escribir. Predeterminado: 4294967295. Número máximo de elementos a empaquetar en un registro agregado.

  • "producerRateLimit": (Opcional) Se usa para escribir. Predeterminado: 150 (%). Limita el rendimiento por partición enviado desde un solo productor (por ejemplo, su trabajo), como porcentaje del límite de backend.

  • "collectionMaxCount": (Opcional) Se usa para escribir. Predeterminado: 500. Número máximo de elementos a incluir en una solicitud de PutRecords.

  • "collectionMaxSize": (Opcional) Se usa para escribir. Predeterminado: 5 242 880 (bytes). Cantidad máxima de datos para enviar con una solicitud de PutRecords.