Apache Kafka - AWS IoT Core

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Apache Kafka

La acción Apache Kafka (Kafka) envía los mensajes directamente a Amazon Managed Streaming for Apache Kafka (MSKAmazon), a los clústeres de Apache Kafka gestionados por proveedores externos, como Confluent Cloud, o a los clústeres de Apache Kafka autogestionados. Con la acción de reglas de Kafka, puede enrutar sus datos de IoT a los clústeres de Kafka. Esto le permite crear canalizaciones de datos de alto rendimiento para diversos fines, como el análisis de flujos, la integración de datos, la visualización y las aplicaciones empresariales esenciales.

nota

En este tema se presupone estar familiarizado con la plataforma Apache Kafka y los conceptos relacionados. Para obtener más información sobre Apache Kafka, consulte Apache Kafka. MSK No se admite la tecnología sin servidor. MSK Los clústeres sin servidor solo se pueden crear mediante IAM autenticación, que la acción de regla de Apache Kafka no admite actualmente. Para obtener más información sobre cómo configurar AWS IoT Core con Confluent, consulte Aprovechar Confluent y AWS resolver los desafíos de la administración de datos y dispositivos de IoT.

Requisitos

Esta regla tiene los siguientes requisitos:

  • Un IAM rol que AWS IoT puede asumir para realizar las ec2:CreateNetworkInterface operacionesec2:DescribeNetworkInterfaces,ec2:CreateNetworkInterfacePermission,ec2:DeleteNetworkInterface, ec2:DescribeSubnets ec2:DescribeVpcsec2:DescribeVpcAttribute, y. ec2:DescribeSecurityGroups Este rol crea y administra interfaces de red elásticas para su Amazon Virtual Private Cloud para comunicarse con su agente de Kafka. Para obtener más información, consulte Otorgar a una AWS IoT regla el acceso que requiere.

    En la AWS IoT consola, puede elegir o crear un rol que permita AWS IoT Core realizar esta acción de regla.

    Para obtener más información sobre las interfaces de red, consulte Interfaces de red elásticas en la Guía del EC2 usuario de Amazon.

    La política asociada al rol especificado debería verse de manera similar al siguiente ejemplo.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • Si las utiliza AWS Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka, debe crear una IAM función que AWS IoT Core pueda asumir para realizar las secretsmanager:DescribeSecret operaciones secretsmanager:GetSecretValue y.

    La política asociada al rol especificado debería verse de manera similar al siguiente ejemplo.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:region:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region:123456789012:secret:kafka_keytab-*" ] } ] }
  • Puede ejecutar sus clústeres de Apache Kafka dentro de Amazon Virtual Private Cloud (AmazonVPC). Debe crear un VPC destino de Amazon y utilizar una NAT puerta de enlace en sus subredes para reenviar los mensajes desde AWS IoT un clúster público de Kafka. El motor de AWS IoT reglas crea una interfaz de red en cada una de las subredes enumeradas en el VPC destino para enrutar el tráfico directamente a. VPC Al crear un VPC destino, el motor de AWS IoT reglas crea automáticamente una acción de VPC regla. Para obtener más información sobre las acciones de las VPC reglas, consulteVPCDestinos de nube privada virtual ().

  • Si utiliza una AWS KMS key (KMSclave) gestionada por el cliente para cifrar los datos en reposo, el servicio debe tener permiso para utilizar la KMS clave en nombre de la persona que llama. Para obtener más información, consulte el MSKcifrado de Amazon en la Guía para desarrolladores de Amazon Managed Streaming for Apache Kafka.

Parámetros

Al crear una AWS IoT regla con esta acción, debe especificar la siguiente información:

destinationArn

El nombre del recurso de Amazon (ARN) del VPC destino. Para obtener información sobre la creación de un VPC destino, consulteVPCDestinos de nube privada virtual ().

tema

Tema de Kafka para que los mensajes se envíen al agente de Kafka.

Puede sustituir este campo mediante una plantilla de sustitución. Para obtener más información, consulte Plantillas de sustitución.

clave (opcional)

Clave de mensajes de Kafka.

Puede sustituir este campo mediante una plantilla de sustitución. Para obtener más información, consulte Plantillas de sustitución.

encabezados (opcional)

