Conexiones de Kafka
Puede utilizar una conexión de Kafka para leer y escribir en los flujos de datos de Kafka mediante información almacenada en una tabla del catálogo de datos o si proporciona información para acceder directamente al flujo de datos. La conexión admite un clúster de Kafka o un clúster de Amazon Managed Streaming para Apache Kafka. Puede leer la información de Kafka en un DataFrame de Spark para luego convertirla en un DynamicFrame de AWS Glue. Puede escribir los DynamicFrame en Kafka en un formato JSON. Si accede directamente a la secuencia de datos, utilice estas opciones para proporcionar información sobre cómo acceder a la secuencia de datos.
Si utiliza getCatalogSource
o create_data_frame_from_catalog
para consumir los registros de un origen de streaming de Kafka, o getCatalogSink
o write_dynamic_frame_from_catalog
para escribir registros en Kafka, el trabajo cuenta con la base de datos del Catálogo de datos y la información del nombre de la tabla, lo cual se puede utilizar para obtener algunos parámetros básicos para la lectura de un origen de streaming de Kafka. Si utiliza getSource
, getCatalogSink
, getSourceWithFormat
, getSinkWithFormat
, createDataFrameFromOptions
, create_data_frame_from_options
o write_dynamic_frame_from_catalog
, debe especificar estos parámetros básicos con las opciones de conexión que se describen aquí.
Puede especificar las opciones de conexión para Kafka con los argumentos que se mencionan a continuación para los métodos especificados en la clase GlueContext
.
-
Scala
-
connectionOptions
: se debe utilizar congetSource
,createDataFrameFromOptions
ygetSink
-
additionalOptions
: se debe utilizar congetCatalogSource
,getCatalogSink
-
options
: se debe utilizar congetSourceWithFormat
,getSinkWithFormat
-
-
Python
-
connection_options
: se debe utilizar concreate_data_frame_from_options
,write_dynamic_frame_from_options
-
additional_options
: se debe utilizar concreate_data_frame_from_catalog
,write_dynamic_frame_from_catalog
-
options
: se debe utilizar congetSource
,getSink
-
Para obtener notas y conocer las restricciones sobre los trabajos de ETL de transmisión, consulte Notas y restricciones de ETL de streaming.
Temas
Configurar Kafka
No hay requisitos previos AWS para conectarse a las transmisiones de Kafka disponibles a través de Internet.
Puede crear una conexión Glue Kafka AWS para gestionar sus credenciales de conexión. Para obtener más información, consulte Creación de una conexión de AWS Glue para un flujo de datos Apache Kafka. En la configuración del trabajo de Glue AWS, proporcione connectionName
como conexión de red adicional y, a continuación, en la llamada al método, proporcione connectionName
al parámetro connectionName
.
En algunos casos, tendrá que configurar requisitos previos adicionales:
-
Si utiliza Amazon Managed Streaming for Apache Kafka con autenticación de IAM, necesitará una configuración de IAM adecuada.
-
Si utiliza Amazon Managed Streaming for Apache Kafka con una Amazon VPC, necesitará una configuración de Amazon VPC adecuada. Deberá crear una conexión a AWS Glue que proporcione información de conexión de Amazon VPC. Necesitará que la configuración de su trabajo incluya la conexión Glue AWS como conexión de red adicional.
Para obtener más información sobre los requisitos previos del trabajo de ETL de Streaming, consulte Trabajos ETL de streaming en AWS Glue.
Ejemplo: leer desde transmisiones de Kafka
Se utiliza junto con forEachBatch.
Ejemplo de origen de streaming de Kafka:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
Ejemplo: escritura a flujos de Kafka
Ejemplos de escritura a Kafka:
Ejemplo con el método getSink
:
data_frame_datasource0 = glueContext.getSink( connectionType="kafka", connectionOptions={ JsonOptions("""{ "connectionName": "ConfluentKafka", "classification": "json", "topic": "kafka-auth-topic", "typeOfData": "kafka"} """)}, transformationContext="dataframe_ApacheKafka_node1711729173428") .getDataFrame()
Ejemplo con el método write_dynamic_frame.from_options
:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "classification": "json" } data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)
Referencia de opciones de conexión de Kafka
Al leer, utilice las opciones de conexión con "connectionType": "kafka"
a continuación:
-
"bootstrap.servers"
(obligatorio): una lista de direcciones URL Bootstrap, por ejemplo, comob-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094
. Esta opción debe especificarse en la llamada a la API o definirse en los metadatos de la tabla en el Data Catalog. -
"security.protocol"
(obligatorio): el protocolo que se utiliza para la comunicación con los agentes. Los valores posibles son"SSL"
o."PLAINTEXT"
-
"topicName"
(Obligatorio) Lista separada por comas de temas a los que suscribirse. Debe especificar solo una opción de"topicName"
,"assign"
o"subscribePattern"
. -
"assign"
: (Obligatorio) Una cadena JSON que especifica el valor deTopicPartitions
para consumir. Debe especificar solo una opción de"topicName"
,"assign"
o"subscribePattern"
.Ejemplo: “{"temaA":[0,1],"temaB":[2,4]}”
-
"subscribePattern"
: (obligatorio) cadena de expresiones regulares de Java que identifica la lista de temas a la que desea suscribirse. Debe especificar solo una opción de"topicName"
,"assign"
o"subscribePattern"
.Ejemplo: “tema.*”
-
"classification"
(Obligatorio) El formato de archivo utilizado por los datos del registro. Obligatorio a menos que se proporcione a través del catálogo de datos. -
"delimiter"
(Opcional) El separador de valores que se utiliza cuandoclassification
es CSV. El valor predeterminado es “,
”. -
"startingOffsets"
: (opcional) posición inicial en el tema de Kafka para leer los datos. Los valores posibles son"earliest"
o."latest"
El valor predeterminado es"latest"
. -
"startingTimestamp"
: (Opcional, solo compatible con la versión 4.0 o posterior de Glue AWS) La marca de tiempo del registro en el tema de Kafka para leer los datos. Los valores posibles son una cadena de marca de tiempo en formato UTC en el patrónyyyy-mm-ddTHH:MM:SSZ
(dondeZ
representa un desplazamiento de zona horaria UTC con un +/-. Por ejemplo, “2023-04-04T08:00:00-04:00”).Nota: Solo una de las opciones 'startingOffsets' o 'startingTimestamp' puede estar presente en la lista de opciones de conexión del script de streaming de AWS Glue, incluir estas dos propiedades producirá un error en el trabajo.
-
"endingOffsets"
: (opcional) el punto final cuando finaliza una consulta por lotes. Los valores posibles son"latest"
o una cadena JSON que especifica una compensación final para cadaTopicPartition
.Para la cadena JSON, el formato es
{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
. El valor-1
como compensación representa"latest"
. -
"pollTimeoutMs"
: (opcional) tiempo de espera en milisegundos para sondear datos de Kafka en ejecutores de trabajos de Spark. El valor predeterminado es512
. -
"numRetries"
: (opcional) el número de veces que se reintenta antes de no obtener las compensaciones de Kafka. El valor predeterminado es3
. -
"retryIntervalMs"
: (opcional) tiempo en milisegundos para esperar antes de volver a intentar obtener compensaciones Kafka. El valor predeterminado es10
. -
"maxOffsetsPerTrigger"
: (opcional) el límite de velocidad en el número máximo de compensaciones que se procesan por intervalo de desencadenador. El número total de compensaciones especificado se divide de forma proporcional entretopicPartitions
de diferentes volúmenes. El valor predeterminado es nulo, lo que significa que el consumidor lee todos las compensaciones hasta la última compensación conocida. -
"minPartitions"
: (opcional) el número mínimo deseado de particiones para leer desde Kafka. El valor predeterminado es nulo, lo que significa que el número de particiones de Spark es igual al número de particiones de Kafka. -
"includeHeaders"
: (opcional) si se deben incluir los encabezados de Kafka. Cuando la opción se establece en “verdadero”, la salida de datos contendrá una columna adicional denominada “glue_streaming_kafka_headers” con el tipoArray[Struct(key: String, value: String)]
. El valor predeterminado es "false". Esta opción se encuentra disponible en la versión 3.0 o posterior de AWS Glue. -
"schema"
: (Obligatorio cuando inferSchema se establece en false) Esquema que se va a utilizar para procesar la carga. Si la clasificación esavro
, el esquema proporcionado debe estar en el formato de esquema Avro. Si la clasificación no esavro
, el esquema proporcionado debe estar en el formato de esquema DDL.A continuación, se muestran algunos ejemplos de esquemas.
-
"inferSchema"
: (opcional) El valor predeterminado es “false”. Si se establece en “true”, el esquema se detectará durante el tiempo de ejecución desde la carga dentro deforeachbatch
. -
"avroSchema"
: (obsoleto) Parámetro utilizado para especificar un esquema de datos Avro cuando se utiliza el formato Avro. Este parámetro se ha quedado obsoleto. Utilice el parámetroschema
. -
"addRecordTimestamp"
: (opcional) cuando esta opción se establece en “true”, la salida de datos contendrá una columna adicional denominada “__src_timestamp” que indica la hora en la que el tema recibió el registro correspondiente. El valor predeterminado es "false". Esta opción es compatible con la versión 4.0 o posterior de AWS Glue. -
"emitConsumerLagMetrics"
: (opcional) cuando esta opción se establece en “true”, para cada lote, emitirá las métricas correspondientes al período comprendido entre el registro más antiguo recibido por el tema y el momento en que llegue a AWS Glue en CloudWatch. El nombre de la métrica es “glue.driver.streaming.maxConsumerLagInMs”. El valor predeterminado es "false". Esta opción es compatible con la versión 4.0 o posterior de AWS Glue.
Al escribir, utilice las opciones de conexión con "connectionType": "kafka"
a continuación:
-
"connectionName"
(Obligatorio) Nombre de la conexión de AWS Glue utilizada para conectar al clúster de Kafka (similar al origen de Kafka). -
"topic"
(Obligatorio) Si existe una columna de temas, su valor se utiliza como el tema al momento de la escritura de la fila en Kafka, a menos que esté establecida la opción de configuración de temas. Es decir, la opción de configuración detopic
anula la columna de temas. -
"partition"
(Opcional) Si se especifica un número válido de partición, estapartition
se utilizará cuando se envíe el registro.Si no se especifica ninguna partición pero hay una
key
, se elegirá una partición con el hash de la clave.Si no hay ni una
key
ni unapartition
, se elegirá una partición según la partición sticky de esos cambios cuando al menos se produzcan bytes de batch.size en la partición. -
"key"
(Opcional) Utilizado para la partición si lapartition
es nula. -
"classification"
(Opcional) El formato de archivo utilizado por los datos en el registro. Solo se admiten los formatos JSON, CSV y Avro.Con el formato Avro, podemos brindar un AvroSchema personalizado para la serialización, pero tenga en cuenta que también se tiene que brindar en el origen para la deserialización. Si no, se utilizará de manera predeterminada el AvroSchema de Apache para la serialización.
Además, puede ajustar los receptores de Kafka según sea necesario al actualizar los parámetros de configuración de productor de Kafka
Sin embargo, existe una pequeña lista de opciones de rechazos que no tendrá efecto. Para obtener más información, consulte las Configuraciones específicas de Kafka