Apache Kafka - AWS IoT Core

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Apache Kafka

L'azione Apache Kafka (Kafka) invia messaggi direttamente ai tuoi Amazon Managed Streaming for Apache Kafka (MSKAmazon), ai cluster Apache Kafka gestiti da provider di terze parti come Confluent Cloud o ai cluster Apache Kafka autogestiti. Con Kafka rule action, puoi indirizzare i tuoi dati IoT ai cluster Kafka. Ciò consente di creare pipeline di dati ad alte prestazioni per vari scopi, come analisi di streaming, integrazione dei dati, visualizzazione e applicazioni aziendali cruciali.

Nota

Questo argomento presuppone la familiarità con la piattaforma Apache Kafka e i relativi concetti. Per ulteriori informazioni su Apache Kafka, consulta Apache Kafka. MSK La modalità serverless non è supportata. MSK I cluster serverless possono essere eseguiti solo tramite IAM l'autenticazione, che attualmente l'azione delle regole di Apache Kafka non supporta. Per ulteriori informazioni su come configurare AWS IoT Core con Confluent, consulta Sfruttare Confluent e AWS risolvere le sfide della gestione dei dati e dei dispositivi IoT.

Requisiti

Questa operazione della regola presenta i seguenti requisiti:

  • Un IAM ruolo che AWS IoT può assumere per eseguire leec2:CreateNetworkInterface,,,, ec2:DescribeNetworkInterfaces ec2:CreateNetworkInterfacePermission ec2:DeleteNetworkInterfaceec2:DescribeSubnets, ec2:DescribeVpcs e le operazioni. ec2:DescribeVpcAttribute ec2:DescribeSecurityGroups Questo ruolo crea e gestisce interfacce di rete elastiche per il tuo Amazon Virtual Private Cloud per raggiungere il tuo broker Kafka. Per ulteriori informazioni, consulta Concedere a qualsiasi AWS IoT regola l'accesso richiesto.

    Nella AWS IoT console, è possibile scegliere o creare un ruolo per consentire l'esecuzione di questa azione relativa AWS IoT Core alla regola.

    Per ulteriori informazioni sulle interfacce di rete, consulta Interfacce di rete elastiche nella Amazon EC2 User Guide.

    La policy associata al ruolo che specifichi sarà simile a quella del seguente esempio.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • Se utilizzi AWS Secrets Manager per memorizzare le credenziali necessarie per connetterti al tuo broker Kafka, devi creare un IAM ruolo che AWS IoT Core possa assumere per eseguire le operazioni and. secretsmanager:GetSecretValue secretsmanager:DescribeSecret

    La policy associata al ruolo che specifichi sarà simile a quella del seguente esempio.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:region:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region:123456789012:secret:kafka_keytab-*" ] } ] }
  • Puoi eseguire i tuoi cluster Apache Kafka all'interno di Amazon Virtual Private Cloud (Amazon). VPC È necessario creare una VPC destinazione Amazon e utilizzare un NAT gateway nelle sottoreti per inoltrare messaggi da un cluster AWS IoT Kafka pubblico. Il motore AWS IoT delle regole crea un'interfaccia di rete in ciascuna delle sottoreti elencate nella VPC destinazione per indirizzare il traffico direttamente verso. VPC Quando si crea una VPC destinazione, il motore delle AWS IoT regole crea automaticamente un'azione di VPC regola. Per ulteriori informazioni sulle azioni delle VPC regole, vedereVPCDestinazioni nel cloud privato virtuale ().

  • Se si utilizza una KMS chiave gestita AWS KMS key dal cliente per crittografare i dati inattivi, il servizio deve disporre dell'autorizzazione a utilizzare la KMS chiave per conto del chiamante. Per ulteriori informazioni, consulta Amazon MSK encryption nella Amazon Managed Streaming for Apache Kafka Developer Guide.

Parametri

Quando crei una AWS IoT regola con questa azione, devi specificare le seguenti informazioni:

destinationArn

L'Amazon Resource Name (ARN) della VPC destinazione. Per informazioni sulla creazione di una VPC destinazione, consultaVPCDestinazioni nel cloud privato virtuale ().

topic

