Utilisation d'un pipeline OpenSearch d'ingestion avec Amazon S3 - Amazon OpenSearch Service

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation d'un pipeline OpenSearch d'ingestion avec Amazon S3

Avec OpenSearch Ingestion, vous pouvez utiliser Amazon S3 comme source ou comme destination. Lorsque vous utilisez Amazon S3 comme source, vous envoyez des données vers un pipeline d' OpenSearch ingestion. Lorsque vous utilisez Amazon S3 comme destination, vous écrivez les données d'un pipeline d' OpenSearch ingestion dans un ou plusieurs compartiments S3.

Amazon S3 en tant que source

Vous pouvez utiliser Amazon S3 comme source pour traiter les données de deux manières : avec le SQStraitement S3 et avec les scans planifiés.

Utilisez le SQS traitement S3 lorsque vous avez besoin d'une analyse en temps quasi réel des fichiers une fois qu'ils ont été écrits dans S3. Vous pouvez configurer les compartiments Amazon S3 pour déclencher un événement chaque fois qu'un objet est stocké ou modifié dans le compartiment. Utilisez une analyse planifiée ponctuelle ou récurrente pour traiter par lots les données d'un compartiment S3.

Prérequis

Pour utiliser Amazon S3 comme source d'un pipeline d' OpenSearch ingestion à la fois pour un scan planifié ou un SQS traitement S3, créez d'abord un compartiment S3.

Note

Si le compartiment S3 utilisé comme source dans le pipeline d' OpenSearch ingestion se trouve dans un autre compartiment Compte AWS, vous devez également activer les autorisations de lecture entre comptes sur le compartiment. Cela permet au pipeline de lire et de traiter les données. Pour activer les autorisations entre comptes, consultez la section Octroi par le propriétaire du bucket des autorisations de bucket entre comptes dans le guide de l'utilisateur Amazon S3.

Si vos compartiments S3 se trouvent dans plusieurs comptes, utilisez une bucket_owners carte. Pour un exemple, consultez la section Accès S3 entre comptes dans la OpenSearch documentation.

Pour configurer le SQS traitement S3, vous devez également effectuer les étapes suivantes :

  1. Créez une SQS file d'attente Amazon.

  2. Activez les notifications d'événements sur le compartiment S3 avec la SQS file d'attente comme destination.

Étape 1 : configurer le rôle du pipeline

Contrairement aux autres plugins source qui envoient des données vers un pipeline, le plug-in source S3 possède une architecture basée sur la lecture dans laquelle le pipeline extrait les données de la source.

Par conséquent, pour qu'un pipeline puisse lire depuis S3, vous devez spécifier un rôle dans la configuration source S3 du pipeline qui a accès à la fois au compartiment S3 et à la SQS file d'attente Amazon. Le pipeline assumera ce rôle afin de lire les données de la file d'attente.

Note

Le rôle que vous spécifiez dans la configuration source S3 doit être le rôle de pipeline. Par conséquent, votre rôle de pipeline doit contenir deux politiques d'autorisation distinctes : l'une pour écrire dans un récepteur et l'autre pour extraire de la source S3. Vous devez utiliser le même principe sts_role_arn dans tous les composants du pipeline.

