Utiliser Apache Kafka comme cible pour AWS Database Migration Service - AWS Database Migration Service

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utiliser Apache Kafka comme cible pour AWS Database Migration Service

Vous pouvez l'utiliser AWS DMS pour migrer des données vers un cluster Apache Kafka. Apache Kafka est une plateforme de streaming distribuée. Apache Kafka vous permet d’ingérer et de traiter des données de streaming en temps réel.

AWS propose également Amazon Managed Streaming pour Apache Kafka (Amazon MSK) à utiliser comme cible. AWS DMS Amazon MSK est un service de streaming Apache Kafka entièrement géré qui simplifie l’implémentation et la gestion des instances Apache Kafka. Il fonctionne avec les versions open source d'Apache Kafka, et vous accédez aux instances Amazon MSK en tant que AWS DMS cibles, exactement comme n'importe quelle instance Apache Kafka. Pour plus d’informations, consultez Qu’est-ce qu’Amazon MSK ? dans le Guide du développeur Amazon Managed Streaming for Apache Kafka.

Un cluster Kafka stocke les flux d’enregistrements dans des catégories appelées « rubriques », divisées en partitions. Les partitions sont des séquences identifiées de manière unique d’enregistrements de données (messages) dans une rubrique. Les partitions peuvent être réparties entre plusieurs agents dans un cluster pour permettre le traitement parallèle des enregistrements d’une rubrique. Pour de plus amples informations sur les rubriques et les partitions et leur distribution dans Apache Kafka, veuillez consulter Rubriques et journaux et distribution.

Votre cluster Kafka peut être une instance Amazon MSK, un cluster exécuté sur une instance Amazon EC2 ou un cluster sur site. Une instance Amazon MSK ou un cluster sur une instance Amazon EC2 peut se trouver dans le même VPC ou dans un autre VPC. Dans le cas d’un cluster sur site, vous pouvez utiliser votre propre serveur de noms sur site pour votre instance de réplication afin de résoudre le nom d’hôte du cluster. Pour en savoir plus sur la configuration d’un serveur de noms pour votre instance de réplication, consultez Utilisation de votre propre serveur de noms sur site. Pour plus d’informations sur la configuration d’un réseau, consultez Configuration d'un réseau pour une instance de réplication.

Lorsque vous utilisez un cluster Amazon MSK, assurez-vous que son groupe de sécurité autorise l’accès à partir de votre instance de réplication. Pour en savoir plus sur la modification du groupe de sécurité d’un cluster Amazon MSK, consultez Modification du groupe de sécurité d’un cluster Amazon MSK.

AWS Database Migration Service publie des enregistrements dans un sujet Kafka à l'aide de JSON. Au cours de la conversion, AWS DMS sérialise chaque enregistrement de la base de données source dans une paire attribut-valeur au format JSON.

Utilisez le mappage d’objet pour migrer vos données de n’importe quelle source de données prise en charge vers un cluster Kafka cible. Avec le mappage d’objet, vous déterminez la façon de structurer les enregistrements de données dans la rubrique cible. Vous définissez également une clé de partition pour chaque table, qu'Apache Kafka utilise pour regrouper les données dans ses partitions.

Actuellement, ne AWS DMS prend en charge qu'un seul sujet par tâche. Pour une seule tâche comportant plusieurs tables, tous les messages sont placés dans une seule rubrique. Chaque message inclut une section de métadonnées qui identifie le schéma et la table cibles. AWS DMS les versions 3.4.6 et supérieures prennent en charge la réplication multisujet à l'aide du mappage d'objets. Pour de plus amples informations, veuillez consulter Réplication à plusieurs rubriques à l’aide du mappage d’objet.

Paramètres du point de terminaison Apache Kafka