L'argomento Kafka per i messaggi da inviare al broker Kafka.

È possibile sostituire questo campo utilizzando un modello di sostituzione. Per ulteriori informazioni, consulta Modelli di sostituzione.

chiave (opzionale)

La chiave del messaggio Kafka.

È possibile sostituire questo campo utilizzando un modello di sostituzione. Per ulteriori informazioni, consulta Modelli di sostituzione.

intestazioni (opzionali)

L'elenco delle intestazioni specificate. Ogni intestazione è una coppia chiave-valore che puoi specificare quando crei un'operazione Kafka. Puoi utilizzare queste intestazioni per instradare i dati dai client IoT ai cluster Kafka a valle senza modificare il payload dei messaggi.

È possibile sostituire questo campo utilizzando un modello di sostituzione. Per capire come passare la funzione di una regola in linea come modello sostitutivo nell'intestazione dell'operazione Kafka, consulta Esempi. Per ulteriori informazioni, consulta Modelli di sostituzione.

Nota

Le intestazioni in formato binario non sono supportate.

partizione (opzionale)

La partizione del messaggio Kafka.

È possibile sostituire questo campo utilizzando un modello di sostituzione. Per ulteriori informazioni, consulta Modelli di sostituzione.

clientProperties

Un oggetto che definisce le proprietà del client del produttore Apache Kafka.

conferme (facoltativo)

Il numero di conferme che, secondo il produttore, il server deve aver ricevuto, prima di considerare una richiesta come completa.

Se specifichi 0 come valore, il produttore non attenderà alcuna conferma da parte del server. Se il server non riceve il messaggio, il produttore non tenterà di inviare il messaggio un'altra volta.

Valori validi: -1, 0, 1, all. Il valore predefinito è 1.

bootstrap.servers

Un elenco di coppie host e porta (ad esempio host1:port1, host2:port2) utilizzato per stabilire la connessione iniziale al cluster Kafka.

compression.type (opzionale)

Il tipo di compressione per tutti i dati generati dal produttore.

Valori validi: none, gzip, snappy, lz4, zstd. Il valore predefinito è none.

security.protocol

Il protocollo di sicurezza usato per collegarsi al broker Kafka.

Valori validi: SSL, SASL_SSL. Il valore predefinito è SSL.

key.serializer

Specifica come trasformare gli oggetti chiave che fornisci con il ProducerRecord in byte.

Valore valido: StringSerializer.

value.serializer

Specifica come trasformare gli oggetti valore che fornisci con il ProducerRecord in byte.

Valore valido: ByteBufferSerializer.

ssl.truststore

Il file truststore in formato base64 o la posizione del file truststore in AWS Secrets Manager. Questo valore non è richiesto se il tuo truststore è considerato attendibile dalle autorità di certificazione Amazon (CA).

Questo campo supporta i modelli di sostituzione. Se utilizzi Secrets Manager per memorizzare le credenziali necessarie per connetterti al tuo broker Kafka, puoi utilizzare la get_secret SQL funzione per recuperare il valore di questo campo. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sulla funzione, vedere. get_secret SQL get_secret (secretId,, keysecretType,) roleArn Se il truststore ha la forma di un file, usa il parametro SecretBinary. Se il truststore è sotto forma di una stringa, usa il parametro SecretString.

La dimensione massima di questo valore è di 65 KB.

ssl.truststore.password

La password del truststore. Questo valore è richiesto solo se è stata creata una password per il truststore.

ssl.keystore

Il file keystore. Questo valore è obbligatorio quando si specifica SSL come valore per security.protocol.

Questo campo supporta i modelli di sostituzione. Utilizza Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka. Per recuperare il valore di questo campo, utilizzare la get_secret SQL funzione. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sulla get_secret SQL funzione, vedereget_secret (secretId,, keysecretType,) roleArn. Utilizzo del parametro SecretBinary.

ssl.keystore.password

La password dell'archivio per il file keystore. Questo valore è obbligatorio se viene specificato un valore per ssl.keystore.