La lista de cabeceras de Kafka que usted especifique. Cada encabezado es un par clave-valor que puede especificar al crear una acción de Kafka. Puede usar estos encabezados para enrutar los datos de los clientes de IoT a los clústeres de Kafka descendentes sin modificar la carga útil de los mensajes.

Puede sustituir este campo mediante una plantilla de sustitución. Para saber cómo pasar una función de regla en línea como plantilla de sustitución en el encabezado de la acción Kafka, consulte los ejemplos. Para obtener más información, consulte Plantillas de sustitución.

nota

Los encabezados en formato binario no son compatibles.

partición (opcional)

Partición de mensajes de Kafka.

Puede sustituir este campo mediante una plantilla de sustitución. Para obtener más información, consulte Plantillas de sustitución.

clientProperties

Un objeto que define las propiedades del cliente productor de Apache Kafka.

acks (opcional)

El número de reconocimientos que el productor requiere que el servidor haya recibido antes de considerar completa una solicitud.

Si especifica 0 como valor, el productor no esperará ningún acuse de recibo del servidor. Si el servidor no recibe el mensaje, el productor no volverá a intentar enviarlo.

Valores válidos: -1, 0, 1, all. El valor predeterminado es 1.

bootstrap.servers

Una lista de pares de host y puerto (por ejemplo host1:port1, host2:port2) que se utilizan para establecer la conexión inicial con el clúster de Kafka.

compression.type (opcional)

El tipo de compresión de todos los datos generados por el productor.

Valores válidos: none, gzip, snappy, lz4, zstd. El valor predeterminado es none.

security.protocol

El protocolo de seguridad utilizado para conectarse a su agente de Kafka.

Valores válidos: SSL, SASL_SSL. El valor predeterminado es SSL.

key.serializer

Especifica cómo convertir los objetos clave que usted proporciona con el ProducerRecord en bytes.

Valor válido: StringSerializer.

value.serializer

Especifica cómo convertir los objetos de valor que proporcione con el ProducerRecord en bytes.

Valor válido: ByteBufferSerializer.

ssl.truststore

El archivo truststore en formato base64 o la ubicación del archivo truststore en AWS Secrets Manager Este valor no es necesario si su almacén de confianza cuenta con la confianza de las autoridades de certificación (CA) de Amazon.

Este campo admite plantillas de sustitución. Si usa Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka, puede usar la get_secret SQL función para recuperar el valor de este campo. Para obtener más información sobre las plantillas de sustitución, consulte Plantillas de sustitución. Para obtener más información sobre la get_secret SQL función, consulteget_secret(secretId, secretType, key, roleArn). Si el almacén de confianza tiene la forma de un archivo, utilice el parámetro SecretBinary. Si el almacén de confianza tiene la forma de una cadena, utilice el parámetro SecretString.

El valor máximo de este recuento es 65 KB.

ssl.truststore.password

La contraseña del almacén de confianza. Este valor solo es obligatorio si ha creado una contraseña para el almacén de confianza.

ssl.keystore

El archivo del almacén de claves. Este valor es obligatorio cuando se especifica SSL como valor para security.protocol.

Este campo admite plantillas de sustitución. Utilice Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka. Para recuperar el valor de este campo, utilice la get_secret SQL función. Para obtener más información sobre las plantillas de sustitución, consulte Plantillas de sustitución. Para obtener más información sobre la get_secret SQL función, consulteget_secret(secretId, secretType, key, roleArn). Utilice el parámetro SecretBinary.

ssl.keystore.password

La contraseña del almacén para el archivo keystore. Este valor es necesario si especifica un valor para ssl.keystore.

El valor de este campo puede ser texto sin formato. Este campo también admite plantillas de sustitución. Utilice Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka. Para recuperar el valor de este campo, utilice la get_secret SQL función. Para obtener más información sobre las plantillas de sustitución, consulte Plantillas de sustitución. Para obtener más información sobre la get_secret SQL función, consulteget_secret(secretId, secretType, key, roleArn). Utilice el parámetro SecretString.

ssl.key.password

La contraseña de la clave privada del archivo del almacén de claves.

Este campo admite plantillas de sustitución. Utilice Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka. Para recuperar el valor de este campo, utilice la get_secret SQL función. Para obtener más información sobre las plantillas de sustitución, consulte Plantillas de sustitución. Para obtener más información sobre la get_secret SQL función, consulteget_secret(secretId, secretType, key, roleArn). Utilice el parámetro SecretString.

