Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Apache Kafka
La acción Apache Kafka (Kafka) envía los mensajes directamente a Amazon Managed Streaming for Apache Kafka (MSKAmazon), a los clústeres de Apache Kafka gestionados por proveedores externos, como
nota
En este tema se presupone estar familiarizado con la plataforma Apache Kafka y los conceptos relacionados. Para obtener más información sobre Apache Kafka, consulte Apache Kafka.
Requisitos
Esta regla tiene los siguientes requisitos:
-
Un IAM rol que AWS IoT puede asumir para realizar las
ec2:CreateNetworkInterface
operacionesec2:DescribeNetworkInterfaces
,ec2:CreateNetworkInterfacePermission
,ec2:DeleteNetworkInterface
,ec2:DescribeSubnets
ec2:DescribeVpcs
ec2:DescribeVpcAttribute
, y.ec2:DescribeSecurityGroups
Este rol crea y administra interfaces de red elásticas para su Amazon Virtual Private Cloud para comunicarse con su agente de Kafka. Para obtener más información, consulte Otorgar a una AWS IoT regla el acceso que requiere.En la AWS IoT consola, puede elegir o crear un rol que permita AWS IoT Core realizar esta acción de regla.
Para obtener más información sobre las interfaces de red, consulte Interfaces de red elásticas en la Guía del EC2 usuario de Amazon.
La política asociada al rol especificado debería verse de manera similar al siguiente ejemplo.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
-
Si las utiliza AWS Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka, debe crear una IAM función que AWS IoT Core pueda asumir para realizar las
secretsmanager:DescribeSecret
operacionessecretsmanager:GetSecretValue
y.La política asociada al rol especificado debería verse de manera similar al siguiente ejemplo.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:
region
:123456789012
:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region
:123456789012
:secret:kafka_keytab-*" ] } ] } -
Puede ejecutar sus clústeres de Apache Kafka dentro de Amazon Virtual Private Cloud (AmazonVPC). Debe crear un VPC destino de Amazon y utilizar una NAT puerta de enlace en sus subredes para reenviar los mensajes desde AWS IoT un clúster público de Kafka. El motor de AWS IoT reglas crea una interfaz de red en cada una de las subredes enumeradas en el VPC destino para enrutar el tráfico directamente a. VPC Al crear un VPC destino, el motor de AWS IoT reglas crea automáticamente una acción de VPC regla. Para obtener más información sobre las acciones de las VPC reglas, consulteVPCDestinos de nube privada virtual ().
-
Si utiliza una AWS KMS key (KMSclave) gestionada por el cliente para cifrar los datos en reposo, el servicio debe tener permiso para utilizar la KMS clave en nombre de la persona que llama. Para obtener más información, consulte el MSKcifrado de Amazon en la Guía para desarrolladores de Amazon Managed Streaming for Apache Kafka.
Parámetros
Al crear una AWS IoT regla con esta acción, debe especificar la siguiente información:
- destinationArn
-
El nombre del recurso de Amazon (ARN) del VPC destino. Para obtener información sobre la creación de un VPC destino, consulteVPCDestinos de nube privada virtual ().
- tema
-
Tema de Kafka para que los mensajes se envíen al agente de Kafka.
Puede sustituir este campo mediante una plantilla de sustitución. Para obtener más información, consulte Plantillas de sustitución.
- clave (opcional)
-
Clave de mensajes de Kafka.
Puede sustituir este campo mediante una plantilla de sustitución. Para obtener más información, consulte Plantillas de sustitución.
- encabezados (opcional)
-
La lista de cabeceras de Kafka que usted especifique. Cada encabezado es un par clave-valor que puede especificar al crear una acción de Kafka. Puede usar estos encabezados para enrutar los datos de los clientes de IoT a los clústeres de Kafka descendentes sin modificar la carga útil de los mensajes.
Puede sustituir este campo mediante una plantilla de sustitución. Para saber cómo pasar una función de regla en línea como plantilla de sustitución en el encabezado de la acción Kafka, consulte los ejemplos. Para obtener más información, consulte Plantillas de sustitución.
nota
Los encabezados en formato binario no son compatibles.
- partición (opcional)
-
Partición de mensajes de Kafka.
Puede sustituir este campo mediante una plantilla de sustitución. Para obtener más información, consulte Plantillas de sustitución.
- clientProperties
-
Un objeto que define las propiedades del cliente productor de Apache Kafka.
- acks (opcional)
-
El número de reconocimientos que el productor requiere que el servidor haya recibido antes de considerar completa una solicitud.
Si especifica 0 como valor, el productor no esperará ningún acuse de recibo del servidor. Si el servidor no recibe el mensaje, el productor no volverá a intentar enviarlo.
Valores válidos:
-1
,0
,1
,all
. El valor predeterminado es1
. - bootstrap.servers
-
Una lista de pares de host y puerto (por ejemplo
host1:port1
,host2:port2
) que se utilizan para establecer la conexión inicial con el clúster de Kafka. - compression.type (opcional)
-
El tipo de compresión de todos los datos generados por el productor.
Valores válidos:
none
,gzip
,snappy
,lz4
,zstd
. El valor predeterminado esnone
. - security.protocol
-
El protocolo de seguridad utilizado para conectarse a su agente de Kafka.
Valores válidos:
SSL
,SASL_SSL
. El valor predeterminado esSSL
. - key.serializer
-
Especifica cómo convertir los objetos clave que usted proporciona con el
ProducerRecord
en bytes.Valor válido:
StringSerializer
. - value.serializer
-
Especifica cómo convertir los objetos de valor que proporcione con el
ProducerRecord
en bytes.Valor válido:
ByteBufferSerializer
. - ssl.truststore
-
El archivo truststore en formato base64 o la ubicación del archivo truststore en AWS Secrets Manager Este valor no es necesario si su almacén de confianza cuenta con la confianza de las autoridades de certificación (CA) de Amazon.
Este campo admite plantillas de sustitución. Si usa Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka, puede usar la
get_secret
SQL función para recuperar el valor de este campo. Para obtener más información sobre las plantillas de sustitución, consulte Plantillas de sustitución. Para obtener más información sobre laget_secret
SQL función, consulteget_secret(secretId, secretType, key, roleArn). Si el almacén de confianza tiene la forma de un archivo, utilice el parámetroSecretBinary
. Si el almacén de confianza tiene la forma de una cadena, utilice el parámetroSecretString
.El valor máximo de este recuento es 65 KB.
- ssl.truststore.password
-
La contraseña del almacén de confianza. Este valor solo es obligatorio si ha creado una contraseña para el almacén de confianza.
- ssl.keystore
-
El archivo del almacén de claves. Este valor es obligatorio cuando se especifica
SSL
como valor parasecurity.protocol
.Este campo admite plantillas de sustitución. Utilice Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka. Para recuperar el valor de este campo, utilice la
get_secret
SQL función. Para obtener más información sobre las plantillas de sustitución, consulte Plantillas de sustitución. Para obtener más información sobre laget_secret
SQL función, consulteget_secret(secretId, secretType, key, roleArn). Utilice el parámetroSecretBinary
. - ssl.keystore.password
-
La contraseña del almacén para el archivo keystore. Este valor es necesario si especifica un valor para
ssl.keystore
.El valor de este campo puede ser texto sin formato. Este campo también admite plantillas de sustitución. Utilice Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka. Para recuperar el valor de este campo, utilice la
get_secret
SQL función. Para obtener más información sobre las plantillas de sustitución, consulte Plantillas de sustitución. Para obtener más información sobre laget_secret
SQL función, consulteget_secret(secretId, secretType, key, roleArn). Utilice el parámetroSecretString
. - ssl.key.password
-
La contraseña de la clave privada del archivo del almacén de claves.
Este campo admite plantillas de sustitución. Utilice Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka. Para recuperar el valor de este campo, utilice la
get_secret
SQL función. Para obtener más información sobre las plantillas de sustitución, consulte Plantillas de sustitución. Para obtener más información sobre laget_secret
SQL función, consulteget_secret(secretId, secretType, key, roleArn). Utilice el parámetroSecretString
. - sasl.mechanism
-
El mecanismo de seguridad utilizado para conectarse a su agente de Kafka. Este valor es obligatorio cuando se especifica
SASL_SSL
parasecurity.protocol
.Valores válidos:
PLAIN
,SCRAM-SHA-512
,GSSAPI
.nota
SCRAM-SHA-512
es el único mecanismo de seguridad compatible en las regiones cn-north-1, cn-northwest-1, -1 y -1. us-gov-east us-gov-west - sasl.plain.username
-
El nombre de usuario utilizado para recuperar la cadena secreta de Secrets Manager. Este valor es obligatorio cuando se especifica
SASL_SSL
parasecurity.protocol
yPLAIN
parasasl.mechanism
. - sasl.plain.password
-
La contraseña utilizada para recuperar la cadena secreta de Secrets Manager. Este valor es obligatorio cuando se especifica
SASL_SSL
parasecurity.protocol
yPLAIN
parasasl.mechanism
. - sasl.scram.username
-
El nombre de usuario utilizado para recuperar la cadena secreta de Secrets Manager. Este valor es obligatorio cuando se especifica
SASL_SSL
parasecurity.protocol
ySCRAM-SHA-512
parasasl.mechanism
. - sasl.scram.password
-
La contraseña utilizada para recuperar la cadena secreta de Secrets Manager. Este valor es obligatorio cuando se especifica
SASL_SSL
parasecurity.protocol
ySCRAM-SHA-512
parasasl.mechanism
. - sasl.kerberos.keytab
-
El archivo keytab para la autenticación de Kerberos en Secrets Manager. Este valor es obligatorio cuando se especifica
SASL_SSL
parasecurity.protocol
yGSSAPI
parasasl.mechanism
.Este campo admite plantillas de sustitución. Utilice Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka. Para recuperar el valor de este campo, utilice la función.
get_secret
SQL Para obtener más información sobre las plantillas de sustitución, consulte Plantillas de sustitución. Para obtener más información sobre laget_secret
SQL función, consulteget_secret(secretId, secretType, key, roleArn). Utilice el parámetroSecretBinary
. - sasl.kerberos.service.name
-
El nombre principal de Kerberos con el que se ejecuta Apache Kafka. Este valor es obligatorio cuando se especifica
SASL_SSL
parasecurity.protocol
yGSSAPI
parasasl.mechanism
. - sasl.kerberos.krb5.kdc
-
El nombre de host del centro de distribución de claves (KDC) al que se conecta su cliente productor de Apache Kafka. Este valor es obligatorio cuando se especifica
SASL_SSL
parasecurity.protocol
yGSSAPI
parasasl.mechanism
. - sasl.kerberos.krb5.realm
-
El ámbito al que se conecta su cliente productor de Apache Kafka. Este valor es obligatorio cuando se especifica
SASL_SSL
parasecurity.protocol
yGSSAPI
parasasl.mechanism
. - sasl.kerberos.principal
-
La identidad única de Kerberos a la que Kerberos puede asignar tickets para acceder a los servicios compatibles con Kerberos. Este valor es obligatorio cuando se especifica
SASL_SSL
parasecurity.protocol
yGSSAPI
parasasl.mechanism
.
Ejemplos
El siguiente JSON ejemplo define una acción de Apache Kafka en una regla. AWS IoT El siguiente ejemplo pasa la función en línea sourceIp() como plantilla de sustitución en el encabezado Kafka Action.
{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }
Notas importantes sobre la configuración de Kerberos
-
Su centro de distribución de claves (KDC) debe poder resolverse mediante un sistema de nombres de dominio privado (DNS) dentro de su destino. VPC Un enfoque posible es añadir la KDC DNS entrada a una zona alojada privada. Para más información sobre este enfoque, consulte Trabajar con zonas alojadas privadas.
-
Cada una VPC debe tener DNS la resolución habilitada. Para obtener más información, consulte DNSUtilización con su VPC.
-
Los grupos de seguridad de la interfaz de red y los grupos de seguridad a nivel de instancia del VPC destino deben permitir el tráfico desde su interior VPC en los siguientes puertos.
-
TCPtráfico en el puerto de escucha de Bootstrap Broker (normalmente 9092, pero debe estar dentro del rango de 9000 a 9100)
-
TCPy tráfico en el puerto 88 para el UDP KDC
-
-
SCRAM-SHA-512
es el único mecanismo de seguridad compatible en las regiones cn-north-1, cn-northwest-1, -1 y -1. us-gov-east us-gov-west