Utilisation d'un pipeline OpenSearch d'ingestion avec Amazon DynamoDB - Amazon OpenSearch Service

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation d'un pipeline OpenSearch d'ingestion avec Amazon DynamoDB

Vous pouvez utiliser un pipeline d' OpenSearch ingestion avec DynamoDB pour diffuser des événements de table DynamoDB (tels que la création, la mise à jour et la suppression) vers les domaines et les collections Amazon Service. OpenSearch Le pipeline OpenSearch d'ingestion intègre une infrastructure de capture des données de modification (CDC) afin de fournir un moyen à grande échelle et à faible latence de diffuser en continu des données à partir d'une table DynamoDB.

Vous pouvez utiliser DynamoDB comme source pour traiter les données de deux manières : avec ou sans capture initiale complète.

Un instantané initial complet est une sauvegarde d'une table réalisée par DynamoDB avec point-in-time la fonctionnalité recovery PITR (). DynamoDB télécharge cet instantané sur Amazon S3. À partir de là, un pipeline d' OpenSearch ingestion l'envoie vers un index d'un domaine ou le partitionne en plusieurs index d'un domaine. Pour garantir la cohérence des données dans OpenSearch DynamoDB, le pipeline synchronise tous les événements de création, de mise à jour et de suppression de la table DynamoDB avec les documents enregistrés dans l'index ou les index. OpenSearch

Lorsque vous utilisez un instantané initial complet, votre pipeline d' OpenSearch ingestion ingère d'abord l'instantané, puis commence à lire les données provenant de DynamoDB Streams. Il finit par rattraper le retard et maintenir la cohérence des données en temps quasi réel entre DynamoDB et. OpenSearch Lorsque vous choisissez cette option, vous devez activer les deux options PITR ainsi qu'un flux DynamoDB sur votre table.

Vous pouvez également utiliser l'intégration d' OpenSearch ingestion avec DynamoDB pour diffuser des événements sans capture instantanée. Choisissez cette option si vous disposez déjà d'un instantané complet d'un autre mécanisme, ou si vous souhaitez simplement diffuser les événements actuels à partir d'une table DynamoDB avec DynamoDB Streams. Lorsque vous choisissez cette option, il vous suffit d'activer un flux DynamoDB sur votre table.

Pour plus d'informations sur cette intégration, consultez la section ETLIntégration zéro de DynamoDB à OpenSearch Amazon Service dans Amazon DynamoDB le Guide du développeur.

Prérequis

Pour configurer votre pipeline, vous devez disposer d'une table DynamoDB sur laquelle DynamoDB Streams est activé. Votre stream doit utiliser le type de NEW_IMAGE stream view. Cependant, les pipelines OpenSearch d'ingestion peuvent également diffuser des événements NEW_AND_OLD_IMAGES si ce type de vue de flux correspond à votre cas d'utilisation.

Si vous utilisez des instantanés, vous devez également activer la point-in-time restauration sur votre table. Pour plus d'informations, consultez les sections Création d'une table, point-in-time Activation de la restauration et Activation d'un flux dans le guide du développeur Amazon DynamoDB.

Étape 1 : configurer le rôle du pipeline

Une fois votre table DynamoDB configurée, configurez le rôle de pipeline que vous souhaitez utiliser dans la configuration de votre pipeline et ajoutez les autorisations DynamoDB suivantes dans le rôle :