sasl.mechanism

El mecanismo de seguridad utilizado para conectarse a su agente de Kafka. Este valor es obligatorio cuando se especifica SASL_SSL para security.protocol.

Valores válidos: PLAIN, SCRAM-SHA-512, GSSAPI.

nota

SCRAM-SHA-512es el único mecanismo de seguridad compatible en las regiones cn-north-1, cn-northwest-1, -1 y -1. us-gov-east us-gov-west

sasl.plain.username

El nombre de usuario utilizado para recuperar la cadena secreta de Secrets Manager. Este valor es obligatorio cuando se especifica SASL_SSL para security.protocol y PLAIN para sasl.mechanism.

sasl.plain.password

La contraseña utilizada para recuperar la cadena secreta de Secrets Manager. Este valor es obligatorio cuando se especifica SASL_SSL para security.protocol y PLAIN para sasl.mechanism.

sasl.scram.username

El nombre de usuario utilizado para recuperar la cadena secreta de Secrets Manager. Este valor es obligatorio cuando se especifica SASL_SSL para security.protocol y SCRAM-SHA-512 para sasl.mechanism.

sasl.scram.password

La contraseña utilizada para recuperar la cadena secreta de Secrets Manager. Este valor es obligatorio cuando se especifica SASL_SSL para security.protocol y SCRAM-SHA-512 para sasl.mechanism.

sasl.kerberos.keytab

El archivo keytab para la autenticación de Kerberos en Secrets Manager. Este valor es obligatorio cuando se especifica SASL_SSL para security.protocol y GSSAPI para sasl.mechanism.

Este campo admite plantillas de sustitución. Utilice Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka. Para recuperar el valor de este campo, utilice la función. get_secret SQL Para obtener más información sobre las plantillas de sustitución, consulte Plantillas de sustitución. Para obtener más información sobre la get_secret SQL función, consulteget_secret(secretId, secretType, key, roleArn). Utilice el parámetro SecretBinary.

sasl.kerberos.service.name

El nombre principal de Kerberos con el que se ejecuta Apache Kafka. Este valor es obligatorio cuando se especifica SASL_SSL para security.protocol y GSSAPI para sasl.mechanism.

sasl.kerberos.krb5.kdc

El nombre de host del centro de distribución de claves (KDC) al que se conecta su cliente productor de Apache Kafka. Este valor es obligatorio cuando se especifica SASL_SSL para security.protocol y GSSAPI para sasl.mechanism.

sasl.kerberos.krb5.realm

El ámbito al que se conecta su cliente productor de Apache Kafka. Este valor es obligatorio cuando se especifica SASL_SSL para security.protocol y GSSAPI para sasl.mechanism.

sasl.kerberos.principal

La identidad única de Kerberos a la que Kerberos puede asignar tickets para acceder a los servicios compatibles con Kerberos. Este valor es obligatorio cuando se especifica SASL_SSL para security.protocol y GSSAPI para sasl.mechanism.

Ejemplos

El siguiente JSON ejemplo define una acción de Apache Kafka en una regla. AWS IoT El siguiente ejemplo pasa la función en línea sourceIp() como plantilla de sustitución en el encabezado Kafka Action.

{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }

Notas importantes sobre la configuración de Kerberos

  • Su centro de distribución de claves (KDC) debe poder resolverse mediante un sistema de nombres de dominio privado (DNS) dentro de su destino. VPC Un enfoque posible es añadir la KDC DNS entrada a una zona alojada privada. Para más información sobre este enfoque, consulte Trabajar con zonas alojadas privadas.

  • Cada una VPC debe tener DNS la resolución habilitada. Para obtener más información, consulte DNSUtilización con su VPC.

  • Los grupos de seguridad de la interfaz de red y los grupos de seguridad a nivel de instancia del VPC destino deben permitir el tráfico desde su interior VPC en los siguientes puertos.

    • TCPtráfico en el puerto de escucha de Bootstrap Broker (normalmente 9092, pero debe estar dentro del rango de 9000 a 9100)

    • TCPy tráfico en el puerto 88 para el UDP KDC

  • SCRAM-SHA-512es el único mecanismo de seguridad compatible en las regiones cn-north-1, cn-northwest-1, -1 y -1. us-gov-east us-gov-west