L'exemple de politique suivant indique les autorisations requises pour utiliser S3 en tant que source :

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action":[ "s3:ListBucket", "s3:GetBucketLocation", "s3:GetObject" ], "Resource": "arn:aws:s3:::my-bucket/*" }, { "Effect":"Allow", "Action":"s3:ListAllMyBuckets", "Resource":"arn:aws:s3:::*" }, { "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:ChangeMessageVisibility" ], "Resource": "arn:aws:sqs:us-west-2:{account-id}:MyS3EventSqsQueue" } ] }

Vous devez associer ces autorisations au IAM rôle que vous spécifiez dans l'sts_role_arnoption de configuration du plugin source S3 :

version: "2" source: s3: ... aws: ... sts_role_arn: arn:aws:iam::{account-id}:role/pipeline-role processor: ... sink: - opensearch: ...

Étape 2 : Création du pipeline

Après avoir configuré vos autorisations, vous pouvez configurer un pipeline d' OpenSearch ingestion en fonction de votre cas d'utilisation d'Amazon S3.

SQSTraitement S3-

Pour configurer le SQS traitement S3, configurez votre pipeline pour spécifier S3 comme source et configurez SQS les notifications Amazon :

version: "2" s3-pipeline: source: s3: notification_type: "sqs" codec: newline: null sqs: queue_url: "https://sqs.us-east-1.amazonaws.com/{account-id}/ingestion-queue" compression: "none" aws: region: "us-east-1" # IAM role that the pipeline assumes to read data from the queue. This role must be the same as the pipeline role. sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: # IAM role that the pipeline assumes to access the domain sink sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1"

Si vous observez une faible CPU utilisation lors du traitement de petits fichiers sur Amazon S3, envisagez d'augmenter le débit en modifiant la valeur de l'workersoption. Pour plus d'informations, consultez les options de configuration du plugin S3.

Scan programmé

Pour configurer une analyse planifiée, configurez votre pipeline avec une planification au niveau de l'analyse qui s'applique à tous vos compartiments S3, ou au niveau du compartiment. Une planification au niveau du compartiment ou une configuration à intervalles de numérisation remplace toujours une configuration au niveau du scan.

Vous pouvez configurer des scans planifiés avec un scan unique, idéal pour la migration des données, ou un scan récurrent, idéal pour le traitement par lots.

Pour configurer votre pipeline afin qu'il puisse lire depuis Amazon S3, utilisez les plans Amazon S3 préconfigurés. Vous pouvez modifier la scan partie de la configuration de votre pipeline pour répondre à vos besoins de planification. Pour de plus amples informations, veuillez consulter Utiliser des plans pour créer un pipeline.

Scan unique

Un scan programmé ne s'exécute qu'une seule fois. Dans votre YAML configuration, vous pouvez utiliser un start_time et end_time pour spécifier à quel moment vous souhaitez que les objets du compartiment soient scannés. Vous pouvez également l'utiliser range pour spécifier l'intervalle de temps par rapport à l'heure actuelle pendant laquelle vous souhaitez que les objets du compartiment soient scannés.

Par exemple, une plage définie pour analyser PT4H tous les fichiers créés au cours des quatre dernières heures. Pour configurer une analyse unique afin qu'elle soit exécutée une deuxième fois, vous devez arrêter et redémarrer le pipeline. Si aucune plage n'est configurée, vous devez également mettre à jour les heures de début et de fin.

La configuration suivante permet de configurer une analyse unique de tous les compartiments et de tous les objets qu'ils contiennent :

version: "2" log-pipeline: source: s3: codec: csv: compression: "none" aws: region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" acknowledgments: true scan: buckets: - bucket: name: my-bucket-1 filter: include_prefix: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 key_prefix: include: - Objects2/ exclude_suffix: - .jpeg - .png delete_s3_objects_on_read: false processor: - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1" dlq: s3: bucket: "my-bucket-1" region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

La configuration suivante met en place une analyse unique de tous les compartiments pendant une période spécifiée. Cela signifie que S3 traite uniquement les objets dont l'heure de création se situe dans cette fenêtre.

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

La configuration suivante configure une analyse unique à la fois au niveau du scan et au niveau du bucket. Les heures de début et de fin au niveau du bucket remplacent les heures de début et de fin au niveau du scan.

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

L'arrêt d'un pipeline supprime toute référence préexistante aux objets scannés par le pipeline avant l'arrêt. Si un seul pipeline de numérisation est arrêté, il scanne à nouveau tous les objets après son démarrage, même s'ils ont déjà été numérisés. Si vous devez arrêter un seul pipeline de scan, il est recommandé de modifier votre créneau horaire avant de recommencer le pipeline.

Si vous devez filtrer les objets par heure de début et de fin, l'arrêt et le démarrage de votre pipeline sont la seule option. S'il n'est pas nécessaire de filtrer par heure de début et heure de fin, vous pouvez filtrer les objets par nom. Filtrer par nom ne vous oblige pas à arrêter et à démarrer votre pipeline. Pour ce faire, utilisez include_prefix etexclude_suffix.

Scan récurrent

Une analyse planifiée récurrente exécute une analyse des compartiments S3 que vous avez spécifiés à intervalles réguliers et planifiés. Vous ne pouvez configurer ces intervalles qu'au niveau du scan, car les configurations individuelles au niveau du bucket ne sont pas prises en charge.

Dans votre YAML configuration, interval la fréquence du scan récurrent peut être comprise entre 30 secondes et 365 jours. Le premier de ces scans a toujours lieu lorsque vous créez le pipeline. countDéfinit le nombre total d'instances de scan.

La configuration suivante permet de configurer un scan récurrent, avec un délai de 12 heures entre les scans :

scan: scheduling: interval: PT12H count: 4 buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

Amazon S3 en tant que destination

Pour écrire des données d'un pipeline d' OpenSearch ingestion dans un compartiment S3, utilisez le plan S3 préconfiguré pour créer un pipeline avec un récepteur S3. Ce pipeline achemine des données sélectives vers un OpenSearch récepteur et envoie simultanément toutes les données pour archivage dans S3. Pour de plus amples informations, veuillez consulter Utiliser des plans pour créer un pipeline.

Lorsque vous créez votre récepteur S3, vous pouvez spécifier votre formatage préféré à partir de différents codecs récepteurs. Par exemple, si vous souhaitez écrire des données sous forme de colonnes, choisissez le codec Parquet ou Avro. Si vous préférez un format basé sur des lignes, choisissez JSON ND-JSON. Pour écrire des données dans S3 dans un schéma spécifique, vous pouvez également définir un schéma en ligne dans les codecs récepteurs à l'aide du format Avro.

L'exemple suivant définit un schéma en ligne dans un récepteur S3 :

- s3: codec: parquet: schema: > { "type" : "record", "namespace" : "org.vpcFlowLog.examples", "name" : "VpcFlowLog", "fields" : [ { "name" : "version", "type" : "string"}, { "name" : "srcport", "type": "int"}, { "name" : "dstport", "type": "int"}, { "name" : "start", "type": "int"}, { "name" : "end", "type": "int"}, { "name" : "protocol", "type": "int"}, { "name" : "packets", "type": "int"}, { "name" : "bytes", "type": "int"}, { "name" : "action", "type": "string"}, { "name" : "logStatus", "type" : "string"} ] }

Lorsque vous définissez ce schéma, spécifiez un sur-ensemble de toutes les clés susceptibles d'être présentes dans les différents types d'événements que votre pipeline envoie à un récepteur.

Par exemple, s'il est possible qu'une clé soit manquante lors d'un événement, ajoutez cette clé dans votre schéma avec une null valeur. Les déclarations de valeur nulle permettent au schéma de traiter des données non uniformes (certains événements possèdent ces clés, d'autres non). Lorsque ces clés sont présentes dans des événements entrants, leurs valeurs sont écrites dans des récepteurs.

Cette définition de schéma agit comme un filtre qui permet uniquement d'envoyer des clés définies aux récepteurs et supprime les clés non définies des événements entrants.

Vous pouvez également utiliser include_keys et exclude_keys dans votre récepteur pour filtrer les données acheminées vers d'autres récepteurs. Ces deux filtres s'excluent mutuellement, vous ne pouvez donc en utiliser qu'un à la fois dans votre schéma. En outre, vous ne pouvez pas les utiliser dans des schémas définis par l'utilisateur.

Pour créer des pipelines avec de tels filtres, utilisez le plan de filtre récepteur préconfiguré. Pour de plus amples informations, veuillez consulter Utiliser des plans pour créer un pipeline.

Compte croisé Amazon S3 en tant que source

Vous pouvez accorder l'accès à plusieurs comptes avec Amazon S3 afin que les pipelines OpenSearch d'ingestion puissent accéder aux compartiments S3 d'un autre compte en tant que source. Pour activer l'accès entre comptes, consultez la section Octroi par le propriétaire du bucket des autorisations de bucket entre comptes dans le guide de l'utilisateur Amazon S3. Une fois que vous avez accordé l'accès, assurez-vous que votre rôle de pipeline dispose des autorisations requises.

Vous pouvez ensuite créer une YAML configuration en utilisant bucket_owners pour activer l'accès entre comptes à un compartiment Amazon S3 en tant que source :

s3-pipeline: source: s3: notification_type: "sqs" codec: csv: delimiter: "," quote_character: "\"" detect_header: True sqs: queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue" bucket_owners: my-bucket-01: 123456789012 my-bucket-02: 999999999999 compression: "gzip"