As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Apache Kafka
nota
Este tópico pressupõe familiaridade com a plataforma Apache Kafka e conceitos relacionados. Para obter mais informações sobre o Apache Kafka, consulte Apache Kafka
Requisitos
Esta ação de regra tem os seguintes requisitos:
-
Uma função do IAM que AWS IoT pode ser assumida para realizar as
ec2:DescribeSecurityGroups
operaçõesec2:CreateNetworkInterface
ec2:DescribeNetworkInterfaces
ec2:CreateNetworkInterfacePermission
ec2:DeleteNetworkInterface
,ec2:DescribeSubnets
,ec2:DescribeVpcs
,ec2:DescribeVpcAttribute
,,, e. Essa função cria e gerencia interfaces de rede elásticas para o Amazon Virtual Private Cloud para alcançar seu agente Kafka. Para ter mais informações, consulte Conceder a uma AWS IoT regra o acesso que ela exige.No AWS IoT console, você pode escolher ou criar uma função para permitir AWS IoT Core a execução dessa ação de regra.
Para obter mais informações sobre interfaces de rede, consulte Interfaces de rede elásticas no Guia do usuário do Amazon EC2.
A política vinculada à função que você especificar deve ser semelhante à do exemplo a seguir.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
Se você costuma AWS Secrets Manager armazenar as credenciais necessárias para se conectar ao seu corretor Kafka, deve criar uma função do IAM que AWS IoT Core possa ser assumida para realizar as operações e.
secretsmanager:GetSecretValue
secretsmanager:DescribeSecret
A política vinculada à função que você especificar deve ser semelhante à do exemplo a seguir.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:
region
:123456789012
:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region
:123456789012
:secret:kafka_keytab-*" ] } ] }-
Você pode executar seus clusters do Apache Kafka dentro do Amazon Virtual Private Cloud (Amazon VPC). Você deve criar um destino Amazon VPC e usar um gateway NAT em suas sub-redes para encaminhar mensagens para um cluster público do Kafka AWS IoT . O AWS IoT mecanismo de regras cria uma interface de rede em cada uma das sub-redes listadas no destino da VPC para rotear o tráfego diretamente para a VPC. Quando você cria um destino de VPC, o mecanismo de AWS IoT regras cria automaticamente uma ação de regra de VPC. Para obter mais informações sobre ações de regras de VPC, consulte Destinos da nuvem privada virtual (VPC).
-
Se você usar uma chave gerenciada pelo cliente AWS KMS key (chave KMS) para criptografar dados em repouso, o serviço deverá ter permissão para usar a chave KMS em nome do chamador. Para obter mais informações, consulte Criptografia do Amazon MSK no Guia do desenvolvedor do Amazon Managed Streaming for Apache Kafka.
Parâmetros
Ao criar uma AWS IoT regra com essa ação, você deve especificar as seguintes informações:
- destinationArn
O nome do recurso da Amazon (ARN) do destino da VPC. Para obter informações sobre a criação do destino da VPC, consulte Destinos da nuvem privada virtual (VPC).
- tópico
O tópico do Kafka para mensagens enviadas ao agente do Kafka.
Você pode substituir esse campo usando um modelo de substituição. Para obter mais informações, consulte Modelos de substituição.
- chave (opcional)
A chave de mensagem do Kafka.
Você pode substituir esse campo usando um modelo de substituição. Para ter mais informações, consulte Modelos de substituição.
- cabeçalhos (opcional)
-
A lista de cabeçalhos Kafka que você especifica. Cada cabeçalho é um par de chave-valor que você pode especificar ao criar uma ação do Kafka. Você pode usar esses cabeçalhos para rotear dados de clientes IoT para clusters Kafka downstream sem modificar a carga útil da mensagem.
Você pode substituir esse campo usando um modelo de substituição. Para entender como passar uma função de regra integrada como um modelo de substituição no cabeçalho do Kafka Action, consulte Exemplos. Para obter mais informações, consulte Modelos de substituição.
nota
Não há compatibilidade com cabeçalhos em formato binário.
- partição (opcional)
A partição de mensagens do Kafka.
Você pode substituir esse campo usando um modelo de substituição. Para ter mais informações, consulte Modelos de substituição.
- clientProperties
Um objeto que define as propriedades do cliente produtor Apache Kafka.
- acks (opcional)
O número de confirmações que o produtor exige que o servidor tenha recebido antes de considerar uma solicitação concluída.
Se você especificar 0 como valor, o produtor não aguardará nenhuma confirmação do servidor. Se o servidor não receber a mensagem, o produtor não tentará enviá-la novamente.
Valores válidos:
-1
,0
,1
,all
. O valor padrão é1
.- bootstrap.servers
Uma lista de pares de host e porta (por exemplo,
host1:port1
,host2:port2
) usados para estabelecer a conexão inicial com o cluster Kafka.- compression.type (opcional)
O tipo de compressão para todos os dados gerados pelo produtor.
Valores válidos:
none
,gzip
,snappy
,lz4
,zstd
. O valor padrão énone
.- security.protocol
O protocolo de segurança usado para se conectar ao seu agente Kafka.
Valores válidos:
SSL
,SASL_SSL
. O valor padrão éSSL
.- key.serializer
Especifica como transformar os principais objetos que você fornece com o
ProducerRecord
em bytes.Valor válido:
StringSerializer
.- value.serializador
Especifica como transformar os principais objetos que você fornece com o
ProducerRecord
em bytes.Valor válido:
ByteBufferSerializer
.- ssl.truststore
O arquivo truststore no formato base64 ou a localização do arquivo truststore em AWS Secrets Manager. Esse valor não será necessário se o seu arquivo truststore for certificado pelas autoridades de certificação (CA) da Amazon.
Este campo oferece suporte a modelos de substituição. Se você usar o Secrets Manager para armazenar as credenciais necessárias para se conectar ao seu agente Kafka, poderá usar a função
get_secret
SQL para recuperar o valor desse campo. Para obter mais informações sobre modelos de substituição, consulte Modelos de substituição. Para obter mais informações sobre aget_secret
função SQL, consulte get_secret (secretId, secretType, chave, roleArn). Se o arquivo truststore estiver na forma de um arquivo, use o parâmetroSecretBinary
. Se o arquivo truststore estiver na forma de uma string, use o parâmetroSecretString
.O tamanho máximo desse valor é 65 KB.
- ssl.truststore.password
A senha do arquivo truststore. Esse valor é necessário somente se você tiver criado uma senha para o arquivo truststore.
- ssl.keystore
O arquivo de armazenamento de chaves. Esse valor é necessário quando você especifica
SSL
como valor parasecurity.protocol
.Este campo oferece suporte a modelos de substituição. Use o Secrets Manager para armazenar as credenciais necessárias para se conectar ao seu agente Kafka. Para recuperar o valor desse campo, use a
get_secret
função SQL. Para obter mais informações sobre modelos de substituição, consulte Modelos de substituição. Para obter mais informações sobre aget_secret
função SQL, consulte get_secret (secretId, secretType, chave, roleArn). Use o parâmetroSecretBinary
.- ssl.keystore.password
A senha de armazenamento do arquivo keystore. Esse valor será exigido se um valor para
ssl.keystore
for especificado.O valor desse campo pode ser texto simples. Este campo oferece suporte a modelos de substituição. Use o Secrets Manager para armazenar as credenciais necessárias para se conectar ao seu agente Kafka. Para recuperar o valor desse campo, use a
get_secret
função SQL. Para obter mais informações sobre modelos de substituição, consulte Modelos de substituição. Para obter mais informações sobre aget_secret
função SQL, consulte get_secret (secretId, secretType, chave, roleArn). Use o parâmetroSecretString
.- ssl.key.password
A senha da chave privada em seu arquivo keystore.
Este campo oferece suporte a modelos de substituição. Use o Secrets Manager para armazenar as credenciais necessárias para se conectar ao seu agente Kafka. Para recuperar o valor desse campo, use a
get_secret
função SQL. Para obter mais informações sobre modelos de substituição, consulte Modelos de substituição. Para obter mais informações sobre aget_secret
função SQL, consulte get_secret (secretId, secretType, chave, roleArn). Use o parâmetroSecretString
.- sasl.mechanism
O mecanismo de segurança usado para se conectar ao seu agente Kafka. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
.Valores válidos:
PLAIN
,SCRAM-SHA-512
,GSSAPI
.nota
SCRAM-SHA-512
é o único mecanismo de segurança suportado nas regiões cn-north-1, cn-northwest-1, -1 e -1. us-gov-east us-gov-west- sasl.plain.username
O nome de usuário usado para recuperar a string secreta do Secrets Manager. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
ePLAIN
parasasl.mechanism
.- sasl.plain.password
A senha usada para recuperar a string secreta do Secrets Manager. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
ePLAIN
parasasl.mechanism
.- sasl.scram.username
O nome de usuário usado para recuperar a string secreta do Secrets Manager. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
eSCRAM-SHA-512
parasasl.mechanism
.- sasl.scram.password
A senha usada para recuperar a string secreta do Secrets Manager. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
eSCRAM-SHA-512
parasasl.mechanism
.- sasl.kerberos.keytab
O arquivo keytab para autenticação Kerberos no Secrets Manager. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
eGSSAPI
parasasl.mechanism
.Este campo oferece suporte a modelos de substituição. Use o Secrets Manager para armazenar as credenciais necessárias para se conectar ao seu agente Kafka. Para recuperar o valor desse campo, use a
get_secret
função SQL. Para obter mais informações sobre modelos de substituição, consulte Modelos de substituição. Para obter mais informações sobre aget_secret
função SQL, consulte get_secret (secretId, secretType, chave, roleArn). Use o parâmetroSecretBinary
.- sasl.kerberos.service.name
O nome principal do Kerberos sob o qual o Apache Kafka é executado. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
eGSSAPI
parasasl.mechanism
.- sasl.kerberos.krb5.kdc
O nome do host do centro de distribuição de chaves (KDC) ao qual seu cliente produtor do Apache Kafka se conecta. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
eGSSAPI
parasasl.mechanism
.- sasl.kerberos.krb5.realm
A região à qual seu cliente produtor Apache Kafka se conecta. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
eGSSAPI
parasasl.mechanism
.- sasl.kerberos.principal
A identidade Kerberos exclusiva à qual o Kerberos pode atribuir tickets para acessar serviços compatíveis com Kerberos. Esse valor é necessário quando você especifica
SASL_SSL
parasecurity.protocol
eGSSAPI
parasasl.mechanism
.
Exemplos
O exemplo de JSON a seguir define uma ação do Apache Kafka em uma regra. AWS IoT O exemplo a seguir passa a função integrada sourceIP() como um modelo de substituição no cabeçalho Kafka Action.
{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }
Notas importantes sobre a configuração do Kerberos
Seu centro de distribuição de chaves (KDC) deve ser resolvido por meio de um sistema de nomes de domínio (DNS) privado em sua VPC de destino. Uma abordagem possível é adicionar a entrada DNS do KDC a uma zona hospedada privada. Para obter mais informações sobre essa abordagem, consulte Como trabalhar com zonas hospedadas privadas.
Cada VPC deve ter a resolução DNS habilitada. Para obter mais informações, consulte Como usar o DNS com sua VPC.
Os grupos de segurança da interface de rede e os grupos de segurança em nível de instância no destino da VPC devem permitir o tráfego de dentro da sua VPC nas seguintes portas.
Tráfego TCP na porta do receptor do bootstrap broker (geralmente 9092, mas deve estar na faixa de 9000—9100)
Tráfego TCP e UDP na porta 88 para o KDC
SCRAM-SHA-512
é o único mecanismo de segurança suportado nas regiões cn-north-1, cn-northwest-1, -1 e -1. us-gov-east us-gov-west