Vous pouvez spécifier les détails de connexion via les paramètres du point de terminaison dans la AWS DMS console ou via l'--kafka-settingsoption de la CLI. Les conditions requises pour chaque paramètre sont les suivantes :

  • Broker : spécifiez les emplacements d’un ou de plusieurs agents dans votre cluster Kafka sous la forme d’une liste séparée par des virgules de tous les éléments broker-hostname:port. Par exemple : "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876". Ce paramètre peut spécifier les emplacements d’un ou de tous les agents du cluster. Les agents de cluster communiquent tous pour gérer le partitionnement des enregistrements de données migrés vers la rubrique.

  • Topic : (facultatif) spécifiez le nom de rubrique avec une longueur maximale de 255 lettres et symboles. Vous pouvez utiliser le point (.), le trait de soulignement (_) et le moins (-). Les noms de rubrique avec un point (.) ou un trait de soulignement (_) peuvent entrer en collision dans des structures de données internes. Utilisez l'un ou l'autre de ces symboles, mais pas les deux dans le nom de la rubrique. Si vous ne spécifiez pas de nom de rubrique, AWS DMS "kafka-default-topic" utilisez-le comme rubrique de migration.

    Note

    Pour AWS DMS créer soit un sujet de migration que vous spécifiez, soit le sujet par défaut, défini dans le auto.create.topics.enable = true cadre de la configuration de votre cluster Kafka. Pour plus d’informations, consultez Limitations liées à l'utilisation d'Apache Kafka comme cible pour AWS Database Migration Service.

  • MessageFormat : format de sortie pour les enregistrements créés sur le point de terminaison. Le format du message est JSON (par défaut) ou JSON_UNFORMATTED (une seule ligne sans onglet).

  • MessageMaxBytes : taille maximale en octets des enregistrements créés sur le point de terminaison. La valeur par défaut est 1 000 000.

    Note

    Vous ne pouvez utiliser la AWS CLI/SDK que pour passer à une valeur autre que celle par MessageMaxBytes défaut. Par exemple, pour modifier votre point de terminaison Kafka existant et redéfinir la valeur de MessageMaxBytes, utilisez la commande suivante.

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails : fournit des informations détaillées sur les transactions à partir de la base de données source. Ces informations comprennent un horodatage de validation, une position de journal et des valeurs pour transaction_id, previous_transaction_id et transaction_record_id(le décalage d'enregistrement dans une transaction). L’argument par défaut est false.

  • IncludePartitionValue : affiche la valeur de partition dans la sortie du message Kafka, sauf si le type de partition est schema-table-type. L’argument par défaut est false.

  • PartitionIncludeSchemaTable : préfixe les noms de schéma et de table aux valeurs de partition, lorsque le type de partition est primary-key-type. Cela augmente la distribution des données entre les partitions Kafka. Par exemple, supposons qu'un schéma SysBench comporte des milliers de tables et que chaque table n'ait qu'une plage limitée pour une clé primaire. Dans ce cas, la même clé primaire est envoyée à partir de milliers de tables vers la même partition, ce qui provoque une limitation. L’argument par défaut est false.

  • IncludeTableAlterOperations : inclut toutes les opérations DDL (Data Definition Language) qui modifient la table dans les données de contrôle, telles que rename-table, drop-table, add-column, drop-column et rename-column. L’argument par défaut est false.

  • IncludeControlDetails : affiche les informations de contrôle détaillées pour la définition de table, la définition de colonne et les modifications de table et de colonne dans la sortie du message Kafka. L’argument par défaut est false.

  • IncludeNullAndEmpty : inclut les colonnes NULL et vides dans la cible. L’argument par défaut est false.

  • SecurityProtocol : définit une connexion sécurisée à un point de terminaison cible Kafka à l’aide du protocole TLS (Transport Layer Security). Les options sont ssl-authentication, ssl-encryption et sasl-ssl. L’utilisation de sasl-ssl requiert SaslUsername et SaslPassword.

  • SslEndpointIdentificationAlgorithm— Définit la vérification du nom d'hôte pour le certificat. Ce paramètre est pris en charge dans les AWS DMS versions 3.5.1 et ultérieures. Les options disponibles sont les suivantes :

    • NONE: désactive la vérification du nom d'hôte du broker dans la connexion client.

    • HTTPS: Activez la vérification du nom d'hôte du broker dans la connexion client.

Vous pouvez augmenter la vitesse du transfert dans les paramètres. Pour ce faire, AWS DMS prend en charge un chargement complet multithread sur un cluster cible Apache Kafka. AWS DMS prend en charge ce multithreading avec des paramètres de tâche qui incluent les éléments suivants :

  • MaxFullLoadSubTasks— Utilisez cette option pour indiquer le nombre maximum de tables sources à charger en parallèle. AWS DMS charge chaque table dans la table cible Kafka correspondante à l'aide d'une sous-tâche dédiée. La valeur par défaut est 8 ; la valeur maximale 49.

  • ParallelLoadThreads— Utilisez cette option pour spécifier le nombre de threads AWS DMS utilisés pour charger chaque table dans sa table cible Kafka. La valeur maximale pour une cible Apache Kafka est 32. Vous pouvez demander une augmentation de cette limite maximale.

  • ParallelLoadBufferSize : utilisez cette option pour spécifier le nombre maximal d’enregistrements à stocker dans la mémoire tampon utilisée par les threads de chargement parallèles pour charger les données dans la cible Kafka. La valeur par défaut est 50. La valeur maximale est 1 000. Utilisez ce paramètre avec ParallelLoadThreads. ParallelLoadBufferSize est valide uniquement dans le cas de plusieurs threads.

  • ParallelLoadQueuesPerThread : utilisez cette option pour spécifier le nombre de files d’attente auxquelles chaque thread simultané accède pour extraire les enregistrements de données des files d’attente et générer un chargement par lots pour la cible. La valeur par défaut est 1. La valeur maximale est 512.

Vous pouvez améliorer les performances de capture des données de modification (CDC) pour les points de terminaison Kafka en ajustant les paramètres de tâche des threads parallèles et des opérations en bloc. Pour ce faire, vous pouvez spécifier le nombre de threads simultanés, les files d'attente par thread et le nombre d'enregistrements à stocker dans un tampon à l'aide de la tâche ParallelApply*. Par exemple, supposons que vous souhaitiez effectuer un chargement CDC et appliquer 128 threads en parallèle. Vous souhaitez également accéder à 64 files d'attente par thread, avec 50 enregistrements stockés par tampon.

Pour améliorer les performances du CDC, AWS DMS prend en charge les paramètres de tâche suivants :

  • ParallelApplyThreads— Spécifie le nombre de threads simultanés AWS DMS utilisés lors d'un chargement CDC pour transférer des enregistrements de données vers un point de terminaison cible Kafka. La valeur par défaut est zéro (0) et la valeur maximale est 32.

  • ParallelApplyBufferSize : spécifie le nombre maximal d’enregistrements à stocker dans chaque file d’attente de mémoire tampon pour que les threads simultanés soient transférés vers un point de terminaison cible Kafka lors d’un chargement CDC. La valeur par défaut est 100 et la valeur maximale est 1 000. Utilisez cette option lorsque ParallelApplyThreads spécifie plusieurs threads.

  • ParallelApplyQueuesPerThread : spécifie le nombre de files d’attente auxquelles chaque thread accède pour extraire les enregistrements de données des files d’attente et générer un chargement par lots pour un point de terminaison Kafka pendant la CDC. La valeur par défaut est 1. La valeur maximale est 512.

Lorsque vous utilisez les paramètres de tâche ParallelApply*, la valeur par défaut partition-key-type est la valeur primary-key de la table, pas schema-name.table-name.

Connexion à Kafka à l’aide du protocole TLS (Transport Layer Security)

Un cluster Kafka accepte les connexions sécurisées à l’aide du protocole TLS (Transport Layer Security). Avec DMS, vous pouvez utiliser l’une des trois options de protocole de sécurité suivantes pour sécuriser la connexion d’un point de terminaison Kafka.

Chiffrement SSL (server-encryption)

Les clients valident l’identité du serveur par le biais du certificat du serveur. Une connexion chiffrée est alors établie entre le serveur et le client.

Authentification SSL (mutual-authentication)

Le serveur et le client valident mutuellement leur identité respective via leurs propres certificats. Une connexion chiffrée est alors établie entre le serveur et le client.

SASL-SSL (mutual-authentication)

La méthode SASL (Simple Authentication and Security Layer) remplace le certificat du client par un nom d’utilisateur et un mot de passe pour valider l’identité du client. Plus précisément, vous fournissez un nom d’utilisateur et un mot de passe enregistrés par le serveur afin que ce dernier puisse valider l’identité d’un client. Une connexion chiffrée est alors établie entre le serveur et le client.

Important

Apache Kafka et Amazon MSK acceptent les certificats résolus. Il s’agit d’une limitation connue de Kafka et d’Amazon MSK qui doit être corrigée. Pour plus d’informations, consultez Apache Kafka issues, KAFKA-3700.

Si vous utilisez Amazon MSK, envisagez d’utiliser des listes de contrôle d’accès (ACL) pour contourner cette limitation connue. Pour plus d’informations sur l’utilisation des listes ACL, consultez la section Listes de contrôle d’accès (ACL) Apache Kafka dans le Guide du développeur Amazon Managed Streaming for Apache Kafka.

Si vous utilisez un cluster Kafka autogéré, consultez Comment dated 21/Oct/18 pour en savoir plus sur la configuration de votre cluster.

Utilisation du chiffrement SSL avec Amazon MSK ou un cluster Kafka autogéré

Vous pouvez utiliser le chiffrement SSL pour sécuriser la connexion d’un point de terminaison à Amazon MSK ou à un cluster Kafka autogéré. Lorsque vous utilisez la méthode d’authentification par chiffrement SSL, les clients valident l’identité d’un serveur par le biais du certificat du serveur. Une connexion chiffrée est alors établie entre le serveur et le client.

Pour utiliser le chiffrement SSL pour vous connecter à Amazon MSK
  • Définissez le paramètre de point de terminaison du protocole de sécurité (SecurityProtocol) à l’aide de l’option ssl-encryption lorsque vous créez votre point de terminaison Kafka cible.

    L’exemple de code JSON suivant définit le protocole de sécurité sur le chiffrement SSL.

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
Pour utiliser le chiffrement SSL pour un cluster Kafka autogéré
  1. Si vous utilisez une autorité de certification (CA) privée dans votre cluster Kafka sur site, chargez votre certificat de CA privée et obtenez un Amazon Resource Name (ARN).

  2. Définissez le paramètre de point de terminaison du protocole de sécurité (SecurityProtocol) à l’aide de l’option ssl-encryption lorsque vous créez votre point de terminaison Kafka cible. L’exemple de code JSON suivant définit le protocole de sécurité sur ssl-encryption.

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. Si vous utilisez une CA privée, définissez SslCaCertificateArn dans l’ARN obtenu à la première étape ci-dessus.

Utilisation de l’authentification SSL

Vous pouvez utiliser l’authentification SSL pour sécuriser la connexion d’un point de terminaison à Amazon MSK ou à un cluster Kafka autogéré.

Pour activer l’authentification client et le chiffrement à l’aide de l’authentification SSL pour se connecter à Amazon MSK, procédez comme suit :

  • Préparez une clé privée et un certificat public pour Kafka.

  • Chargez les certificats dans le gestionnaire de certificats DMS.

  • Créez un point de terminaison cible Kafka avec les ARN de certificat correspondants spécifiés dans les paramètres de point de terminaison Kafka.

Pour préparer une clé privée et un certificat public pour Amazon MSK
  1. Créez une instance EC2 et configurez un client de sorte qu’il utilise l’authentification, comme décrit dans les étapes 1 à 9 de la section Authentification TLS mutuelle du Guide du développeur Amazon Managed Streaming for Apache Kafka.

    Après avoir effectué ces étapes, vous disposez d’un Certificate-ARN (l’ARN de certificat public enregistré dans ACM) et d’une clé privée contenus dans un fichier kafka.client.keystore.jks.

  2. Obtenez le certificat public et copiez-le dans le fichier signed-certificate-from-acm.pem à l’aide de la commande suivante :

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    Cette commande renvoie des informations semblables à celles de l’exemple suivant :

    {"Certificate": "123", "CertificateChain": "456"}

    Vous copiez ensuite votre équivalent de "123" dans le fichier signed-certificate-from-acm.pem.

  3. Pour obtenir la clé privée, importez la clé msk-rsa à partir de kafka.client.keystore.jks to keystore.p12, comme illustré dans l’exemple suivant.

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. Utilisez la commande suivante pour exporter keystore.p12 au format .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    Le message Entrer une phrase passe PEM apparaît et identifie la clé appliquée pour chiffrer le certificat.

  5. Supprimez les attributs de conteneur et les attributs de clé du fichier .pem pour vous assurer que la première ligne commence par la chaîne suivante.

    ---BEGIN ENCRYPTED PRIVATE KEY---
Pour charger un certificat public et une clé privée dans le gestionnaire de certificats DMS et tester la connexion à Amazon MSK
  1. Chargez dans le gestionnaire de certificats DMS à l’aide de la commande suivante.

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. Créez un point de terminaison cible Amazon MSK et testez la connexion pour vous assurer que l’authentification TLS fonctionne.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
Important

Vous pouvez utiliser l’authentification SSL pour sécuriser une connexion à un cluster Kafka autogéré. Dans certains cas, vous devrez peut-être utiliser une autorité de certification (CA) privée dans votre cluster Kafka sur site. Si tel est le cas, chargez votre chaîne de CA, votre certificat public et votre clé privée dans le gestionnaire de certificats DMS. Utilisez ensuite l’Amazon Resource Name (ARN) correspondant dans vos paramètres de point de terminaison lorsque vous créez votre point de terminaison cible Kafka sur site.

Pour préparer une clé privée et un certificat signé pour un cluster Kafka autogéré
  1. Générez une paire de clés comme indiqué dans l’exemple suivant.

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. Générez une demande de signature de certificat (CSR).

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. Utilisez la CA du magasin de clés de confiance de votre cluster pour signer la CSR. Si vous n’avez pas de CA, vous pouvez créer votre propre CA privée.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. Importez ca-cert dans le magasin de clés de confiance et le magasin de clés du serveur. Si vous n’avez pas de magasin de clés de confiance, utilisez la commande suivante pour le créer et y importer ca-cert .

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. Signez le certificat.

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. Importez le certificat signé dans le magasin de clés.

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. Utilisez la commande suivante pour importer la clé on-premise-rsa de kafka.server.keystore.jks dans keystore.p12.

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. Utilisez la commande suivante pour exporter keystore.p12 au format .pem.

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. Chargez encrypted-private-server-key.pem, signed-certificate.pem et ca-cert dans le gestionnaire de certificats DMS.

  10. Créez un point de terminaison en utilisant les ARN renvoyés.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

Utilisation de l’authentification SASL-SSL pour se connecter à Amazon MSK

La méthode SASL (Simple Authentication and Security Layer) utilise un nom d’utilisateur et un mot de passe pour valider l’identité d’un client et établit une connexion chiffrée entre le serveur et le client.

Pour utiliser SASL, vous devez commencer par créer un nom d’utilisateur et un mot de passe sécurisés lorsque vous configurez votre cluster Amazon MSK. Pour savoir comment configurer un nom d’utilisateur et un mot de passe sécurisés pour un cluster Amazon MSK, consultez Configuration de l’authentification SASL/SCRAM pour un cluster Amazon MSK dans le Guide du développeur Amazon Managed Streaming for Apache Kafka.

Ensuite, lorsque vous créez votre point de terminaison cible Kafka, définissez le paramètre de point de terminaison du protocole de sécurité (SecurityProtocol) à l’aide de l’option sasl-ssl. Vous définissez également les options SaslUsername et SaslPassword. Assurez-vous qu’elles sont cohérentes avec le nom d’utilisateur et le mot de passe sécurisés que vous avez créés lorsque vous avez configuré votre cluster Amazon MSK pour la première fois, comme illustré dans l’exemple JSON suivant.

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
Note
  • Actuellement, ne AWS DMS prend en charge que le protocole SASL-SSL public soutenu par une autorité de certification. DMS ne prend pas en charge le protocole SASL-SSL pour une utilisation avec un cluster Kafka autogéré basé sur une CA privée.

  • Pour l'authentification SASL-SSL, AWS DMS prend en charge le mécanisme SCRAM-SHA-512 par défaut. AWS DMS les versions 3.5.0 et supérieures prennent également en charge le mécanisme Plain. Pour prendre en charge le mécanisme Plain, définissez le paramètre SaslMechanism du type de données d’API KafkaSettings sur PLAIN.

Utilisation d'une image antérieure pour afficher les valeurs originales des lignes de la CDC pour Apache Kafka comme cible

Lorsque vous écrivez des mises à jour de CDC sur une cible de diffusion de données comme Kafka, vous pouvez afficher les valeurs d'origine d'une ligne de base de données source avant de les modifier par une mise à jour. Pour ce faire, AWS DMS remplit une image antérieure des événements de mise à jour en fonction des données fournies par le moteur de base de données source.

Différents moteurs de base de données source fournissent différentes quantités d'informations pour une image antérieure :

  • Oracle met uniquement à jour des colonnes si elles changent.

  • PostgreSQL fournit uniquement des données pour les colonnes qui font partie de la clé primaire (modifiée ou non). Si la réplication logique est utilisée et que REPLICA IDENTITY FULL est défini pour la table source, vous pouvez obtenir les informations antérieures et postérieures complètes sur la ligne écrite dans les journaux WAL et disponible ici.

  • MySQL fournit généralement des données pour toutes les colonnes (modifiées ou non).

Pour activer avant l'imagerie pour ajouter des valeurs d'origine de la base de données source à la sortie AWS DMS , utilisez le paramètre de tâche BeforeImageSettings ou le paramètre add-before-image-columns. Ce paramètre applique une règle de transformation de colonne.

BeforeImageSettings ajoute un nouvel attribut JSON à chaque opération de mise à jour avec des valeurs collectées à partir du système de base de données source, comme indiqué ci-dessous.

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
Note

Appliquez les BeforeImageSettings à la charge complète et aux tâches CDC (qui migrent les données existantes et répliquent les modifications en cours), ou aux tâches CDC uniquement (qui répliquent les modifications de données uniquement). N’appliquez pas les BeforeImageSettings aux tâches à pleine charge uniquement.

Pour les options BeforeImageSettings, les conditions suivantes s'appliquent :

  • Définissez l'option EnableBeforeImage sur true pour activer la génération d’image antérieure. L’argument par défaut est false.

  • Utilisez l'option FieldName pour attribuer un nom au nouvel attribut JSON. Quand EnableBeforeImage est true, FieldName est obligatoire et ne peut pas être vide.

  • L'option ColumnFilter spécifie une colonne à ajouter en utilisant la génération d’image antérieure. Pour ajouter uniquement des colonnes faisant partie des clés primaires de la table, utilisez la valeur par défaut, pk-only. Pour ajouter uniquement des colonnes qui ne sont pas de type LOB, utilisez non-lob. Pour ajouter une colonne ayant une valeur d'image antérieure, utilisez all.

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

Utilisation d'une règle de transformation d'image antérieure

Au lieu des paramètres de tâche, vous pouvez utiliser le paramètre add-before-image-columns, qui applique une règle de transformation de colonne. Avec ce paramètre, vous pouvez activer la génération d’image antérieure pendant la CDC sur des cibles de diffusion de données telles que Kafka.

En utilisant add-before-image-columns dans une règle de transformation, vous pouvez exercer un contrôle plus précis sur les résultats de l'image antérieure. Les règles de transformation vous permettent d'utiliser un localisateur d'objets qui vous fournit un contrôle sur les tables sélectionnées pour la règle. En outre, vous pouvez enchaîner les règles de transformation, ce qui permet d'appliquer différentes règles à différentes tables. Vous pouvez ensuite manipuler les colonnes produites à l'aide d'autres règles.

Note

N'utilisez pas le paramètre add-before-image-columns avec le paramètre de tâche BeforeImageSettings dans la même tâche. N’utilisez pas les deux, pour une seule tâche.

Un type de règle transformation avec le paramètre add-before-image-columns d'une colonne doit fournir une section before-image-def. Vous en trouverez un exemple ci-dessous.

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

La valeur de column-prefix est ajoutée à un nom de colonne et la valeur par défaut de column-prefix est BI_. La valeur de column-suffix est ajoutée au nom de la colonne et la valeur par défaut est vide. Ne définissez pas les deux column-prefix et column-suffix sur des chaînes vides.

Choisissez une valeur pour column-filter. Pour ajouter uniquement les colonnes qui font partie des clés primaires de la table, choisissez pk-only . Choisissez non-lob d'ajouter uniquement des colonnes qui ne sont pas de type LOB. Ou choisissez all d'ajouter une colonne qui a une valeur d’image antérieure.

Exemple de règle de transformation d'image antérieure

La règle de transformation de l'exemple suivant ajoute une nouvelle colonne appelée BI_emp_no dans la cible. Ainsi, une instruction comme UPDATE employees SET emp_no = 3 WHERE emp_no = 1; remplit le champ BI_emp_no avec 1. Lorsque vous écrivez des mises à jour de CDC sur des cibles Amazon S3, la colonne BI_emp_no permet de savoir quelle ligne d’origine a été mise à jour.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

Pour plus d'informations sur l'utilisation de l'action de règle add-before-image-columns, consultez Règles et actions de transformation.

Limitations liées à l'utilisation d'Apache Kafka comme cible pour AWS Database Migration Service

Les limitations suivantes s'appliquent lorsque vous utilisez apache Kafka comme cible :

  • AWS DMS Les points de terminaison cibles de Kafka ne prennent pas en charge le contrôle d'accès IAM pour Amazon Managed Streaming for Apache Kafka (Amazon MSK).

  • Le Mode LOB complet n'est pas pris en charge.

  • Spécifiez un fichier de configuration Kafka pour votre cluster avec des propriétés permettant AWS DMS de créer automatiquement de nouveaux sujets. Incluez le paramètre, auto.create.topics.enable = true. Si vous utilisez Amazon MSK, vous pouvez spécifier la configuration par défaut lorsque vous créez votre cluster Kafka, puis redéfinir le paramètre auto.create.topics.enable sur true. Pour plus d’informations sur les paramètres de configuration par défaut, consultez Configuration Amazon MSK par défaut dans le Guide du développeur Amazon Managed Streaming for Apache Kafka. Si vous devez modifier un cluster Kafka existant créé à l'aide d'Amazon MSK, exécutez la AWS CLI commande aws kafka create-configuration pour mettre à jour votre configuration Kafka, comme dans l'exemple suivant :

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    Ici, //~/kafka_configuration est le fichier de configuration que vous avez créé avec les paramètres de propriété requis.

    Si vous utilisez votre propre instance Kafka installée sur Amazon EC2, modifiez la configuration du cluster Kafka avec auto.create.topics.enable = true le paramètre AWS DMS permettant de créer automatiquement de nouveaux sujets, en utilisant les options fournies avec votre instance.

  • AWS DMS publie chaque mise à jour d'un seul enregistrement de la base de données source sous la forme d'un enregistrement de données (message) dans un sujet Kafka donné, quelles que soient les transactions.

  • AWS DMS prend en charge les deux formes suivantes pour les clés de partition :

    • SchemaName.TableName : une combinaison du nom du schéma et du nom de la table.

    • ${AttributeName} : la valeur d'un des champs du fichier JSON, ou la clé primaire de la table dans la base de données source.

  • BatchApply n’est pas pris en charge pour un point de terminaison Kafka. L’utilisation de l’application par lots (par exemple, le paramètre de tâche de métadonnées cible BatchApplyEnabled) pour une cible Kafka peut entraîner une perte de données.

  • AWS DMS ne prend pas en charge la migration de valeurs de type de BigInt données comportant plus de 16 chiffres. Pour contourner cette limitation, vous pouvez utiliser la règle de transformation suivante pour convertir la colonne BigInt en chaîne. Pour plus d’informations sur les règles de transformation, consultez Règles et actions de transformation.

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

Utilisation du mappage d’objet pour migrer les données vers une rubrique Kafka

AWS DMS utilise des règles de mappage de tables pour mapper les données de la source vers le sujet Kafka cible. Pour mapper des données à une rubrique cible, vous utilisez un type de règle de mappage de table appelé « mappage d’objet ». Vous utilisez le mappage d'objet pour définir la façon dont les enregistrements de données de la source sont mappés sur les enregistrements de données publiés dans une rubrique Kafka.

Les rubriques Kafka ne disposent pas d'une structure prédéfinie autre que le fait d'avoir une clé de partition.

Note

Vous n’avez pas besoin d’utiliser le mappage d’objet. Vous pouvez utiliser un mappage de table standard pour différentes transformations. Cependant, le type de clé de partition suivra les comportements par défaut suivants :

  • La clé primaire est utilisée comme clé de partition pour le chargement complet.

  • Si aucun paramètre de tâche parallel-apply n’est utilisé, schema.table est utilisé comme clé de partition pour la CDC.

  • Si des paramètres de tâche parallel-apply sont utilisés, la clé primaire est utilisée comme clé de partition pour la CDC.

Pour créer une règle de mappage d'objet, spécifiez rule-type comme object-mapping. Cette règle spécifie le type de mappage d'objet que vous souhaitez utiliser.

La structure de la règle est la suivante.

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS prend actuellement en charge map-record-to-record et map-record-to-document en tant que seules valeurs valides pour le rule-action paramètre. Ces paramètres affectent les valeurs qui ne sont pas exclues de la liste d’attributs exclude-columns. Les map-record-to-document valeurs map-record-to-record et indiquent le mode de AWS DMS gestion de ces enregistrements par défaut. Ces valeurs n'affectent en aucune façon les mappages d'attributs.

Utilisez map-record-to-record lors d'une migration d'une base de données relationnelle vers une rubrique Kafka. Ce type de règle utilise la valeur taskResourceId.schemaName.tableName de la base de données relationnelle comme clé de partition dans la rubrique Kafka, et crée un attribut pour chaque colonne dans la base de données source.

Lorsque vous utilisez map-record-to-record, notez ce qui suit :

  • Ce paramètre n’affecte que les colonnes exclues par la liste exclude-columns.

  • Pour chacune de ces colonnes, AWS DMS crée un attribut correspondant dans le sujet cible.

  • AWS DMS crée cet attribut correspondant, que la colonne source soit utilisée ou non dans un mappage d'attributs.

Une manière de comprendre map-record-to-record est de le voir en action. Dans cet exemple, imaginons que vous commencez avec une ligne de table d'une base de données relationnelle, présentant la structure et les données suivantes :

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

Pour migrer ces informations à partir d'un schéma nommé Test vers une rubrique Kafka, vous créez des règles pour mapper les données sur la rubrique cible. La règle suivante illustre ce mappage.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

Compte tenu d’une rubrique Kafka et d’une clé de partition (dans ce cas, taskResourceId.schemaName.tableName), l’exemple ci-dessous illustre le format d’enregistrement résultant à l’aide de nos exemples de données dans la rubrique cible Kafka :

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

Restructuration de données avec le mappage d'attribut

Vous pouvez restructurer les données lors de leur migration vers une rubrique Kafka à l'aide d'un mappage d'attribut. Par exemple, vous pourriez vouloir regrouper plusieurs champs de la source en un seul champ dans la cible. Le mappage d'attribut suivant illustre comment restructurer les données.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

Pour définir une valeur constante pour partition-key, spécifiez une valeur partition-key. Par exemple, vous pouvez le faire pour forcer le stockage de toutes les données dans une seule partition. Le mappage suivant illustre cette approche.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
Note

La valeur partition-key d'un enregistrement de contrôle correspondant à une table spécifique est TaskId.SchemaName.TableName. La valeur partition-key d'un enregistrement de contrôle correspondant à une tâche spécifique est le TaskId de cet enregistrement. La spécification d'une valeur partition-key dans le mappage d'objet n'a aucun impact sur la partition-key d'un enregistrement de contrôle.

Réplication à plusieurs rubriques à l’aide du mappage d’objet

Par défaut, les AWS DMS tâches migrent toutes les données sources vers l'une des rubriques Kafka suivantes :

  • Comme indiqué dans le champ Rubrique du point de terminaison AWS DMS cible.

  • Comme indiqué par kafka-default-topic si le champ Rubrique du point de terminaison cible n’est pas renseigné et que le paramètre auto.create.topics.enable Kafka est défini sur true.

Avec les versions 3.4.6 et supérieures AWS DMS du moteur, vous pouvez utiliser l'kafka-target-topicattribut pour associer chaque table source migrée à une rubrique distincte. Par exemple, les règles de mappage d’objet suivantes migrent les tables sources Customer et Address vers les rubriques Kafka customer_topic et address_topic, respectivement. Dans le même temps, AWS DMS migre toutes les autres tables sources, y compris la Bills table du Test schéma, vers le sujet spécifié dans le point de terminaison cible.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }

En utilisant la réplication à plusieurs rubriques Kafka, vous pouvez regrouper et migrer les tables sources afin de séparer les rubriques Kafka à l’aide d’une seule tâche de réplication.

Format de message pour Apache Kafka

La sortie JSON est simplement une liste de paires clé-valeur.

RecordType

Les enregistrements peuvent être de type Données ou Contrôle. Les enregistrements de données représentent les lignes réelles de la source. Les enregistrements de contrôle sont destinés à des événements importants dans le flux, par exemple un redémarrage de la tâche.

Opération

Pour les enregistrements de données, l'opération peut être load,insert, update ou delete.

Pour les enregistrements de contrôle, l’opération peut être create-table, rename-table, drop-table, change-columns, add-column, drop-column, rename-column ou column-type-change.

SchemaName

Schéma source de l'enregistrement. Ce champ peut être vide pour un enregistrement de contrôle.

TableName

Table source de l'enregistrement. Ce champ peut être vide pour un enregistrement de contrôle.

Horodatage

Horodatage de la construction du message JSON. Le champ est formaté selon le format ISO 8601.

L’exemple de message JSON suivant illustre un message de type de données avec toutes les métadonnées supplémentaires.

{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }

L’exemple de message JSON suivant illustre un message de type contrôle.

{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }