Apache Kafka - AWS IoT Core

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Apache Kafka

L'azione Apache Kafka (Kafka) invia messaggi direttamente ai tuoi cluster Amazon Managed Streaming for Apache Kafka (Amazon MSK), ai cluster Apache Kafka gestiti da provider di terze parti come Confluent Cloud o ai cluster Apache Kafka autogestiti per l'analisi e la visualizzazione dei dati.

Nota

Questo argomento presuppone la familiarità con la piattaforma Apache Kafka e i relativi concetti. Per ulteriori informazioni su Apache Kafka, consulta Apache Kafka. MSK Serverless non è supportato. I cluster MSK Serverless possono essere eseguiti solo tramite l'autenticazione IAM, che l'azione delle regole di Apache Kafka attualmente non supporta.

Requisiti

Questa operazione della regola presenta i seguenti requisiti:

  • Un ruolo IAM che AWS IoT può assumere per eseguire le operazioniec2:CreateNetworkInterface,,,ec2:DescribeNetworkInterfaces, ec2:CreateNetworkInterfacePermissionec2:DeleteNetworkInterface, ec2:DescribeSubnets e. ec2:DescribeVpcs ec2:DescribeVpcAttribute ec2:DescribeSecurityGroups Questo ruolo crea e gestisce interfacce di rete elastiche per il tuo Amazon Virtual Private Cloud per raggiungere il tuo broker Kafka. Per ulteriori informazioni, consulta Concedere a qualsiasi AWS IoT regola l'accesso richiesto.

    Nella AWS IoT console, puoi scegliere o creare un ruolo che consenta di AWS IoT Core eseguire questa azione basata sulla regola.

    Per maggiori informazioni sulle interfacce di rete, consulta Interfacce di rete elastiche nella Guida per l'utente di Amazon EC2.

    La policy associata al ruolo che specifichi sarà simile a quella del seguente esempio.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • Se utilizzi AWS Secrets Manager per memorizzare le credenziali necessarie per connetterti al tuo broker Kafka, devi creare un ruolo IAM che AWS IoT Core possa assumersi di eseguire le secretsmanager:GetSecretValue operazioni and. secretsmanager:DescribeSecret

    La policy associata al ruolo che specifichi sarà simile a quella del seguente esempio.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:region:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region:123456789012:secret:kafka_keytab-*" ] } ] }
  • Puoi eseguire i cluster Apache Kafka all'interno di Amazon Virtual Private Cloud (Amazon VPC). È necessario creare una destinazione Amazon VPC e utilizzare un gateway NAT nelle sottoreti per inoltrare messaggi da AWS IoT un cluster Kafka pubblico. Le regole AWS IoT creano un'interfaccia di rete in ciascuna delle sottoreti elencate nella destinazione VPC per indirizzare il traffico direttamente al VPC. Quando crei una destinazione VPC, il motore delle AWS IoT regole crea automaticamente un'azione della regola VPC. Per ulteriori informazioni sulle operazioni delle regole VPC, consulta Destinazioni Virtual private cloud (VPC).

  • Se si utilizza una chiave gestita dal cliente AWS KMS key (chiave KMS) per crittografare i dati inattivi, il servizio deve disporre dell'autorizzazione a utilizzare la chiave KMS per conto del chiamante. Per ulteriori informazioni, consulta crittografia di Amazon MSK nella Guida per sviluppatori Amazon Managed Streaming for Apache Kafka.

Parametri

Quando crei una AWS IoT regola con questa azione, devi specificare le seguenti informazioni:

destinationArn

L'Amazon Resource Name (ARN) della destinazione VPC. Per ulteriori informazioni sulla creazione di una destinazione VPC, consulta Destinazioni Virtual private cloud (VPC).

topic

L'argomento Kafka per i messaggi da inviare al broker Kafka.

È possibile sostituire questo campo utilizzando un modello di sostituzione. Per ulteriori informazioni, consulta Modelli di sostituzione.

chiave (opzionale)

