Conexões do Kafka - AWS Glue

Conexões do Kafka

É possível usar uma conexão do Kafka para ler e gravar fluxos de dados do Kafka usando informações armazenadas em uma tabela do Catálogo de Dados ou fornecendo informações para acessar diretamente o fluxo de dados. A conexão é compatível com um cluster do Kafka ou um cluster do Amazon Managed Streaming for Apache Kafka. Você pode ler informações do Kafka em um Spark DataFrame e depois convertê-las em um AWS Glue DynamicFrame. É possível gravar DynamicFrames no Kafka em um formato JSON. Se você acessar diretamente o fluxo de dados, use essas opções para fornecer as informações sobre como acessar o fluxo de dados.

Se você usar getCatalogSource ou create_data_frame_from_catalog para consumir registros de uma fonte de streaming do Kafka, ou getCatalogSink ou write_dynamic_frame_from_catalog para gravar registros no Kafka, e o trabalho tiver o banco de dados do catálogo de dados e as informações de nome da tabela, e poderá usá-los para obter alguns parâmetros básicos para leitura da fonte de streaming do Kafka. Se você usar getSource, getCatalogSink, getSourceWithFormat, getSinkWithFormat, createDataFrameFromOptions, create_data_frame_from_options ou write_dynamic_frame_from_catalog, será necessário especificar esses parâmetros básicos usando as opções de conexão descritas aqui.

Você pode especificar as opções de conexão para o Kafka usando os seguintes argumentos para os métodos especificados na classe GlueContext.

  • Scala

    • connectionOptions: usar com getSource, createDataFrameFromOptions, getSink

    • additionalOptions: usar com getCatalogSource, getCatalogSink

    • options: usar com getSourceWithFormat, getSinkWithFormat

  • Python

    • connection_options: usar com create_data_frame_from_options, write_dynamic_frame_from_options

    • additional_options: usar com create_data_frame_from_catalog, write_dynamic_frame_from_catalog

    • options: usar com getSource, getSink

Para notas e restrições sobre trabalhos de ETL de streaming, consulte Notas e restrições sobre ETL de transmissão.

