Apache Kafka - AWS IoT Core

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Apache Kafka

L'action Apache Kafka (Kafka) envoie des messages directement à votre Amazon Managed Streaming for Apache Kafka (MSKAmazon), aux clusters Apache Kafka gérés par des fournisseurs tiers tels que Confluent Cloud ou aux clusters Apache Kafka autogérés. Grâce à l'action des règles de Kafka, vous pouvez acheminer vos données IoT vers des clusters Kafka. Cela vous permet de créer des pipelines de données à hautes performances à diverses fins, telles que l'analyse en continu, l'intégration des données, la visualisation et les applications professionnelles critiques.

Note

Cette rubrique suppose une bonne connaissance de la plateforme Apache Kafka et des concepts associés. Pour de plus amples informations sur Apache Kafka, veuillez consulter Apache Kafka. MSKLe mode sans serveur n'est pas pris en charge. MSKLes clusters sans serveur ne peuvent être créés que par IAM authentification, ce que l'action des règles d'Apache Kafka ne prend pas en charge actuellement. Pour plus d'informations sur la façon de configurer AWS IoT Core avec Confluent, voir Tirer parti de Confluent et résoudre les problèmes de gestion des appareils et des données liés AWS à l'IoT.

Prérequis

Cette action réglementaire est assortie des exigences suivantes :

  • Un IAM rôle qui AWS IoT peut assumer d'exécuter les ec2:DescribeSecurityGroups opérations ec2:CreateNetworkInterface ec2:DescribeNetworkInterfacesec2:CreateNetworkInterfacePermission,ec2:DeleteNetworkInterface,ec2:DescribeSubnets,ec2:DescribeVpcs,ec2:DescribeVpcAttribute,, et. Ce rôle crée et gère des interfaces réseau élastiques vers votre Amazon Virtual Private Cloud afin d'atteindre votre courtier Kafka. Pour de plus amples informations, veuillez consulter Accorder à une AWS IoT règle l'accès dont elle a besoin.

    Dans la AWS IoT console, vous pouvez choisir ou créer un rôle pour autoriser l'exécution AWS IoT Core de cette action de règle.

    Pour plus d'informations sur les interfaces réseau, consultez la section Elastic network interfaces dans le guide de EC2 l'utilisateur Amazon.

    La stratégie associée au rôle que vous spécifiez doit ressembler à l'exemple suivant.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • Si vous stockez AWS Secrets Manager les informations d'identification requises pour vous connecter à votre courtier Kafka, vous devez créer un IAM rôle qui AWS IoT Core peut assumer l'exécution des secretsmanager:DescribeSecret opérations secretsmanager:GetSecretValue et.

    La stratégie associée au rôle que vous spécifiez doit ressembler à l'exemple suivant.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:region:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region:123456789012:secret:kafka_keytab-*" ] } ] }
  • Vous pouvez exécuter vos clusters Apache Kafka dans Amazon Virtual Private Cloud (AmazonVPC). Vous devez créer une VPC destination Amazon et utiliser une NAT passerelle dans vos sous-réseaux pour transférer les messages depuis un AWS IoT cluster Kafka public. Le moteur de AWS IoT règles crée une interface réseau dans chacun des sous-réseaux répertoriés dans la VPC destination pour acheminer le trafic directement vers leVPC. Lorsque vous créez une VPC destination, le moteur de AWS IoT règles crée automatiquement une action de VPC règle. Pour plus d'informations sur les actions relatives aux VPC règles, consultezDestinations du cloud privé virtuel (VPC).

  • Si vous utilisez une KMS clé gérée par AWS KMS key le client pour chiffrer des données au repos, le service doit être autorisé à utiliser la KMS clé au nom de l'appelant. Pour plus d'informations, consultez Amazon MSK Encryption dans le guide du développeur Amazon Managed Streaming for Apache Kafka.

Paramètres

Lorsque vous créez une AWS IoT règle avec cette action, vous devez spécifier les informations suivantes :

destinationArn

Le nom de la ressource Amazon (ARN) de la VPC destination. Pour plus d'informations sur la création d'une VPC destination, consultezDestinations du cloud privé virtuel (VPC).

topic

La rubrique Kafka pour les messages à envoyer à l'agent Kafka.

Vous pouvez remplacer ce champ à l'aide d'un modèle de substitution. Pour de plus amples informations, veuillez consulter Modèles de substitution.

clé (facultatif)

La clé de message Kafka.

Vous pouvez remplacer ce champ à l'aide d'un modèle de substitution. Pour de plus amples informations, veuillez consulter Modèles de substitution.

en-têtes (facultatif)