La chiave del messaggio Kafka.

È possibile sostituire questo campo utilizzando un modello di sostituzione. Per ulteriori informazioni, consulta Modelli di sostituzione.

intestazioni (opzionali)

L'elenco delle intestazioni specificate. Ogni intestazione è una coppia chiave-valore che puoi specificare quando crei un'operazione Kafka. Puoi utilizzare queste intestazioni per instradare i dati dai client IoT ai cluster Kafka a valle senza modificare il payload dei messaggi.

È possibile sostituire questo campo utilizzando un modello di sostituzione. Per capire come passare la funzione di una regola in linea come modello sostitutivo nell'intestazione dell'operazione Kafka, consulta Esempi. Per ulteriori informazioni, consulta Modelli di sostituzione.

Nota

Le intestazioni in formato binario non sono supportate.

partizione (opzionale)

La partizione del messaggio Kafka.

È possibile sostituire questo campo utilizzando un modello di sostituzione. Per ulteriori informazioni, consulta Modelli di sostituzione.

clientProperties

Un oggetto che definisce le proprietà del client del produttore Apache Kafka.

conferme (facoltativo)

Il numero di conferme che, secondo il produttore, il server deve aver ricevuto, prima di considerare una richiesta come completa.

Se specifichi 0 come valore, il produttore non attenderà alcuna conferma da parte del server. Se il server non riceve il messaggio, il produttore non tenterà di inviare il messaggio un'altra volta.

Valori validi: -1, 0, 1, all. Il valore predefinito è 1.

bootstrap.servers

Un elenco di coppie host e porta (ad esempio host1:port1, host2:port2) utilizzato per stabilire la connessione iniziale al cluster Kafka.

compression.type (opzionale)

Il tipo di compressione per tutti i dati generati dal produttore.

Valori validi: none, gzip, snappy, lz4, zstd. Il valore predefinito è none.

security.protocol

Il protocollo di sicurezza usato per collegarsi al broker Kafka.

Valori validi: SSL, SASL_SSL. Il valore predefinito è SSL.

key.serializer

Specifica come trasformare gli oggetti chiave che fornisci con il ProducerRecord in byte.

Valore valido: StringSerializer.

value.serializer

Specifica come trasformare gli oggetti valore che fornisci con il ProducerRecord in byte.

Valore valido: ByteBufferSerializer.

ssl.truststore

Il file truststore in formato base64 o la posizione del file truststore in AWS Secrets Manager. Questo valore non è richiesto se il tuo truststore è considerato attendibile dalle autorità di certificazione Amazon (CA).

Questo campo supporta i modelli di sostituzione. Se utilizzi Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka, puoi utilizzare la funzione SQL get_secret per recuperare il valore di questo campo. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sulla funzione SQL get_secret, consulta get_secret (secretId, secretType, chiave, roleArn). Se il truststore ha la forma di un file, usa il parametro SecretBinary. Se il truststore è sotto forma di una stringa, usa il parametro SecretString.

La dimensione massima di questo valore è di 65 KB.

ssl.truststore.password

La password del truststore. Questo valore è richiesto solo se è stata creata una password per il truststore.

ssl.keystore

Il file keystore. Questo valore è obbligatorio quando si specifica SSL come valore per security.protocol.

Questo campo supporta i modelli di sostituzione. Utilizza Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka. Utilizza la funzione SQL get_secret per recuperare il valore di questo campo. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sulla funzione SQL get_secret, consulta get_secret (secretId, secretType, chiave, roleArn). Utilizzo del parametro SecretBinary.

ssl.keystore.password

La password dell'archivio per il file keystore. Questo valore è obbligatorio se viene specificato un valore per ssl.keystore.

Il valore di questo campo può essere un testo semplice. Questo campo supporta anche i modelli di sostituzione. Utilizza Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka. Utilizza la funzione SQL get_secret per recuperare il valore di questo campo. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sulla funzione SQL get_secret, consulta get_secret (secretId, secretType, chiave, roleArn). Utilizzo del parametro SecretString.

