Apache Kafka - AWS IoT Core

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Apache Kafka

A ação Apache Kafka (Kafka) envia mensagens diretamente para seu Amazon Managed Streaming for Apache Kafka (Amazon MSK), clusters Apache Kafka gerenciados por provedores terceirizados, como o Confluent Cloud, ou clusters autogerenciados do Apache Kafka para análise e visualização de dados.

nota

Este tópico pressupõe familiaridade com a plataforma Apache Kafka e conceitos relacionados. Para obter mais informações sobre o Apache Kafka, consulte Apache Kafka. O MSK Serverless não é suportado. Os clusters MSK Serverless só podem ser feitos por meio da autenticação do IAM, que a ação de regra do Apache Kafka não suporta atualmente.

Requisitos

Esta ação de regra tem os seguintes requisitos:

  • Uma função do IAM que AWS IoT pode ser assumida para realizar as ec2:DescribeSecurityGroups operações ec2:CreateNetworkInterface ec2:DescribeNetworkInterfaces ec2:CreateNetworkInterfacePermissionec2:DeleteNetworkInterface,ec2:DescribeSubnets,ec2:DescribeVpcs,ec2:DescribeVpcAttribute,,, e. Essa função cria e gerencia interfaces de rede elásticas para o Amazon Virtual Private Cloud para alcançar seu agente Kafka. Para ter mais informações, consulte Conceder a uma AWS IoT regra o acesso que ela exige.

    No AWS IoT console, você pode escolher ou criar uma função para permitir AWS IoT Core a execução dessa ação de regra.

    Para obter mais informações sobre interfaces de rede, consulte Interfaces de rede elásticas no Guia do usuário do Amazon EC2.

    A política vinculada à função que você especificar deve ser semelhante à do exemplo a seguir.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • Se você costuma AWS Secrets Manager armazenar as credenciais necessárias para se conectar ao seu corretor Kafka, deve criar uma função do IAM que AWS IoT Core possa ser assumida para realizar as operações e. secretsmanager:GetSecretValue secretsmanager:DescribeSecret

    A política vinculada à função que você especificar deve ser semelhante à do exemplo a seguir.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:region:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region:123456789012:secret:kafka_keytab-*" ] } ] }
  • Você pode executar seus clusters do Apache Kafka dentro do Amazon Virtual Private Cloud (Amazon VPC). Você deve criar um destino Amazon VPC e usar um gateway NAT em suas sub-redes para encaminhar mensagens para um cluster público do Kafka AWS IoT . O AWS IoT mecanismo de regras cria uma interface de rede em cada uma das sub-redes listadas no destino da VPC para rotear o tráfego diretamente para a VPC. Quando você cria um destino de VPC, o mecanismo de AWS IoT regras cria automaticamente uma ação de regra de VPC. Para obter mais informações sobre ações de regras de VPC, consulte Destinos da nuvem privada virtual (VPC).

  • Se você usar uma chave gerenciada pelo cliente AWS KMS key (chave KMS) para criptografar dados em repouso, o serviço deverá ter permissão para usar a chave KMS em nome do chamador. Para obter mais informações, consulte Criptografia do Amazon MSK no Guia do desenvolvedor do Amazon Managed Streaming for Apache Kafka.

Parâmetros

Ao criar uma AWS IoT regra com essa ação, você deve especificar as seguintes informações:

destinationArn

O nome do recurso da Amazon (ARN) do destino da VPC. Para obter informações sobre a criação do destino da VPC, consulte Destinos da nuvem privada virtual (VPC).

tópico

O tópico do Kafka para mensagens enviadas ao agente do Kafka.

Você pode substituir esse campo usando um modelo de substituição. Para obter mais informações, consulte Modelos de substituição.

chave (opcional)

A chave de mensagem do Kafka.

Você pode substituir esse campo usando um modelo de substituição. Para ter mais informações, consulte Modelos de substituição.

cabeçalhos (opcional)

A lista de cabeçalhos Kafka que você especifica. Cada cabeçalho é um par de chave-valor que você pode especificar ao criar uma ação do Kafka. Você pode usar esses cabeçalhos para rotear dados de clientes IoT para clusters Kafka downstream sem modificar a carga útil da mensagem.