{ "Version": "2012-10-17", "Statement": [ { "Sid": "allowRunExportJob", "Effect": "Allow", "Action": [ "dynamodb:DescribeTable", "dynamodb:DescribeContinuousBackups", "dynamodb:ExportTableToPointInTime" ], "Resource": [ "arn:aws:dynamodb:us-east-1:{account-id}:table/my-table" ] }, { "Sid": "allowCheckExportjob", "Effect": "Allow", "Action": [ "dynamodb:DescribeExport" ], "Resource": [ "arn:aws:dynamodb:us-east-1:{account-id}:table/my-table/export/*" ] }, { "Sid": "allowReadFromStream", "Effect": "Allow", "Action": [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator" ], "Resource": [ "arn:aws:dynamodb:us-east-1:{account-id}:table/my-table/stream/*" ] }, { "Sid": "allowReadAndWriteToS3ForExport", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:AbortMultipartUpload", "s3:PutObject", "s3:PutObjectAcl" ], "Resource": [ "arn:aws:s3:::my-bucket/{exportPath}/*" ] } ] }

Vous pouvez également utiliser une clé gérée par AWS KMS le client pour chiffrer les fichiers de données d'exportation. Pour déchiffrer les objets exportés, spécifiez s3_sse_kms_key_id l'ID de clé dans la configuration d'exportation du pipeline au format suivant :arn:aws:kms:us-west-2:{account-id}:key/my-key-id. La politique suivante inclut les autorisations requises pour utiliser une clé gérée par le client :

{ "Sid": "allowUseOfCustomManagedKey", "Effect": "Allow", "Action": [ "kms:GenerateDataKey", "kms:Decrypt" ], "Resource": arn:aws:kms:us-west-2:{account-id}:key/my-key-id }

Étape 2 : Création du pipeline

Vous pouvez ensuite configurer un pipeline d' OpenSearch ingestion comme le suivant, qui spécifie DynamoDB comme source. Cet exemple de pipeline ingère des données provenant de table-a l'PITRinstantané, suivies d'événements provenant de DynamoDB Streams. Une position de départ de LATEST indique que le pipeline doit lire les dernières données de DynamoDB Streams.

version: "2" cdc-pipeline: source: dynamodb: tables: - table_arn: "arn:aws:dynamodb:us-west-2:{account-id}:table/table-a" export: s3_bucket: "my-bucket" s3_prefix: "export/" stream: start_position: "LATEST" aws: region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" sink: - opensearch: hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"] index: "${getMetadata(\"table_name\")}" index_type: custom normalize_index: true document_id: "${getMetadata(\"primary_key\")}" action: "${getMetadata(\"opensearch_action\")}" document_version: "${getMetadata(\"document_version\")}" document_version_type: "external"

Vous pouvez utiliser un plan DynamoDB préconfiguré pour créer ce pipeline. Pour de plus amples informations, veuillez consulter Utiliser des plans pour créer un pipeline.

Cohérence des données

OpenSearch L'ingestion prend en charge end-to-end la reconnaissance afin de garantir la durabilité des données. Lorsqu'un pipeline lit des instantanés ou des flux, il crée dynamiquement des partitions pour un traitement parallèle. Le pipeline marque une partition comme terminée lorsqu'il reçoit un accusé de réception après avoir ingéré tous les enregistrements du OpenSearch domaine ou de la collection.

Si vous souhaitez intégrer des données dans une collection de recherche OpenSearch sans serveur, vous pouvez générer un identifiant de document dans le pipeline. Si vous souhaitez intégrer des données dans une collection de séries chronologiques OpenSearch sans serveur, notez que le pipeline ne génère pas d'identifiant de document.

Un pipeline d' OpenSearch ingestion fait également correspondre les actions des événements entrants aux actions d'indexation groupées correspondantes pour faciliter l'ingestion de documents. Cela garantit la cohérence des données, de sorte que chaque modification de données dans DynamoDB est conciliée avec les modifications de document correspondantes. OpenSearch

Types de données de mappage

OpenSearch Le service fait correspondre dynamiquement les types de données de chaque document entrant au type de données correspondant dans DynamoDB. Le tableau suivant montre comment OpenSearch Service mappe automatiquement les différents types de données.

Type de données OpenSearch DynamoDB
Nombre

OpenSearch mappe automatiquement les données numériques. S'il s'agit d'un nombre entier, OpenSearch mappez-le sous forme de valeur longue. Si le nombre est fractionnaire, il est OpenSearch mappé sous forme de valeur flottante.

OpenSearch mappe dynamiquement divers attributs en fonction du premier document envoyé. Si vous avez plusieurs types de données pour le même attribut dans DynamoDB, par exemple un nombre entier et un nombre fractionnaire, le mappage risque d'échouer.

Par exemple, si votre premier document possède un attribut qui est un nombre entier et qu'un document ultérieur possède le même attribut sous forme de nombre fractionnaire, le second document OpenSearch ne parvient pas à ingérer. Dans ces cas, vous devez fournir un modèle de mappage explicite, tel que le suivant :

{ "template": { "mappings": { "properties": { "MixedNumberAttribute": { "type": "float" } } } } }

Si vous avez besoin d'une double précision, utilisez le mappage de champs de type chaîne. Il n'existe aucun type numérique équivalent prenant en charge 38 chiffres de précision dans OpenSearch.

DynamoDB prend en charge les nombres.

Ensemble de numéros OpenSearch mappe automatiquement un ensemble de nombres dans un tableau de valeurs longues ou de valeurs flottantes. Comme pour les nombres scalaires, cela dépend du fait que le premier nombre ingéré est un nombre entier ou un nombre fractionnaire. Vous pouvez fournir des mappages pour des ensembles de nombres de la même manière que vous mappez des chaînes scalaires.

DynamoDB prend en charge les types qui représentent des ensembles de nombres.

Chaîne

OpenSearch mappe automatiquement les valeurs des chaînes sous forme de texte. Dans certaines situations, telles que les valeurs énumérées, vous pouvez mapper le type de mot clé.

L'exemple suivant montre comment mapper un attribut DynamoDB PartType nommé à un mot clé. OpenSearch

{ "template": { "mappings": { "properties": { "PartType": { "type": "keyword" } } } } }

DynamoDB prend en charge les chaînes de caractères.

Set de cordes

OpenSearch mappe automatiquement un ensemble de chaînes dans un tableau de chaînes. Vous pouvez fournir des mappages pour des ensembles de chaînes de la même manière que vous mappez des chaînes scalaires.

DynamoDB prend en charge les types qui représentent des ensembles de chaînes.
Binaire

OpenSearch mappe automatiquement les données binaires sous forme de texte. Vous pouvez fournir un mappage pour les écrire sous forme de champs binaires OpenSearch.

L'exemple suivant montre comment mapper un attribut DynamoDB ImageData nommé à OpenSearch un champ binaire.

{ "template": { "mappings": { "properties": { "ImageData": { "type": "binary" } } } } }
DynamoDB prend en charge les attributs de type binaire.
Ensemble binaire

OpenSearch mappe automatiquement un ensemble binaire dans un tableau de données binaires sous forme de texte. Vous pouvez fournir des mappages pour des ensembles de nombres de la même manière que vous mappez un binaire scalaire.

DynamoDB prend en charge les types qui représentent des ensembles de valeurs binaires.
Booléen

OpenSearch mappe un type booléen DynamoDB en un type booléen. OpenSearch

DynamoDB prend en charge les attributs de type booléen.

Null

OpenSearch peut ingérer des documents dont le type DynamoDB est nul. Elle enregistre la valeur sous forme de valeur nulle dans le document. Il n'existe aucun mappage pour ce type, et ce champ n'est ni indexé ni consultable.

Si le même nom d'attribut est utilisé pour un type nul, puis change ultérieurement pour un type différent, tel qu'une chaîne, OpenSearch crée un mappage dynamique pour la première valeur non nulle. Les valeurs suivantes peuvent toujours être des valeurs nulles DynamoDB.

DynamoDB prend en charge les attributs de type nul.
Map

OpenSearch mappe les attributs de mappage DynamoDB aux champs imbriqués. Les mêmes mappages s'appliquent dans un champ imbriqué.

L'exemple suivant fait correspondre une chaîne d'un champ imbriqué à un mot clé saisi dans OpenSearch :

{ "template": { "mappings": { "properties": { "AdditionalDescriptions": { "properties": { "PartType": { "type": "keyword" } } } } } } }
DynamoDB prend en charge les attributs de type de carte.
Liste

OpenSearch fournit des résultats différents pour les listes DynamoDB, en fonction du contenu de la liste.

Lorsqu'une liste contient tous le même type de types scalaires (par exemple, une liste de toutes les chaînes), elle OpenSearch ingère la liste sous forme de tableau de ce type. Cela fonctionne pour les types chaîne, numérique, booléen et nul. Les restrictions pour chacun de ces types sont les mêmes que celles pour un scalaire de ce type.

Vous pouvez également fournir des mappages pour des listes de cartes en utilisant le même mappage que celui que vous utiliseriez pour une carte.

Vous ne pouvez pas fournir de liste de types mixtes.

DynamoDB prend en charge les attributs de type liste.

Définir

OpenSearch fournit des résultats différents pour les ensembles DynamoDB en fonction du contenu de l'ensemble.

Lorsqu'un ensemble contient tous le même type de types scalaires (par exemple, un ensemble de toutes les chaînes), il OpenSearch ingère l'ensemble sous forme de tableau de ce type. Cela fonctionne pour les types chaîne, numérique, booléen et nul. Les restrictions pour chacun de ces types sont les mêmes que celles pour un scalaire de ce type.

Vous pouvez également fournir des mappages pour des ensembles de cartes en utilisant le même mappage que celui que vous utiliseriez pour une carte.

Vous ne pouvez pas fournir un ensemble de types mixtes.

DynamoDB prend en charge les types qui représentent des ensembles.

Nous vous recommandons de configurer la file d'attente des lettres mortes (DLQ) dans votre pipeline d' OpenSearch ingestion. Si vous avez configuré la file d'attente, le OpenSearch service envoie tous les documents défaillants qui ne peuvent pas être ingérés en raison d'échecs de mappage dynamique vers la file d'attente.

En cas d'échec des mappages automatiques, vous pouvez utiliser template_type et template_content dans la configuration de votre pipeline pour définir des règles de mappage explicites. Vous pouvez également créer des modèles de mappage directement dans votre domaine de recherche ou votre collection avant de démarrer le pipeline.

Limites

Tenez compte des limites suivantes lorsque vous configurez un pipeline d' OpenSearch ingestion pour DynamoDB :

  • L'intégration d' OpenSearch ingestion avec DynamoDB ne prend actuellement pas en charge l'ingestion entre régions. Votre table DynamoDB OpenSearch et votre pipeline d'ingestion doivent être identiques. Région AWS

  • Votre table DynamoDB OpenSearch et votre pipeline d'ingestion doivent être identiques. Compte AWS

  • Un pipeline d' OpenSearch ingestion ne prend en charge qu'une seule table DynamoDB comme source.

  • DynamoDB Streams ne stocke les données dans un journal que pendant 24 heures maximum. Si l'ingestion à partir d'un instantané initial d'une grande table prend 24 heures ou plus, il y aura une perte de données initiale. Pour atténuer cette perte de données, estimez la taille de la table et configurez les unités de calcul appropriées pour les pipelines d' OpenSearch ingestion.

CloudWatch Alarmes recommandées pour DynamoDB

Les CloudWatch mesures suivantes sont recommandées pour surveiller les performances de votre pipeline d'ingestion. Ces indicateurs peuvent vous aider à identifier la quantité de données traitées à partir des exportations, le nombre d'événements traités à partir des flux, les erreurs lors du traitement des exportations et des événements des flux, ainsi que le nombre de documents écrits vers la destination. Vous pouvez configurer des CloudWatch alarmes pour effectuer une action lorsque l'une de ces mesures dépasse une valeur spécifiée pendant une durée spécifiée.

Métrique Description
dynamodb-pipeline.BlockingBuffer.bufferUsage.value

Indique la quantité de mémoire tampon utilisée.

dynamodb-pipeline.dynamodb.activeExportS3ObjectConsumers.value

Indique le nombre total de OCUs ceux qui traitent activement des objets Amazon S3 pour l'exportation.

dynamodb-pipeline.dynamodb.bytesProcessed.count

Nombre d'octets traités à partir de la source DynamoDB.

dynamodb-pipeline.dynamodb.changeEventsProcessed.count

Nombre d'événements de modification traités à partir du flux DynamoDB.

dynamodb-pipeline.dynamodb.changeEventsProcessingErrors.count

Nombre d'erreurs liées aux événements de modification traités par DynamoDB.

dynamodb-pipeline.dynamodb.exportJobFailure.count Nombre de tentatives de soumission de tâches d'exportation qui ont échoué.
dynamodb-pipeline.dynamodb.exportJobSuccess.count Nombre de tâches d'exportation soumises avec succès.
dynamodb-pipeline.dynamodb.exportRecordsProcessed.count

Nombre total d'enregistrements traités à partir de l'exportation.

dynamodb-pipeline.dynamodb.exportRecordsTotal.count

Nombre total d'enregistrements exportés depuis DynamoDB, essentiel pour le suivi des volumes d'exportation de données.

dynamodb-pipeline.dynamodb.exportS3ObjectsProcessed.count Nombre total de fichiers de données d'exportation qui ont été traités avec succès depuis Amazon S3.
dynamodb-pipeline.opensearch.bulkBadRequestErrors.count Nombre d'erreurs lors de demandes groupées dues à une demande mal formée.
dynamodb-pipeline.opensearch.bulkRequestLatency.avg Latence moyenne pour les demandes d'écriture en masse adressées à OpenSearch.
dynamodb-pipeline.opensearch.bulkRequestNotFoundErrors.count Nombre de demandes groupées qui ont échoué car les données cibles sont introuvables.
dynamodb-pipeline.opensearch.bulkRequestNumberOfRetries.count Nombre de tentatives effectuées par les pipelines OpenSearch d'ingestion pour écrire un OpenSearch cluster.
dynamodb-pipeline.opensearch.bulkRequestSizeBytes.sum Taille totale en octets de toutes les demandes groupées adressées à OpenSearch.
dynamodb-pipeline.opensearch.documentErrors.count Nombre d'erreurs lors de l'envoi de documents à OpenSearch. Les documents à l'origine des erreurs seront envoyés àDLQ.
dynamodb-pipeline.opensearch.documentsSuccess.count Nombre de documents écrits avec succès dans un OpenSearch cluster ou une collection.
dynamodb-pipeline.opensearch.documentsSuccessFirstAttempt.count Nombre de documents indexés avec succès OpenSearch lors de la première tentative.

dynamodb-pipeline.opensearch.documentsVersionConflictErrors.count

Nombre d'erreurs dues à des conflits de versions dans les documents pendant le traitement.

dynamodb-pipeline.opensearch.PipelineLatency.avg

Latence moyenne du pipeline d' OpenSearch ingestion pour traiter les données en lisant depuis la source et en écrivant vers la destination.
dynamodb-pipeline.opensearch.PipelineLatency.max Latence maximale du pipeline d' OpenSearch ingestion pour traiter les données en lisant depuis la source jusqu'à l'écriture de la destination.
dynamodb-pipeline.opensearch.recordsIn.count Nombre d'enregistrements ingérés avec succès. OpenSearch Cette métrique est essentielle pour suivre le volume de données traitées et stockées.
dynamodb-pipeline.opensearch.s3.dlqS3RecordsFailed.count Nombre d'enregistrements dans lesquels il n'a pas été possible d'écrireDLQ.
dynamodb-pipeline.opensearch.s3.dlqS3RecordsSuccess.count Nombre d'enregistrements sur lesquels on écritDLQ.
dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.count Nombre de mesures de latence pour les demandes adressées à la file d'attente des lettres mortes Amazon S3.
dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.sum Latence totale pour toutes les demandes adressées à la file d'attente des lettres mortes Amazon S3
dynamodb-pipeline.opensearch.s3.dlqS3RequestSizeBytes.sum Taille totale en octets de toutes les demandes envoyées à la file d'attente de lettres mortes Amazon S3.
dynamodb-pipeline.recordsProcessed.count Nombre total d'enregistrements traités dans le pipeline, indicateur clé du débit global.
dynamodb.changeEventsProcessed.count Aucun enregistrement n'est collecté à partir des flux DynamoDB. Cela peut être dû à l'absence d'activité sur la table, à une exportation en cours ou à un problème d'accès aux flux DynamoDB.

dynamodb.exportJobFailure.count

La tentative de déclenchement d'une exportation vers S3 a échoué.

line.opensearch.bulkRequestInvalidInputErrors.count

Nombre d'erreurs liées aux demandes groupées OpenSearch dues à une saisie non valide, crucial pour le suivi de la qualité des données et des problèmes opérationnels.
opensearch.EndToEndLatency.avg La latence de bout en bout est supérieure à celle souhaitée pour la lecture à partir de flux DynamoDB. Cela peut être dû à un OpenSearch cluster sous-dimensionné ou à une OCU capacité de pipeline maximale trop faible pour le WCU débit de la table DynamoDB. Cette latence de bout en bout sera élevée après une exportation et devrait diminuer au fil du temps à mesure qu'elle rattrape les derniers flux DynamoDB.