La liste des en-têtes Kafka que vous spécifiez. Chaque en-tête est une paire clé-valeur que vous pouvez spécifier lors de la création d'une action Kafka. Vous pouvez utiliser ces en-têtes pour acheminer les données des clients IoT vers les clusters Kafka en aval sans modifier la charge utile de vos messages.

Vous pouvez remplacer ce champ à l'aide d'un modèle de substitution. Pour comprendre comment transmettre la fonction d'une règle intégrée en tant que modèle de substitution dans l'en-tête de Kafka Action, consultez les exemples. Pour de plus amples informations, veuillez consulter Modèles de substitution.

Note

Les en-têtes au format binaire ne sont pas pris en charge.

partition (facultatif)

La partition de message Kafka.

Vous pouvez remplacer ce champ à l'aide d'un modèle de substitution. Pour de plus amples informations, veuillez consulter Modèles de substitution.

clientProperties

Un objet qui définit les propriétés du client producteur Apache Kafka.

Packs (facultatif)

Le nombre d'accusés de réception que le producteur demande au serveur de recevoir avant de considérer qu'une demande est complète.

Si vous spécifiez 0 comme valeur, le producteur n'attendra aucun accusé de réception du serveur. Si le serveur ne reçoit pas le message, le producteur ne réessaiera pas de l'envoyer.

Valeurs valides: -1, 0, 1, all. La valeur par défaut est 1.

serveurs bootstrap

Liste des paires d'hôtes et de ports (par exemplehost1:port1,host2:port2) utilisées pour établir la connexion initiale à votre cluster Kafka.

compression.type (facultatif)

Type de compression pour toutes les données générées par le producteur.

Valeurs valides: none, gzip, snappy, lz4, zstd. La valeur par défaut est none.

security.protocol

Le protocole de sécurité utilisé pour vous connecter à votre courtier Kafka.

Valeurs valides : SSL, SASL_SSL. La valeur par défaut est SSL.

key.serializer

Spécifie comment transformer en octets les objets clés que vous fournissez avec leProducerRecord.

Valeur valide : StringSerializer.

value.serializer

Spécifie comment transformer en octets les objets de valeur que vous fournissez avec leProducerRecord.

Valeur valide : ByteBufferSerializer.

ssl.truststore

Le fichier truststore au format base64 ou l'emplacement du fichier truststore dans. AWS Secrets Manager Cette valeur n'est pas obligatoire si votre truststore est approuvé par les autorités de certification Amazon (CA).

Ce champ prend en charge les modèles de substitution. Si vous utilisez Secrets Manager pour stocker les informations d'identification requises pour vous connecter à votre courtier Kafka, vous pouvez utiliser la get_secret SQL fonction pour récupérer la valeur de ce champ. Pour de plus amples informations sur les modèles de substitution, veuillez consulter Modèles de substitution. Pour plus d'informations sur cette get_secret SQL fonction, consultezget_secret (SecreTid, SecretType, clé, roLearn). Si le truststore se présente sous la forme d'un fichier, utilisez le SecretBinary paramètre. Si le truststore se présente sous la forme d'une chaîne, utilisez le SecretString paramètre.

La taille maximale de cette valeur est de 65 Ko.

ssl.truststore.password

Le mot de passe du référentiel d'approbations. Cette valeur n'est requise que si vous avez créé un mot de passe pour le référentiel d'approbations.

ssl.keystore

Le fichier keystore. Cette valeur est obligatoire lorsque vous spécifiez SSL comme valeur poursecurity.protocol.

Ce champ prend en charge les modèles de substitution. Utilisez Secrets Manager pour stocker les informations d'identification requises pour vous connecter à votre courtier Kafka. Pour récupérer la valeur de ce champ, utilisez la get_secret SQL fonction. Pour de plus amples informations sur les modèles de substitution, veuillez consulter Modèles de substitution. Pour plus d'informations sur cette get_secret SQL fonction, consultezget_secret (SecreTid, SecretType, clé, roLearn). Utilisez le SecretBinary paramètre.

ssl.keystore.password

Le mot de passe du fichier keystore. Cette valeur est requise si vous spécifiez une valeur pour ssl.keystore.

La valeur de ce champ peut être en texte brut. Ce champ prend également en charge les modèles de substitution. Utilisez Secrets Manager pour stocker les informations d'identification requises pour vous connecter à votre courtier Kafka. Pour récupérer la valeur de ce champ, utilisez la get_secret SQL fonction. Pour de plus amples informations sur les modèles de substitution, veuillez consulter Modèles de substitution. Pour plus d'informations sur cette get_secret SQL fonction, consultezget_secret (SecreTid, SecretType, clé, roLearn). Utilisez le SecretString paramètre.