Você pode substituir esse campo usando um modelo de substituição. Para entender como passar uma função de regra integrada como um modelo de substituição no cabeçalho do Kafka Action, consulte Exemplos. Para obter mais informações, consulte Modelos de substituição.

nota

Não há compatibilidade com cabeçalhos em formato binário.

partição (opcional)

A partição de mensagens do Kafka.

Você pode substituir esse campo usando um modelo de substituição. Para ter mais informações, consulte Modelos de substituição.

clientProperties

Um objeto que define as propriedades do cliente produtor Apache Kafka.

acks (opcional)

O número de confirmações que o produtor exige que o servidor tenha recebido antes de considerar uma solicitação concluída.

Se você especificar 0 como valor, o produtor não aguardará nenhuma confirmação do servidor. Se o servidor não receber a mensagem, o produtor não tentará enviá-la novamente.

Valores válidos: -1, 0, 1, all. O valor padrão é 1.

bootstrap.servers

Uma lista de pares de host e porta (por exemplo,host1:port1, host2:port2) usados para estabelecer a conexão inicial com o cluster Kafka.

compression.type (opcional)

O tipo de compressão para todos os dados gerados pelo produtor.

Valores válidos: none, gzip, snappy, lz4, zstd. O valor padrão é none.

security.protocol

O protocolo de segurança usado para se conectar ao seu agente Kafka.

Valores válidos: SSL, SASL_SSL. O valor padrão é SSL.

key.serializer

Especifica como transformar os principais objetos que você fornece com oProducerRecord em bytes.

Valor válido: StringSerializer.

value.serializador

Especifica como transformar os principais objetos que você fornece com o ProducerRecord em bytes.

Valor válido: ByteBufferSerializer.

ssl.truststore

O arquivo truststore no formato base64 ou a localização do arquivo truststore em AWS Secrets Manager. Esse valor não será necessário se o seu arquivo truststore for certificado pelas autoridades de certificação (CA) da Amazon.

Este campo oferece suporte a modelos de substituição. Se você usar o Secrets Manager para armazenar as credenciais necessárias para se conectar ao seu agente Kafka, poderá usar a função get_secret SQL para recuperar o valor desse campo. Para obter mais informações sobre modelos de substituição, consulte Modelos de substituição. Para obter mais informações sobre a get_secret função SQL, consulte get_secret (secretId, secretType, chave, roleArn). Se o arquivo truststore estiver na forma de um arquivo, use o parâmetro SecretBinary. Se o arquivo truststore estiver na forma de uma string, use o parâmetro SecretString.

O tamanho máximo desse valor é 65 KB.

ssl.truststore.password

A senha do arquivo truststore. Esse valor é necessário somente se você tiver criado uma senha para o arquivo truststore.

ssl.keystore

O arquivo de armazenamento de chaves. Esse valor é necessário quando você especifica SSL como valor para security.protocol.

Este campo oferece suporte a modelos de substituição. Use o Secrets Manager para armazenar as credenciais necessárias para se conectar ao seu agente Kafka. Para recuperar o valor desse campo, use a get_secret função SQL. Para obter mais informações sobre modelos de substituição, consulte Modelos de substituição. Para obter mais informações sobre a get_secret função SQL, consulte get_secret (secretId, secretType, chave, roleArn). Use o parâmetro SecretBinary.

ssl.keystore.password

A senha de armazenamento do arquivo keystore. Esse valor será exigido se um valor para ssl.keystore for especificado.

O valor desse campo pode ser texto simples. Este campo oferece suporte a modelos de substituição. Use o Secrets Manager para armazenar as credenciais necessárias para se conectar ao seu agente Kafka. Para recuperar o valor desse campo, use a get_secret função SQL. Para obter mais informações sobre modelos de substituição, consulte Modelos de substituição. Para obter mais informações sobre a get_secret função SQL, consulte get_secret (secretId, secretType, chave, roleArn). Use o parâmetro SecretString.

ssl.key.password

A senha da chave privada em seu arquivo keystore.

Este campo oferece suporte a modelos de substituição. Use o Secrets Manager para armazenar as credenciais necessárias para se conectar ao seu agente Kafka. Para recuperar o valor desse campo, use a get_secret função SQL. Para obter mais informações sobre modelos de substituição, consulte Modelos de substituição. Para obter mais informações sobre a get_secret função SQL, consulte get_secret (secretId, secretType, chave, roleArn). Use o parâmetro SecretString.