Tópicos

    Configurar o Kafka

    Não há pré-requisitos da AWS para se conectar aos fluxos do Kafka disponíveis pela Internet.

    Você pode criar uma conexão do AWS Glue Kafka para gerenciar suas credenciais de conexão. Para ter mais informações, consulte Criar uma conexão do AWS Glue para um fluxo de dados do Apache Kafka. Na configuração do trabalho do AWS Glue, forneça connectionName como uma conexão de rede adicional e depois na chamada do método, forneça connectionName para o parâmetro connectionName.

    Em certos casos, você precisará configurar pré-requisitos adicionais:

    • Se estiver usando o Amazon Managed Streaming for Apache Kafka com autenticação do IAM, você precisará da configuração apropriada do IAM.

    • Se estiver usando o Amazon Managed Streaming for Apache Kafka com uma Amazon VPC, você precisará da configuração apropriada do Amazon VPC. Você precisará criar uma conexão do AWS Glue que forneça informações de conexão com o Amazon VPC. Você precisará que a configuração do trabalho inclua a conexão do AWS Glue como uma conexão de rede adicional.

    Para obter mais informações sobre pré-requisitos de trabalho de ETL de streaming, consulte Trabalhos de transmissão de ETL no AWS Glue.

    Exemplo: leitura de fluxos do Kafka

    Usado em conjunto com forEachBatch.

    Exemplo para fonte de transmissão do Kafka:

    kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)

    Exemplo: gravação em fluxos do Kafka

    Exemplos de gravação no Kafka:

    Exemplo com o método getSink:

    data_frame_datasource0 = glueContext.getSink( connectionType="kafka", connectionOptions={ JsonOptions("""{ "connectionName": "ConfluentKafka", "classification": "json", "topic": "kafka-auth-topic", "typeOfData": "kafka"} """)}, transformationContext="dataframe_ApacheKafka_node1711729173428") .getDataFrame()

    Exemplo com o método write_dynamic_frame.from_options:

    kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "classification": "json" } data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)

    Referência de opções de conexão do Kafka

    Ao ler, use as seguintes opções de conexão com "connectionType": "kafka":

    • "bootstrap.servers" (obrigatório) uma lista de URLs do servidor de bootstrap, por exemplo, como b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094. Essa opção deve ser especificada na chamada de API ou definida nos metadados da tabela no Data Catalog.

    • "security.protocol" (obrigatório) O protocolo usado para se comunicar com os agentes. Os valores possíveis são "SSL" ou "PLAINTEXT".

    • "topicName" (Obrigatório): uma lista separada por vírgulas de tópicos para assinar. É necessário especificar um e apenas um de "topicName", "assign" ou "subscribePattern".

    • "assign" (Obrigatório): uma string JSON para especificar o TopicPartitions a ser consumido. É necessário especificar um e apenas um de "topicName", "assign" ou "subscribePattern".

      Exemplo: '{"topicA":[0,1],"topicB":[2,4]}'

    • "subscribePattern": (obrigatório) uma string regex Java que identifica a lista de tópicos para assinar. É necessário especificar um e apenas um de "topicName", "assign" ou "subscribePattern".

      Exemplo: 'topic.*'

    • "classification" (Obrigatório) O formato de arquivo usado pelos dados no registro. Obrigatório, a menos que fornecido por meio do catálogo de dados.

    • "delimiter" (Opcional) O separador de valores usado quando a classification é CSV. O padrão é ",".

    • "startingOffsets": (opcional) a posição inicial no tópico do Kafka de onde ler os dados. Os valores possíveis são "earliest" ou "latest". O valor padrão é "latest".

    • "startingTimestamp": (opcional, compatível somente com o AWS Glue versão 4.0 ou posterior) O timestamp do registro no tópico do Kafka do qual ler os dados. O valor possível é uma string de timestamp no formato UTC no padrão yyyy-mm-ddTHH:MM:SSZ (onde Z representa um desvio do fuso horário UTC com +/-). Por exemplo: "2023-04-04T08:00:00-04:00").

      Observação: somente um dos 'startingOffsets' ou 'startingTimestamp' pode estar presente na lista de opções de conexão do script de streaming do AWS Glue. A inclusão de ambas as propriedades resultará em falha no trabalho.

    • "endingOffsets": (opcional) o ponto final quando uma consulta em lote é encerrada. Os valores possíveis são "latest" ou uma string JSON que especifica um deslocamento final para cada TopicPartition.

      Para a string JSON, o formato é {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}. O valor -1 como um deslocamento representa "latest".

    • "pollTimeoutMs": (opcional) o tempo limite em milissegundos para sondar dados do Kafka em executores de trabalho do Spark. O valor padrão é 512.

    • "numRetries": (opcional) o número de novas tentativas antes de falhar em obter os deslocamentos do Kafka. O valor padrão é 3.

    • "retryIntervalMs": (opcional) o tempo em milissegundos a se esperar antes de tentar novamente buscar os deslocamentos do Kafka. O valor padrão é 10.

    • "maxOffsetsPerTrigger": (opcional) o limite de taxa no número máximo de deslocamentos que são processados por intervalo do acionador. O número total especificado de deslocamentos é dividido proporcionalmente entre topicPartitions de diferentes volumes. O valor padrão é nulo, o que significa que o consumidor lê todos os deslocamentos até o deslocamento mais recente conhecido.

    • "minPartitions": (opcional) o número mínimo desejado de partições a serem lidas do Kafka. O valor padrão é nulo, o que significa que o número de partições do Spark é igual ao número de partições do Kafka.

    • "includeHeaders": (opcional) informa se deve incluir os cabeçalhos do Kafka. Quando a opção estiver definida como "true", a saída de dados conterá uma coluna adicional chamada "glue_streaming_kafka_headers" com o tipo Array[Struct(key: String, value: String)]. O valor padrão é “false”. Essa opção está disponível no AWS Glue versão 3.0 ou posterior.

    • "schema" (Obrigatório quando inferSchema é definido como false): o esquema a ser usado para processar a carga útil. Se a classificação for avro, o esquema fornecido deverá estar no formato de esquema Avro. Se a classificação não for avro, o esquema fornecido deverá estar no formato de esquema DDL.

      Veja a seguir alguns exemplos de esquema.

      Example in DDL schema format
      'column1' INT, 'column2' STRING , 'column3' FLOAT
      Example in Avro schema format
      { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
    • "inferSchema" (Opcional): o valor padrão é "false". Se definido como “true”, o esquema será detectado em runtime com base na carga útil em foreachbatch.

    • "avroSchema" (Descontinuado): parâmetro usado para especificar um esquema de dados Avro quando o formato Avro é usado. Esse parâmetro foi descontinuado. Use o parâmetro schema.

    • "addRecordTimestamp": (opcional) quando essa opção for definida como “true“, a saída de dados conterá uma coluna adicional denominada “__src_timestamp” que indica a hora em que o registro correspondente foi recebido pelo tópico. O valor padrão é "false". Essa opção é compatível com o AWS Glue versão 4.0 ou posterior.

    • "emitConsumerLagMetrics": (opcional) quando a opção for definida como "true", para cada lote, serão emitidas métricas durante o período entre a hora que o registro mais antigo é recebido pelo tópico e a hora que ele chega ao AWS Glue para o CloudWatch. O nome da métrica é "glue.driver.streaming.maxConsumerLagInMs". O valor padrão é "false". Essa opção é compatível com o AWS Glue versão 4.0 ou posterior.

    Ao gravar, use as seguintes opções de conexão com o "connectionType": "kafka":

    • "connectionName" (Obrigatório) Nome da conexão do AWS Glue usada para se conectar ao cluster do Kafka (semelhante à origem do Kafka).

    • "topic" (Obrigatório) Se uma coluna de tópico existir, seu valor será usado como tópico ao gravar a linha especificada no Kafka, a menos que a opção de configuração do tópico esteja definida. Ou seja, a opção de configuração topic substitui a coluna do tópico.

    • "partition" (Opcional) Se um número de partição válido for especificado, essa partition será usada no envio do registro.

      Se nenhuma partição for especificada, mas uma key estiver presente, uma partição será escolhida usando um hash da chave.

      Se nem key nem partition estiverem presentes, uma partição será escolhida com base no particionamento fixo dessas alterações quando pelo menos bytes batch.size forem produzidos na partição.

    • "key" (Opcional) Usado para particionar se partition for nulo.

    • "classification" (Opcional) O formato de arquivo usado pelos dados no registro. Só oferecemos suporte a JSON, CSV e Avro.

      Com o formato Avro, podemos fornecer um avroSchema personalizado para serializar, mas observe que isso também precisa ser fornecido na fonte para desserialização. Caso contrário, por padrão, ele usa o Apache AvroSchema para serializar.

    Além disso, você pode ajustar o coletor Kafka conforme necessário atualizando os parâmetros de configuração do produtor Kafka. Observe que não há nenhuma lista de permissões nas opções de conexão. Todos os pares de chave-valor são mantidos no coletor como estão.

    No entanto, há uma pequena lista de opções negadas que não entrarão em vigor. Para obter mais informações, consulte Configurações específicas do Kafka.