Conexões do Kafka
É possível usar uma conexão do Kafka para ler e gravar fluxos de dados do Kafka usando informações armazenadas em uma tabela do Catálogo de Dados ou fornecendo informações para acessar diretamente o fluxo de dados. A conexão é compatível com um cluster do Kafka ou um cluster do Amazon Managed Streaming for Apache Kafka. Você pode ler informações do Kafka em um Spark DataFrame e depois convertê-las em um AWS Glue DynamicFrame. É possível gravar DynamicFrames no Kafka em um formato JSON. Se você acessar diretamente o fluxo de dados, use essas opções para fornecer as informações sobre como acessar o fluxo de dados.
Se você usar getCatalogSource
ou create_data_frame_from_catalog
para consumir registros de uma fonte de streaming do Kafka, ou getCatalogSink
ou write_dynamic_frame_from_catalog
para gravar registros no Kafka, e o trabalho tiver o banco de dados do catálogo de dados e as informações de nome da tabela, e poderá usá-los para obter alguns parâmetros básicos para leitura da fonte de streaming do Kafka. Se você usar getSource
, getCatalogSink
, getSourceWithFormat
, getSinkWithFormat
, createDataFrameFromOptions
, create_data_frame_from_options
ou write_dynamic_frame_from_catalog
, será necessário especificar esses parâmetros básicos usando as opções de conexão descritas aqui.
Você pode especificar as opções de conexão para o Kafka usando os seguintes argumentos para os métodos especificados na classe GlueContext
.
-
Scala
-
connectionOptions
: usar comgetSource
,createDataFrameFromOptions
,getSink
-
additionalOptions
: usar comgetCatalogSource
,getCatalogSink
-
options
: usar comgetSourceWithFormat
,getSinkWithFormat
-
-
Python
-
connection_options
: usar comcreate_data_frame_from_options
,write_dynamic_frame_from_options
-
additional_options
: usar comcreate_data_frame_from_catalog
,write_dynamic_frame_from_catalog
-
options
: usar comgetSource
,getSink
-
Para notas e restrições sobre trabalhos de ETL de streaming, consulte Notas e restrições sobre ETL de transmissão.
Tópicos
Configurar o Kafka
Não há pré-requisitos da AWS para se conectar aos fluxos do Kafka disponíveis pela Internet.
Você pode criar uma conexão do AWS Glue Kafka para gerenciar suas credenciais de conexão. Para ter mais informações, consulte Criar uma conexão do AWS Glue para um fluxo de dados do Apache Kafka. Na configuração do trabalho do AWS Glue, forneça connectionName
como uma conexão de rede adicional e depois na chamada do método, forneça connectionName
para o parâmetro connectionName
.
Em certos casos, você precisará configurar pré-requisitos adicionais:
-
Se estiver usando o Amazon Managed Streaming for Apache Kafka com autenticação do IAM, você precisará da configuração apropriada do IAM.
-
Se estiver usando o Amazon Managed Streaming for Apache Kafka com uma Amazon VPC, você precisará da configuração apropriada do Amazon VPC. Você precisará criar uma conexão do AWS Glue que forneça informações de conexão com o Amazon VPC. Você precisará que a configuração do trabalho inclua a conexão do AWS Glue como uma conexão de rede adicional.
Para obter mais informações sobre pré-requisitos de trabalho de ETL de streaming, consulte Trabalhos de transmissão de ETL no AWS Glue.
Exemplo: leitura de fluxos do Kafka
Usado em conjunto com forEachBatch.
Exemplo para fonte de transmissão do Kafka:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
Exemplo: gravação em fluxos do Kafka
Exemplos de gravação no Kafka:
Exemplo com o método getSink
:
data_frame_datasource0 = glueContext.getSink( connectionType="kafka", connectionOptions={ JsonOptions("""{ "connectionName": "ConfluentKafka", "classification": "json", "topic": "kafka-auth-topic", "typeOfData": "kafka"} """)}, transformationContext="dataframe_ApacheKafka_node1711729173428") .getDataFrame()
Exemplo com o método write_dynamic_frame.from_options
:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "classification": "json" } data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)
Referência de opções de conexão do Kafka
Ao ler, use as seguintes opções de conexão com "connectionType": "kafka"
:
-
"bootstrap.servers"
(obrigatório) uma lista de URLs do servidor de bootstrap, por exemplo, comob-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094
. Essa opção deve ser especificada na chamada de API ou definida nos metadados da tabela no Data Catalog. -
"security.protocol"
(obrigatório) O protocolo usado para se comunicar com os agentes. Os valores possíveis são"SSL"
ou"PLAINTEXT"
. -
"topicName"
(Obrigatório): uma lista separada por vírgulas de tópicos para assinar. É necessário especificar um e apenas um de"topicName"
,"assign"
ou"subscribePattern"
. -
"assign"
(Obrigatório): uma string JSON para especificar oTopicPartitions
a ser consumido. É necessário especificar um e apenas um de"topicName"
,"assign"
ou"subscribePattern"
.Exemplo: '{"topicA":[0,1],"topicB":[2,4]}'
-
"subscribePattern"
: (obrigatório) uma string regex Java que identifica a lista de tópicos para assinar. É necessário especificar um e apenas um de"topicName"
,"assign"
ou"subscribePattern"
.Exemplo: 'topic.*'
-
"classification"
(Obrigatório) O formato de arquivo usado pelos dados no registro. Obrigatório, a menos que fornecido por meio do catálogo de dados. -
"delimiter"
(Opcional) O separador de valores usado quando aclassification
é CSV. O padrão é ",
". -
"startingOffsets"
: (opcional) a posição inicial no tópico do Kafka de onde ler os dados. Os valores possíveis são"earliest"
ou"latest"
. O valor padrão é"latest"
. -
"startingTimestamp"
: (opcional, compatível somente com o AWS Glue versão 4.0 ou posterior) O timestamp do registro no tópico do Kafka do qual ler os dados. O valor possível é uma string de timestamp no formato UTC no padrãoyyyy-mm-ddTHH:MM:SSZ
(ondeZ
representa um desvio do fuso horário UTC com +/-). Por exemplo: "2023-04-04T08:00:00-04:00").Observação: somente um dos 'startingOffsets' ou 'startingTimestamp' pode estar presente na lista de opções de conexão do script de streaming do AWS Glue. A inclusão de ambas as propriedades resultará em falha no trabalho.
-
"endingOffsets"
: (opcional) o ponto final quando uma consulta em lote é encerrada. Os valores possíveis são"latest"
ou uma string JSON que especifica um deslocamento final para cadaTopicPartition
.Para a string JSON, o formato é
{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
. O valor-1
como um deslocamento representa"latest"
. -
"pollTimeoutMs"
: (opcional) o tempo limite em milissegundos para sondar dados do Kafka em executores de trabalho do Spark. O valor padrão é512
. -
"numRetries"
: (opcional) o número de novas tentativas antes de falhar em obter os deslocamentos do Kafka. O valor padrão é3
. -
"retryIntervalMs"
: (opcional) o tempo em milissegundos a se esperar antes de tentar novamente buscar os deslocamentos do Kafka. O valor padrão é10
. -
"maxOffsetsPerTrigger"
: (opcional) o limite de taxa no número máximo de deslocamentos que são processados por intervalo do acionador. O número total especificado de deslocamentos é dividido proporcionalmente entretopicPartitions
de diferentes volumes. O valor padrão é nulo, o que significa que o consumidor lê todos os deslocamentos até o deslocamento mais recente conhecido. -
"minPartitions"
: (opcional) o número mínimo desejado de partições a serem lidas do Kafka. O valor padrão é nulo, o que significa que o número de partições do Spark é igual ao número de partições do Kafka. -
"includeHeaders"
: (opcional) informa se deve incluir os cabeçalhos do Kafka. Quando a opção estiver definida como "true", a saída de dados conterá uma coluna adicional chamada "glue_streaming_kafka_headers" com o tipoArray[Struct(key: String, value: String)]
. O valor padrão é “false”. Essa opção está disponível no AWS Glue versão 3.0 ou posterior. -
"schema"
(Obrigatório quando inferSchema é definido como false): o esquema a ser usado para processar a carga útil. Se a classificação foravro
, o esquema fornecido deverá estar no formato de esquema Avro. Se a classificação não foravro
, o esquema fornecido deverá estar no formato de esquema DDL.Veja a seguir alguns exemplos de esquema.
-
"inferSchema"
(Opcional): o valor padrão é "false". Se definido como “true”, o esquema será detectado em runtime com base na carga útil emforeachbatch
. -
"avroSchema"
(Descontinuado): parâmetro usado para especificar um esquema de dados Avro quando o formato Avro é usado. Esse parâmetro foi descontinuado. Use o parâmetroschema
. -
"addRecordTimestamp"
: (opcional) quando essa opção for definida como “true“, a saída de dados conterá uma coluna adicional denominada “__src_timestamp” que indica a hora em que o registro correspondente foi recebido pelo tópico. O valor padrão é "false". Essa opção é compatível com o AWS Glue versão 4.0 ou posterior. -
"emitConsumerLagMetrics"
: (opcional) quando a opção for definida como "true", para cada lote, serão emitidas métricas durante o período entre a hora que o registro mais antigo é recebido pelo tópico e a hora que ele chega ao AWS Glue para o CloudWatch. O nome da métrica é "glue.driver.streaming.maxConsumerLagInMs". O valor padrão é "false". Essa opção é compatível com o AWS Glue versão 4.0 ou posterior.
Ao gravar, use as seguintes opções de conexão com o "connectionType": "kafka"
:
-
"connectionName"
(Obrigatório) Nome da conexão do AWS Glue usada para se conectar ao cluster do Kafka (semelhante à origem do Kafka). -
"topic"
(Obrigatório) Se uma coluna de tópico existir, seu valor será usado como tópico ao gravar a linha especificada no Kafka, a menos que a opção de configuração do tópico esteja definida. Ou seja, a opção de configuraçãotopic
substitui a coluna do tópico. -
"partition"
(Opcional) Se um número de partição válido for especificado, essapartition
será usada no envio do registro.Se nenhuma partição for especificada, mas uma
key
estiver presente, uma partição será escolhida usando um hash da chave.Se nem
key
nempartition
estiverem presentes, uma partição será escolhida com base no particionamento fixo dessas alterações quando pelo menos bytes batch.size forem produzidos na partição. -
"key"
(Opcional) Usado para particionar separtition
for nulo. -
"classification"
(Opcional) O formato de arquivo usado pelos dados no registro. Só oferecemos suporte a JSON, CSV e Avro.Com o formato Avro, podemos fornecer um avroSchema personalizado para serializar, mas observe que isso também precisa ser fornecido na fonte para desserialização. Caso contrário, por padrão, ele usa o Apache AvroSchema para serializar.
Além disso, você pode ajustar o coletor Kafka conforme necessário atualizando os parâmetros de configuração do produtor Kafka
No entanto, há uma pequena lista de opções negadas que não entrarão em vigor. Para obter mais informações, consulte Configurações específicas do Kafka