sasl.mechanism

O mecanismo de segurança usado para se conectar ao seu agente Kafka. Esse valor é necessário quando você especifica SASL_SSL para security.protocol.

Valores válidos: PLAIN, SCRAM-SHA-512, GSSAPI.

nota

SCRAM-SHA-512é o único mecanismo de segurança suportado nas regiões cn-north-1, cn-northwest-1, -1 e -1. us-gov-east us-gov-west

sasl.plain.username

O nome de usuário usado para recuperar a string secreta do Secrets Manager. Esse valor é necessário quando você especifica SASL_SSL para security.protocol e PLAIN para sasl.mechanism .

sasl.plain.password

A senha usada para recuperar a string secreta do Secrets Manager. Esse valor é necessário quando você especifica SASL_SSL para security.protocol e PLAIN para sasl.mechanism.

sasl.scram.username

O nome de usuário usado para recuperar a string secreta do Secrets Manager. Esse valor é necessário quando você especifica SASL_SSL para security.protocol e SCRAM-SHA-512 para sasl.mechanism .

sasl.scram.password

A senha usada para recuperar a string secreta do Secrets Manager. Esse valor é necessário quando você especifica SASL_SSL para security.protocol e SCRAM-SHA-512 para sasl.mechanism.

sasl.kerberos.keytab

O arquivo keytab para autenticação Kerberos no Secrets Manager. Esse valor é necessário quando você especifica SASL_SSL para security.protocol e GSSAPI para sasl.mechanism.

Este campo oferece suporte a modelos de substituição. Use o Secrets Manager para armazenar as credenciais necessárias para se conectar ao seu agente Kafka. Para recuperar o valor desse campo, use a get_secret função SQL. Para obter mais informações sobre modelos de substituição, consulte Modelos de substituição. Para obter mais informações sobre a get_secret função SQL, consulte get_secret (secretId, secretType, chave, roleArn). Use o parâmetro SecretBinary.

sasl.kerberos.service.name

O nome principal do Kerberos sob o qual o Apache Kafka é executado. Esse valor é necessário quando você especifica SASL_SSL para security.protocol e GSSAPI para sasl.mechanism.

sasl.kerberos.krb5.kdc

O nome do host do centro de distribuição de chaves (KDC) ao qual seu cliente produtor do Apache Kafka se conecta. Esse valor é necessário quando você especifica SASL_SSL para security.protocol e GSSAPI para sasl.mechanism.

sasl.kerberos.krb5.realm

A região à qual seu cliente produtor Apache Kafka se conecta. Esse valor é necessário quando você especifica SASL_SSL para security.protocol e GSSAPI para sasl.mechanism.

sasl.kerberos.principal

A identidade Kerberos exclusiva à qual o Kerberos pode atribuir tickets para acessar serviços compatíveis com Kerberos. Esse valor é necessário quando você especifica SASL_SSL para security.protocol e GSSAPI para sasl.mechanism.

Exemplos

O exemplo de JSON a seguir define uma ação do Apache Kafka em uma regra. AWS IoT O exemplo a seguir passa a função integrada sourceIP() como um modelo de substituição no cabeçalho Kafka Action.

{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }

Notas importantes sobre a configuração do Kerberos

  • Seu centro de distribuição de chaves (KDC) deve ser resolvido por meio de um sistema de nomes de domínio (DNS) privado em sua VPC de destino. Uma abordagem possível é adicionar a entrada DNS do KDC a uma zona hospedada privada. Para obter mais informações sobre essa abordagem, consulte Como trabalhar com zonas hospedadas privadas.

  • Cada VPC deve ter a resolução DNS habilitada. Para obter mais informações, consulte Como usar o DNS com sua VPC.

  • Os grupos de segurança da interface de rede e os grupos de segurança em nível de instância no destino da VPC devem permitir o tráfego de dentro da sua VPC nas seguintes portas.

    • Tráfego TCP na porta do receptor do bootstrap broker (geralmente 9092, mas deve estar na faixa de 9000—9100)

    • Tráfego TCP e UDP na porta 88 para o KDC

  • SCRAM-SHA-512é o único mecanismo de segurança suportado nas regiões cn-north-1, cn-northwest-1, -1 e -1. us-gov-east us-gov-west