ssl.key.password

La password della chiave privata nel file keystore.

Questo campo supporta i modelli di sostituzione. Utilizza Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka. Utilizza la funzione SQL get_secret per recuperare il valore di questo campo. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sulla funzione SQL get_secret, consulta get_secret (secretId, secretType, chiave, roleArn). Utilizzo del parametro SecretString.

sasl.mechanism

Il meccanismo di sicurezza utilizzato per connettersi al broker Kafka. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol.

Valori validi: PLAIN, SCRAM-SHA-512, GSSAPI.

Nota

SCRAM-SHA-512è l'unico meccanismo di sicurezza supportato nelle regioni cn-north-1, cn-northwest-1, -1 e -1. us-gov-east us-gov-west

sasl.plain.username

Il nome utente utilizzato per recuperare la stringa segreta da Secrets Manager. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e PLAIN per sasl.mechanism.

sasl.plain.password

La password utilizzata per recuperare la stringa segreta da Secrets Manager. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e PLAIN per sasl.mechanism.

sasl.scram.username

Il nome utente utilizzato per recuperare la stringa segreta da Secrets Manager. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e SCRAM-SHA-512 per sasl.mechanism.

sasl.scram.password

La password utilizzata per recuperare la stringa segreta da Secrets Manager. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e SCRAM-SHA-512 per sasl.mechanism.

sasl.kerberos.keytab

Il file keytab per l'autenticazione di Kerberos in Secrets Manager. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e GSSAPI per sasl.mechanism.

Questo campo supporta i modelli di sostituzione. Utilizza Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka. Utilizza la funzione SQL get_secret per recuperare il valore di questo campo. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sulla funzione SQL get_secret, consulta get_secret (secretId, secretType, chiave, roleArn). Utilizzo del parametro SecretBinary.

sasl.kerberos.service.name

Il nome principale Kerberos con il quale Apache Kafka funziona. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e GSSAPI per sasl.mechanism.

sasl.kerberos.krb5.kdc

Il nome host del centro di distribuzione delle chiavi (KDC) a cui si connette il client produttore Apache Kafka. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e GSSAPI per sasl.mechanism.

sasl.kerberos.krb5.realm

Il dominio a cui si connette il client del produttore Apache Kafka. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e GSSAPI per sasl.mechanism.

sasl.kerberos.principal

L'identità Kerberos univoca a cui Kerberos può assegnare ticket per accedere ai servizi compatibili con Kerberos. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e GSSAPI per sasl.mechanism.

Esempi

Il seguente esempio JSON definisce un'azione di Apache Kafka in una regola. AWS IoT L'esempio seguente passa la funzione in linea sourceIp() come modello di sostituzione nell'intestazione dell'operazione Kafka.

{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }

Note importanti sulla configurazione di Kerberos

  • Il centro di distribuzione delle chiavi (KDC) deve essere risolvibile tramite Domain Name System (DNS) privato all'interno del VPC di destinazione. Un possibile approccio è quello di aggiungere la voce DNS del KDC a una zona ospitata privata. Per ulteriori informazioni su questo approccio, consulta Utilizzo delle zone ospitate private.

  • Ogni VPC deve avere la risoluzione DNS abilitata. Per ulteriori informazioni, consulta Utilizzo del DNS con il tuo VPC.

  • I gruppi di sicurezza dell'interfaccia di rete e i gruppi di sicurezza a livello di istanza nella destinazione VPC devono consentire il traffico dall'interno del VPC sulle seguenti porte.

    • Traffico TCP sulla porta listener del broker bootstrap (spesso 9092, ma deve essere compreso nell'intervallo 9000-9100)

    • Traffico TCP e UDP sulla porta 88 per il KDC

  • SCRAM-SHA-512è l'unico meccanismo di sicurezza supportato nelle regioni cn-north-1, cn-northwest-1, -1 e -1. us-gov-east us-gov-west