Il valore di questo campo può essere un testo semplice. Questo campo supporta anche i modelli di sostituzione. Utilizza Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka. Per recuperare il valore di questo campo, utilizzare la get_secret SQL funzione. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sulla get_secret SQL funzione, vedereget_secret (secretId,, keysecretType,) roleArn. Utilizzo del parametro SecretString.

ssl.key.password

La password della chiave privata nel file keystore.

Questo campo supporta i modelli di sostituzione. Utilizza Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka. Per recuperare il valore di questo campo, utilizzare la get_secret SQL funzione. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sulla get_secret SQL funzione, vedereget_secret (secretId,, keysecretType,) roleArn. Utilizzo del parametro SecretString.

sasl.mechanism

Il meccanismo di sicurezza utilizzato per connettersi al broker Kafka. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol.

Valori validi: PLAIN, SCRAM-SHA-512, GSSAPI.

Nota

SCRAM-SHA-512è l'unico meccanismo di sicurezza supportato nelle regioni cn-north-1, cn-northwest-1, -1 e -1. us-gov-east us-gov-west

sasl.plain.username

Il nome utente utilizzato per recuperare la stringa segreta da Secrets Manager. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e PLAIN per sasl.mechanism.

sasl.plain.password

La password utilizzata per recuperare la stringa segreta da Secrets Manager. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e PLAIN per sasl.mechanism.

sasl.scram.username

Il nome utente utilizzato per recuperare la stringa segreta da Secrets Manager. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e SCRAM-SHA-512 per sasl.mechanism.

sasl.scram.password

La password utilizzata per recuperare la stringa segreta da Secrets Manager. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e SCRAM-SHA-512 per sasl.mechanism.

sasl.kerberos.keytab

Il file keytab per l'autenticazione di Kerberos in Secrets Manager. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e GSSAPI per sasl.mechanism.

Questo campo supporta i modelli di sostituzione. Utilizza Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka. Per recuperare il valore di questo campo, usa la funzione. get_secret SQL Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sulla get_secret SQL funzione, vedereget_secret (secretId,, keysecretType,) roleArn. Utilizzo del parametro SecretBinary.

sasl.kerberos.service.name

Il nome principale Kerberos con il quale Apache Kafka funziona. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e GSSAPI per sasl.mechanism.

sasl.kerberos.krb5.kdc

Il nome host del centro di distribuzione delle chiavi (KDC) a cui si connette il client di produzione di Apache Kafka. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e GSSAPI per sasl.mechanism.

sasl.kerberos.krb5.realm

Il dominio a cui si connette il client del produttore Apache Kafka. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e GSSAPI per sasl.mechanism.

sasl.kerberos.principal

L'identità Kerberos univoca a cui Kerberos può assegnare ticket per accedere ai servizi compatibili con Kerberos. Questo valore è obbligatorio quando si specifica SASL_SSL per security.protocol e GSSAPI per sasl.mechanism.

Esempi

L'JSONesempio seguente definisce un'azione di Apache Kafka in una regola. AWS IoT L'esempio seguente passa la funzione inline sourceIp() come modello sostitutivo nell'intestazione Kafka Action.

{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }

Note importanti sulla configurazione di Kerberos

  • Il centro di distribuzione delle chiavi (KDC) deve essere risolvibile tramite Domain Name System () privato all'interno del target. DNS VPC Un approccio possibile consiste nell'aggiungere la KDC DNS voce a una zona ospitata privata. Per ulteriori informazioni su questo approccio, consulta Utilizzo delle zone ospitate private.

  • Ciascuno VPC deve avere DNS la risoluzione abilitata. Per ulteriori informazioni, vedi Utilizzo DNS con il tuo VPC.

  • I gruppi di sicurezza dell'interfaccia di rete e i gruppi di sicurezza a livello di istanza nella VPC destinazione devono consentire il traffico dall'interno dell'utente VPC sulle seguenti porte.

    • TCPtraffico sulla porta del listener del broker bootstrap (spesso 9092, ma deve essere compreso nell'intervallo 9000-9100)

    • TCPe traffico sulla porta 88 per UDP KDC

  • SCRAM-SHA-512è l'unico meccanismo di sicurezza supportato nelle regioni cn-north-1, cn-northwest-1, -1 e -1. us-gov-east us-gov-west