Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Apache Kafka
Nota
Questo argomento presuppone la familiarità con la piattaforma Apache Kafka e i relativi concetti. Per ulteriori informazioni su Apache Kafka, consulta Apache Kafka
Requisiti
Questa operazione della regola presenta i seguenti requisiti:
-
Un ruolo IAM che AWS IoT può assumere per eseguire le operazioni
ec2:CreateNetworkInterface
,,,ec2:DescribeNetworkInterfaces
,ec2:CreateNetworkInterfacePermission
ec2:DeleteNetworkInterface
,ec2:DescribeSubnets
e.ec2:DescribeVpcs
ec2:DescribeVpcAttribute
ec2:DescribeSecurityGroups
Questo ruolo crea e gestisce interfacce di rete elastiche per il tuo Amazon Virtual Private Cloud per raggiungere il tuo broker Kafka. Per ulteriori informazioni, consulta Concedere a qualsiasi AWS IoT regola l'accesso richiesto.Nella AWS IoT console, puoi scegliere o creare un ruolo che consenta di AWS IoT Core eseguire questa azione basata sulla regola.
Per maggiori informazioni sulle interfacce di rete, consulta Interfacce di rete elastiche nella Guida per l'utente di Amazon EC2.
La policy associata al ruolo che specifichi sarà simile a quella del seguente esempio.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
Se utilizzi AWS Secrets Manager per memorizzare le credenziali necessarie per connetterti al tuo broker Kafka, devi creare un ruolo IAM che AWS IoT Core possa assumersi di eseguire le
secretsmanager:GetSecretValue
operazioni and.secretsmanager:DescribeSecret
La policy associata al ruolo che specifichi sarà simile a quella del seguente esempio.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:
region
:123456789012
:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region
:123456789012
:secret:kafka_keytab-*" ] } ] }-
Puoi eseguire i cluster Apache Kafka all'interno di Amazon Virtual Private Cloud (Amazon VPC). È necessario creare una destinazione Amazon VPC e utilizzare un gateway NAT nelle sottoreti per inoltrare messaggi da AWS IoT un cluster Kafka pubblico. Le regole AWS IoT creano un'interfaccia di rete in ciascuna delle sottoreti elencate nella destinazione VPC per indirizzare il traffico direttamente al VPC. Quando crei una destinazione VPC, il motore delle AWS IoT regole crea automaticamente un'azione della regola VPC. Per ulteriori informazioni sulle operazioni delle regole VPC, consulta Destinazioni Virtual private cloud (VPC).
-
Se si utilizza una chiave gestita dal cliente AWS KMS key (chiave KMS) per crittografare i dati inattivi, il servizio deve disporre dell'autorizzazione a utilizzare la chiave KMS per conto del chiamante. Per ulteriori informazioni, consulta crittografia di Amazon MSK nella Guida per sviluppatori Amazon Managed Streaming for Apache Kafka.
Parametri
Quando crei una AWS IoT regola con questa azione, devi specificare le seguenti informazioni:
- destinationArn
L'Amazon Resource Name (ARN) della destinazione VPC. Per ulteriori informazioni sulla creazione di una destinazione VPC, consulta Destinazioni Virtual private cloud (VPC).
- topic
L'argomento Kafka per i messaggi da inviare al broker Kafka.
È possibile sostituire questo campo utilizzando un modello di sostituzione. Per ulteriori informazioni, consulta Modelli di sostituzione.
- chiave (opzionale)
La chiave del messaggio Kafka.
È possibile sostituire questo campo utilizzando un modello di sostituzione. Per ulteriori informazioni, consulta Modelli di sostituzione.
- intestazioni (opzionali)
-
L'elenco delle intestazioni specificate. Ogni intestazione è una coppia chiave-valore che puoi specificare quando crei un'operazione Kafka. Puoi utilizzare queste intestazioni per instradare i dati dai client IoT ai cluster Kafka a valle senza modificare il payload dei messaggi.
È possibile sostituire questo campo utilizzando un modello di sostituzione. Per capire come passare la funzione di una regola in linea come modello sostitutivo nell'intestazione dell'operazione Kafka, consulta Esempi. Per ulteriori informazioni, consulta Modelli di sostituzione.
Nota
Le intestazioni in formato binario non sono supportate.
- partizione (opzionale)
La partizione del messaggio Kafka.
È possibile sostituire questo campo utilizzando un modello di sostituzione. Per ulteriori informazioni, consulta Modelli di sostituzione.
- clientProperties
Un oggetto che definisce le proprietà del client del produttore Apache Kafka.
- conferme (facoltativo)
Il numero di conferme che, secondo il produttore, il server deve aver ricevuto, prima di considerare una richiesta come completa.
Se specifichi 0 come valore, il produttore non attenderà alcuna conferma da parte del server. Se il server non riceve il messaggio, il produttore non tenterà di inviare il messaggio un'altra volta.
Valori validi:
-1
,0
,1
,all
. Il valore predefinito è1
.- bootstrap.servers
Un elenco di coppie host e porta (ad esempio
host1:port1
,host2:port2
) utilizzato per stabilire la connessione iniziale al cluster Kafka.- compression.type (opzionale)
Il tipo di compressione per tutti i dati generati dal produttore.
Valori validi:
none
,gzip
,snappy
,lz4
,zstd
. Il valore predefinito ènone
.- security.protocol
Il protocollo di sicurezza usato per collegarsi al broker Kafka.
Valori validi:
SSL
,SASL_SSL
. Il valore predefinito èSSL
.- key.serializer
Specifica come trasformare gli oggetti chiave che fornisci con il
ProducerRecord
in byte.Valore valido:
StringSerializer
.- value.serializer
Specifica come trasformare gli oggetti valore che fornisci con il
ProducerRecord
in byte.Valore valido:
ByteBufferSerializer
.- ssl.truststore
Il file truststore in formato base64 o la posizione del file truststore in AWS Secrets Manager. Questo valore non è richiesto se il tuo truststore è considerato attendibile dalle autorità di certificazione Amazon (CA).
Questo campo supporta i modelli di sostituzione. Se utilizzi Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka, puoi utilizzare la funzione SQL
get_secret
per recuperare il valore di questo campo. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sulla funzione SQLget_secret
, consulta get_secret (secretId, secretType, chiave, roleArn). Se il truststore ha la forma di un file, usa il parametroSecretBinary
. Se il truststore è sotto forma di una stringa, usa il parametroSecretString
.La dimensione massima di questo valore è di 65 KB.
- ssl.truststore.password
La password del truststore. Questo valore è richiesto solo se è stata creata una password per il truststore.
- ssl.keystore
Il file keystore. Questo valore è obbligatorio quando si specifica
SSL
come valore persecurity.protocol
.Questo campo supporta i modelli di sostituzione. Utilizza Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka. Utilizza la funzione SQL
get_secret
per recuperare il valore di questo campo. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sulla funzione SQLget_secret
, consulta get_secret (secretId, secretType, chiave, roleArn). Utilizzo del parametroSecretBinary
.- ssl.keystore.password
La password dell'archivio per il file keystore. Questo valore è obbligatorio se viene specificato un valore per
ssl.keystore
.Il valore di questo campo può essere un testo semplice. Questo campo supporta anche i modelli di sostituzione. Utilizza Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka. Utilizza la funzione SQL
get_secret
per recuperare il valore di questo campo. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sulla funzione SQLget_secret
, consulta get_secret (secretId, secretType, chiave, roleArn). Utilizzo del parametroSecretString
.- ssl.key.password
La password della chiave privata nel file keystore.
Questo campo supporta i modelli di sostituzione. Utilizza Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka. Utilizza la funzione SQL
get_secret
per recuperare il valore di questo campo. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sulla funzione SQLget_secret
, consulta get_secret (secretId, secretType, chiave, roleArn). Utilizzo del parametroSecretString
.- sasl.mechanism
Il meccanismo di sicurezza utilizzato per connettersi al broker Kafka. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
.Valori validi:
PLAIN
,SCRAM-SHA-512
,GSSAPI
.Nota
SCRAM-SHA-512
è l'unico meccanismo di sicurezza supportato nelle regioni cn-north-1, cn-northwest-1, -1 e -1. us-gov-east us-gov-west- sasl.plain.username
Il nome utente utilizzato per recuperare la stringa segreta da Secrets Manager. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
ePLAIN
persasl.mechanism
.- sasl.plain.password
La password utilizzata per recuperare la stringa segreta da Secrets Manager. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
ePLAIN
persasl.mechanism
.- sasl.scram.username
Il nome utente utilizzato per recuperare la stringa segreta da Secrets Manager. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
eSCRAM-SHA-512
persasl.mechanism
.- sasl.scram.password
La password utilizzata per recuperare la stringa segreta da Secrets Manager. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
eSCRAM-SHA-512
persasl.mechanism
.- sasl.kerberos.keytab
Il file keytab per l'autenticazione di Kerberos in Secrets Manager. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
eGSSAPI
persasl.mechanism
.Questo campo supporta i modelli di sostituzione. Utilizza Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka. Utilizza la funzione SQL
get_secret
per recuperare il valore di questo campo. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sulla funzione SQLget_secret
, consulta get_secret (secretId, secretType, chiave, roleArn). Utilizzo del parametroSecretBinary
.- sasl.kerberos.service.name
Il nome principale Kerberos con il quale Apache Kafka funziona. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
eGSSAPI
persasl.mechanism
.- sasl.kerberos.krb5.kdc
Il nome host del centro di distribuzione delle chiavi (KDC) a cui si connette il client produttore Apache Kafka. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
eGSSAPI
persasl.mechanism
.- sasl.kerberos.krb5.realm
Il dominio a cui si connette il client del produttore Apache Kafka. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
eGSSAPI
persasl.mechanism
.- sasl.kerberos.principal
L'identità Kerberos univoca a cui Kerberos può assegnare ticket per accedere ai servizi compatibili con Kerberos. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
eGSSAPI
persasl.mechanism
.
Esempi
Il seguente esempio JSON definisce un'azione di Apache Kafka in una regola. AWS IoT L'esempio seguente passa la funzione in linea sourceIp() come modello di sostituzione nell'intestazione dell'operazione Kafka.
{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }
Note importanti sulla configurazione di Kerberos
Il centro di distribuzione delle chiavi (KDC) deve essere risolvibile tramite Domain Name System (DNS) privato all'interno del VPC di destinazione. Un possibile approccio è quello di aggiungere la voce DNS del KDC a una zona ospitata privata. Per ulteriori informazioni su questo approccio, consulta Utilizzo delle zone ospitate private.
Ogni VPC deve avere la risoluzione DNS abilitata. Per ulteriori informazioni, consulta Utilizzo del DNS con il tuo VPC.
I gruppi di sicurezza dell'interfaccia di rete e i gruppi di sicurezza a livello di istanza nella destinazione VPC devono consentire il traffico dall'interno del VPC sulle seguenti porte.
Traffico TCP sulla porta listener del broker bootstrap (spesso 9092, ma deve essere compreso nell'intervallo 9000-9100)
Traffico TCP e UDP sulla porta 88 per il KDC
SCRAM-SHA-512
è l'unico meccanismo di sicurezza supportato nelle regioni cn-north-1, cn-northwest-1, -1 e -1. us-gov-east us-gov-west