clé ssl.mot de passe

Le mot de passe de la clé privée dans votre fichier keystore.

Ce champ prend en charge les modèles de substitution. Utilisez Secrets Manager pour stocker les informations d'identification requises pour vous connecter à votre courtier Kafka. Pour récupérer la valeur de ce champ, utilisez la get_secret SQL fonction. Pour de plus amples informations sur les modèles de substitution, veuillez consulter Modèles de substitution. Pour plus d'informations sur cette get_secret SQL fonction, consultezget_secret (SecreTid, SecretType, clé, roLearn). Utilisez le SecretString paramètre.

sasl.mechanism

Le mécanisme de sécurité utilisé pour se connecter à votre courtier Kafka. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL poursecurity.protocol.

Valeurs valides : PLAIN, SCRAM-SHA-512, GSSAPI.

Note

SCRAM-SHA-512est le seul mécanisme de sécurité pris en charge dans les régions cn-north-1, cn-northwest-1, -1 et -1. us-gov-east us-gov-west

sasl.plain.username

Le nom d'utilisateur utilisé pour récupérer la chaîne secrète depuis Secrets Manager. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et PLAIN pour sasl.mechanism.

sasl.plain.password

Mot de passe utilisé pour récupérer la chaîne secrète depuis Secrets Manager. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et PLAIN pour sasl.mechanism.

sasl.scram.nom d'utilisateur

Le nom d'utilisateur utilisé pour récupérer la chaîne secrète depuis Secrets Manager. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et SCRAM-SHA-512 pour sasl.mechanism.

sasl.scram.password

Mot de passe utilisé pour récupérer la chaîne secrète depuis Secrets Manager. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et SCRAM-SHA-512 poursasl.mechanism.

sasl.kerberos.keytab

Le fichier keytab pour l'authentification Kerberos dans Secrets Manager. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et GSSAPI poursasl.mechanism.

Ce champ prend en charge les modèles de substitution. Utilisez Secrets Manager pour stocker les informations d'identification requises pour vous connecter à votre courtier Kafka. Pour récupérer la valeur de ce champ, utilisez la get_secret SQL fonction. Pour de plus amples informations sur les modèles de substitution, veuillez consulter Modèles de substitution. Pour plus d'informations sur cette get_secret SQL fonction, consultezget_secret (SecreTid, SecretType, clé, roLearn). Utilisez le SecretBinary paramètre.

sasl.kerberos.service.name

Nom principal Kerberos sous lequel Apache Kafka s'exécute. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et GSSAPI pour sasl.mechanism.

sasl.kerberos.krb5.kdc

Le nom d'hôte du centre de distribution de clés (KDC) auquel votre client producteur Apache Kafka se connecte. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et GSSAPI pour sasl.mechanism.

sasl.kerberos.krb5.realm

Domaine auquel votre client producteur Apache Kafka se connecte. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et GSSAPI pour sasl.mechanism.

sasl.kerberos.principal

Identité Kerberos unique à laquelle Kerberos peut attribuer des tickets pour accéder aux services compatibles Kerberos. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et GSSAPI pour sasl.mechanism.

Exemples

L'JSONexemple suivant définit une action Apache Kafka dans une AWS IoT règle. L'exemple suivant transmet la fonction en ligne sourceIp() comme modèle de substitution dans l'en-tête Kafka Action.

{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }

Remarques importantes concernant votre configuration Kerberos

  • Votre centre de distribution de clés (KDC) doit pouvoir être résolu via un système de noms de domaine privé (DNS) au sein de votre cibleVPC. Une approche possible consiste à ajouter l'KDCDNSentrée à une zone hébergée privée. Pour plus d'informations sur cette approche, veuillez consulter Utilisation des zones hébergées privées.

  • La DNS résolution VPC doit être activée pour chacune d'entre elles. Pour plus d'informations, consultez la section Utilisation DNS avec votre VPC.

  • Les groupes de sécurité de l'interface réseau et les groupes de sécurité au niveau de l'instance de la VPC destination doivent autoriser le trafic provenant de votre intérieur VPC sur les ports suivants.

    • TCPtrafic sur le port d'écoute du broker bootstrap (souvent 9092, mais doit être compris entre 9000 et 9100)

    • TCPet UDP le trafic sur le port 88 pour le KDC

  • SCRAM-SHA-512est le seul mécanisme de sécurité pris en charge dans les régions cn-north-1, cn-northwest-1, -1 et -1. us-gov-east us-gov-west