As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Apache Kafka
A ação Apache Kafka (Kafka) envia mensagens diretamente para seu Amazon Managed Streaming for Apache Kafka (Amazon), clusters Apache Kafka gerenciados por provedores terceirizados, como o Confluent Cloud, ou clusters autogerenciados do Apache Kafka. MSK
nota
Este tópico pressupõe familiaridade com a plataforma Apache Kafka e conceitos relacionados. Para obter mais informações sobre o Apache Kafka, consulte Apache Kafka.
Requisitos
Esta ação de regra tem os seguintes requisitos:
-
Uma IAM função que AWS IoT pode assumir para realizar as
ec2:DescribeSecurityGroups
operaçõesec2:CreateNetworkInterface
ec2:DescribeNetworkInterfaces
ec2:CreateNetworkInterfacePermission
,ec2:DeleteNetworkInterface
,ec2:DescribeSubnets
ec2:DescribeVpcs
,ec2:DescribeVpcAttribute
,,, e. Essa função cria e gerencia interfaces de rede elásticas para o Amazon Virtual Private Cloud para alcançar seu agente Kafka. Para obter mais informações, consulte Conceder a uma AWS IoT regra o acesso que ela exige.No AWS IoT console, você pode escolher ou criar uma função para permitir AWS IoT Core a execução dessa ação de regra.
Para obter mais informações sobre interfaces de rede, consulte Interfaces de rede elásticas no Amazon EC2 User Guide.
A política vinculada à função que você especificar deve ser semelhante à do exemplo a seguir.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
-
Se você costuma AWS Secrets Manager armazenar as credenciais necessárias para se conectar ao seu corretor Kafka, você deve criar uma IAM função que AWS IoT Core possa assumir para realizar as operações e.
secretsmanager:GetSecretValue
secretsmanager:DescribeSecret
A política vinculada à função que você especificar deve ser semelhante à do exemplo a seguir.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:
region
:123456789012
:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region
:123456789012
:secret:kafka_keytab-*" ] } ] } -
Você pode executar seus clusters do Apache Kafka dentro da Amazon Virtual Private Cloud (Amazon). VPC Você deve criar um VPC destino na Amazon e usar um NAT gateway em suas sub-redes para encaminhar mensagens AWS IoT para um cluster público do Kafka. O mecanismo de AWS IoT regras cria uma interface de rede em cada uma das sub-redes listadas no VPC destino para rotear o tráfego diretamente para o. VPC Quando você cria um VPC destino, o mecanismo de AWS IoT regras cria automaticamente uma ação de VPC regra. Para obter mais informações sobre ações de VPC regras, consulteDestinos de nuvem privada virtual (VPC).
-
Se você usar uma KMS chave gerenciada pelo AWS KMS key cliente para criptografar dados em repouso, o serviço deverá ter permissão para usar a KMS chave em nome do chamador. Para obter mais informações, consulte a MSKcriptografia da Amazon no Guia do desenvolvedor do Amazon Managed Streaming for Apache Kafka.
Parâmetros
Ao criar uma AWS IoT regra com essa ação, você deve especificar as seguintes informações:
- destinationArn
-
O nome do recurso da Amazon (ARN) do VPC destino. Para obter informações sobre como criar um VPC destino, consulteDestinos de nuvem privada virtual (VPC).
- tópico
-
O tópico do Kafka para mensagens enviadas ao agente do Kafka.
Você pode substituir esse campo usando um modelo de substituição. Para obter mais informações, consulte Modelos de substituição.
- chave (opcional)
-
A chave de mensagem do Kafka.
Você pode substituir esse campo usando um modelo de substituição. Para obter mais informações, consulte Modelos de substituição.
- cabeçalhos (opcional)
-
A lista de cabeçalhos Kafka que você especifica. Cada cabeçalho é um par de chave-valor que você pode especificar ao criar uma ação do Kafka. Você pode usar esses cabeçalhos para rotear dados de clientes IoT para clusters Kafka downstream sem modificar a carga útil da mensagem.
Você pode substituir esse campo usando um modelo de substituição. Para entender como passar uma função de regra integrada como um modelo de substituição no cabeçalho do Kafka Action, consulte Exemplos. Para obter mais informações, consulte Modelos de substituição.
nota
Não há compatibilidade com cabeçalhos em formato binário.
- partição (opcional)
-
A partição de mensagens do Kafka.
Você pode substituir esse campo usando um modelo de substituição. Para obter mais informações, consulte Modelos de substituição.
- clientProperties
-
Um objeto que define as propriedades do cliente produtor Apache Kafka.
- acks (opcional)
-
O número de confirmações que o produtor exige que o servidor tenha recebido antes de considerar uma solicitação concluída.
Se você especificar 0 como valor, o produtor não aguardará nenhuma confirmação do servidor. Se o servidor não receber a mensagem, o produtor não tentará enviá-la novamente.
Valores válidos:
-1
,0
,1
,all
. O valor padrão é1
. - bootstrap.servers
-
Uma lista de pares de host e porta (por exemplo,
host1:port1
,host2:port2
) usados para estabelecer a conexão inicial com o cluster Kafka. - compression.type (opcional)
-
O tipo de compressão para todos os dados gerados pelo produtor.
Valores válidos:
none
,gzip
,snappy
,lz4
,zstd
. O valor padrão énone
. - security.protocol
-
O protocolo de segurança usado para se conectar ao seu agente Kafka.
Valores válidos:
SSL
,SASL_SSL
. O valor padrão éSSL
. - key.serializer
-
Especifica como transformar os principais objetos que você fornece com o
ProducerRecord
em bytes.Valor válido:
StringSerializer
. - value.serializador
-
Especifica como transformar os principais objetos que você fornece com o
ProducerRecord
em bytes.Valor válido:
ByteBufferSerializer
. - ssl.truststore
-
O arquivo truststore no formato base64 ou a localização do arquivo truststore em AWS Secrets Manager. Esse valor não será necessário se o seu arquivo truststore for certificado pelas autoridades de certificação (CA) da Amazon.
Este campo oferece suporte a modelos de substituição. Se você usar o Secrets Manager para armazenar as credenciais necessárias para se conectar ao seu corretor Kafka, poderá usar a
get_secret
SQL função para recuperar o valor desse campo. Para obter mais informações sobre modelos de substituição, consulte Modelos de substituição. Para obter mais informações sobre aget_secret
SQL função, consulteget_secret (secretId, secretType, chave, roleArn). Se o arquivo truststore estiver na forma de um arquivo, use o parâmetroSecretBinary
. Se o arquivo truststore estiver na forma de uma string, use o parâmetroSecretString
.O tamanho máximo desse valor é 65 KB.
- ssl.truststore.password
-
A senha do arquivo truststore. Esse valor é necessário somente se você tiver criado uma senha para o arquivo truststore.
- ssl.keystore
-
O arquivo de armazenamento de chaves. Esse valor é necessário quando você especifica
SSL
como valor parasecurity.protocol
.Este campo oferece suporte a modelos de substituição. Use o Secrets Manager para armazenar as credenciais necessárias para se conectar ao seu agente Kafka. Para recuperar o valor desse campo, use a
get_secret
SQL função. Para obter mais informações sobre modelos de substituição, consulte Modelos de substituição. Para obter mais informações sobre aget_secret
SQL função, consulteget_secret (secretId, secretType, chave, roleArn). Use o parâmetroSecretBinary
. - ssl.keystore.password
-
A senha de armazenamento do arquivo keystore. Esse valor será exigido se um valor para
ssl.keystore
for especificado.O valor desse campo pode ser texto simples. Este campo oferece suporte a modelos de substituição. Use o Secrets Manager para armazenar as credenciais necessárias para se conectar ao seu agente Kafka. Para recuperar o valor desse campo, use a
get_secret
SQL função. Para obter mais informações sobre modelos de substituição, consulte Modelos de substituição. Para obter mais informações sobre aget_secret
SQL função, consulteget_secret (secretId, secretType, chave, roleArn). Use o parâmetroSecretString
. - ssl.key.password
-
A senha da chave privada em seu arquivo keystore.
Este campo oferece suporte a modelos de substituição. Use o Secrets Manager para armazenar as credenciais necessárias para se conectar ao seu agente Kafka. Para recuperar o valor desse campo, use a
get_secret
SQL função. Para obter mais informações sobre modelos de substituição, consulte Modelos de substituição. Para obter mais informações sobre aget_secret
SQL função, consulteget_secret (secretId, secretType, chave, roleArn). Use o parâmetroSecretString
. - sasl.mechanism
-
O mecanismo de segurança usado para se conectar ao seu agente Kafka. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
.Valores válidos:
PLAIN
,SCRAM-SHA-512
,GSSAPI
.nota
SCRAM-SHA-512
é o único mecanismo de segurança suportado nas regiões cn-north-1, cn-northwest-1, -1 e -1. us-gov-east us-gov-west - sasl.plain.username
-
O nome de usuário usado para recuperar a string secreta do Secrets Manager. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
ePLAIN
parasasl.mechanism
. - sasl.plain.password
-
A senha usada para recuperar a string secreta do Secrets Manager. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
ePLAIN
parasasl.mechanism
. - sasl.scram.username
-
O nome de usuário usado para recuperar a string secreta do Secrets Manager. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
eSCRAM-SHA-512
parasasl.mechanism
. - sasl.scram.password
-
A senha usada para recuperar a string secreta do Secrets Manager. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
eSCRAM-SHA-512
parasasl.mechanism
. - sasl.kerberos.keytab
-
O arquivo keytab para autenticação Kerberos no Secrets Manager. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
eGSSAPI
parasasl.mechanism
.Este campo oferece suporte a modelos de substituição. Use o Secrets Manager para armazenar as credenciais necessárias para se conectar ao seu agente Kafka. Para recuperar o valor desse campo, use a
get_secret
SQL função. Para obter mais informações sobre modelos de substituição, consulte Modelos de substituição. Para obter mais informações sobre aget_secret
SQL função, consulteget_secret (secretId, secretType, chave, roleArn). Use o parâmetroSecretBinary
. - sasl.kerberos.service.name
-
O nome principal do Kerberos sob o qual o Apache Kafka é executado. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
eGSSAPI
parasasl.mechanism
. - sasl.kerberos.krb5.kdc
-
O nome do host do centro de distribuição de chaves (KDC) ao qual seu cliente produtor do Apache Kafka se conecta. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
eGSSAPI
parasasl.mechanism
. - sasl.kerberos.krb5.realm
-
A região à qual seu cliente produtor Apache Kafka se conecta. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
eGSSAPI
parasasl.mechanism
. - sasl.kerberos.principal
-
A identidade Kerberos exclusiva à qual o Kerberos pode atribuir tickets para acessar serviços compatíveis com Kerberos. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
eGSSAPI
parasasl.mechanism
.
Exemplos
O JSON exemplo a seguir define uma ação do Apache Kafka em uma regra. AWS IoT O exemplo a seguir passa a função embutida sourceIp () como um modelo de substituição no cabeçalho Kafka Action.
{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }
Notas importantes sobre a configuração do Kerberos
-
Seu centro de distribuição de chaves (KDC) deve ser resolvido por meio do Sistema de Nomes de Domínio (DNS) privado dentro do seu destino. VPC Uma abordagem possível é adicionar a KDC DNS entrada a uma zona hospedada privada. Para obter mais informações sobre essa abordagem, consulte Como trabalhar com zonas hospedadas privadas.
-
Cada um VPC deve ter a DNS resolução ativada. Para obter mais informações, consulte Usando DNS com seu VPC.
-
Os grupos de segurança da interface de rede e os grupos de segurança em nível de instância no VPC destino devem permitir o tráfego de dentro da sua VPC nas seguintes portas.
-
TCPtráfego na porta do ouvinte do bootstrap broker (geralmente 9092, mas deve estar na faixa de 9000—9100)
-
TCPe UDP tráfego na porta 88 para o KDC
-
-
SCRAM-SHA-512
é o único mecanismo de segurança suportado nas regiões cn-north-1, cn-northwest-1, -1 e -1. us-gov-east us-gov-west