Utilizzo di una pipeline OpenSearch di ingestione con Amazon S3 - OpenSearch Servizio Amazon

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo di una pipeline OpenSearch di ingestione con Amazon S3

Con OpenSearch Ingestion, puoi usare Amazon S3 come origine o come destinazione. Quando usi Amazon S3 come fonte, invii dati a una pipeline di OpenSearch ingestione. Quando usi Amazon S3 come destinazione, scrivi dati da una pipeline di OpenSearch ingestione su uno o più bucket S3.

Amazon S3 come fonte

Esistono due modi per utilizzare Amazon S3 come origine per elaborare i dati: con l'elaborazione S3 e con le scansioni pianificate. SQS

Utilizza l'SQSelaborazione S3- quando è necessaria una scansione quasi in tempo reale dei file dopo la loro scrittura su S3. Puoi configurare i bucket Amazon S3 per generare un evento ogni volta che un oggetto viene archiviato o modificato all'interno del bucket. Utilizza una scansione pianificata una tantum o ricorrente per elaborare in batch i dati in un bucket S3.

Prerequisiti

Per utilizzare Amazon S3 come origine per una pipeline di OpenSearch ingestion sia per una scansione pianificata che per l'SQSelaborazione S3, devi prima creare un bucket S3.

Nota

Se il bucket S3 utilizzato come origine nella pipeline di OpenSearch Ingestion si trova in un altro Account AWS, devi anche abilitare le autorizzazioni di lettura tra account sul bucket. Ciò consente alla pipeline di leggere ed elaborare i dati. Per abilitare le autorizzazioni per più account, consulta la sezione relativa alla concessione delle autorizzazioni per i bucket tra più account nella Guida per l'utente di Amazon S3.

Se i tuoi bucket S3 si trovano in più account, usa una mappa. bucket_owners Per un esempio, vedi Cross-account S3 access nella documentazione. OpenSearch

Per configurare l'SQSelaborazione S3, devi anche eseguire le seguenti operazioni:

  1. Crea una SQS coda Amazon.

  2. Abilita le notifiche degli eventi sul bucket S3 con la SQS coda come destinazione.

Fase 1: Configurare il ruolo della pipeline

A differenza di altri plugin di origine che inviano dati a una pipeline, il plug-in di origine S3 ha un'architettura basata sulla lettura in cui la pipeline estrae i dati dalla sorgente.

Pertanto, affinché una pipeline possa leggere da S3, è necessario specificare un ruolo all'interno della configurazione di origine S3 della pipeline che abbia accesso sia al bucket S3 che alla coda Amazon. SQS La pipeline assumerà questo ruolo per leggere i dati dalla coda.

Nota

Il ruolo specificato nella configurazione di origine di S3 deve essere il ruolo della pipeline. Pertanto, il ruolo della pipeline deve contenere due policy di autorizzazione separate: una per la scrittura su un sink e l'altra da estrarre dal codice sorgente S3. È necessario utilizzare lo stesso in tutti i componenti della pipeline. sts_role_arn

La seguente politica di esempio mostra le autorizzazioni necessarie per l'utilizzo di S3 come fonte:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action":[ "s3:ListBucket", "s3:GetBucketLocation", "s3:GetObject" ], "Resource": "arn:aws:s3:::my-bucket/*" }, { "Effect":"Allow", "Action":"s3:ListAllMyBuckets", "Resource":"arn:aws:s3:::*" }, { "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:ChangeMessageVisibility" ], "Resource": "arn:aws:sqs:us-west-2:{account-id}:MyS3EventSqsQueue" } ] }

È necessario allegare queste autorizzazioni al IAM ruolo specificato nell'sts_role_arnopzione all'interno della configurazione del plug-in di origine S3:

version: "2" source: s3: ... aws: ... sts_role_arn: arn:aws:iam::{account-id}:role/pipeline-role processor: ... sink: - opensearch: ...

Fase 2: Creare la pipeline

Dopo aver impostato le autorizzazioni, puoi configurare una pipeline di OpenSearch ingestione in base al tuo caso d'uso di Amazon S3.

Elaborazione S3 SQS

Per configurare l'SQSelaborazione S3, configura la pipeline per specificare S3 come origine e configura le notifiche Amazon: SQS

version: "2" s3-pipeline: source: s3: notification_type: "sqs" codec: newline: null sqs: queue_url: "https://sqs.us-east-1.amazonaws.com/{account-id}/ingestion-queue" compression: "none" aws: region: "us-east-1" # IAM role that the pipeline assumes to read data from the queue. This role must be the same as the pipeline role. sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: # IAM role that the pipeline assumes to access the domain sink sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1"

Se riscontri un basso CPU utilizzo durante l'elaborazione di file di piccole dimensioni su Amazon S3, valuta la possibilità di aumentare la velocità effettiva modificando il valore dell'opzione. workers Per ulteriori informazioni, consulta le opzioni di configurazione del plug-in S3.

Scansione pianificata

Per configurare una scansione pianificata, configura la pipeline con una pianificazione a livello di scansione che si applica a tutti i bucket S3 o a livello di bucket. Una pianificazione a livello di bucket o una configurazione a intervalli di scansione sovrascrive sempre una configurazione a livello di scansione.

È possibile configurare le scansioni pianificate con una scansione singola, ideale per la migrazione dei dati, o una scansione ricorrente, ideale per l'elaborazione in batch.

Per configurare la pipeline in modo che legga da Amazon S3, utilizza i blueprint Amazon S3 preconfigurati. Puoi modificare la scan parte della configurazione della pipeline per soddisfare le tue esigenze di pianificazione. Per ulteriori informazioni, consulta Utilizzo dei blueprint per creare una pipeline.

Scansione una tantum

Una scansione pianificata una tantum viene eseguita una sola volta. Nella YAML configurazione, è possibile utilizzare un start_time e end_time per specificare quando si desidera che gli oggetti nel bucket vengano scansionati. In alternativa, è possibile utilizzare range per specificare l'intervallo di tempo relativo all'ora corrente in cui si desidera che gli oggetti nel bucket vengano scansionati.

Ad esempio, un intervallo impostato per PT4H scansionare tutti i file creati nelle ultime quattro ore. Per configurare una scansione singola da eseguire una seconda volta, è necessario arrestare e riavviare la pipeline. Se non hai configurato un intervallo, devi anche aggiornare l'ora di inizio e di fine.

La seguente configurazione imposta una scansione unica per tutti i bucket e tutti gli oggetti in tali bucket:

version: "2" log-pipeline: source: s3: codec: csv: compression: "none" aws: region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" acknowledgments: true scan: buckets: - bucket: name: my-bucket-1 filter: include_prefix: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 key_prefix: include: - Objects2/ exclude_suffix: - .jpeg - .png delete_s3_objects_on_read: false processor: - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1" dlq: s3: bucket: "my-bucket-1" region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

La seguente configurazione imposta una scansione unica per tutti i bucket durante una finestra temporale specificata. Ciò significa che S3 elabora solo gli oggetti con tempi di creazione che rientrano in questa finestra.

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

La seguente configurazione imposta una scansione una tantum sia a livello di scansione che a livello di bucket. Gli orari di inizio e fine a livello di bucket sostituiscono gli orari di inizio e fine a livello di scansione.

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

L'arresto di una pipeline rimuove qualsiasi riferimento preesistente a quali oggetti sono stati scansionati dalla pipeline prima dell'arresto. Se una singola pipeline di scansione viene interrotta, eseguirà nuovamente la scansione di tutti gli oggetti dopo l'avvio, anche se erano già stati scansionati. Se è necessario interrompere una singola pipeline di scansione, si consiglia di modificare la finestra temporale prima di riavviare la pipeline.

Se è necessario filtrare gli oggetti in base all'ora di inizio e all'ora di fine, l'unica opzione è interrompere e avviare la pipeline. Se non è necessario filtrare per ora di inizio e ora di fine, è possibile filtrare gli oggetti per nome. Il filtraggio per nome non richiede di interrompere e avviare la pipeline. Per fare ciò, usa e. include_prefix exclude_suffix

Scansione ricorrente

Una scansione pianificata ricorrente esegue una scansione dei bucket S3 specificati a intervalli regolari e pianificati. Puoi configurare questi intervalli solo a livello di scansione perché le configurazioni a livello di bucket individuali non sono supportate.

Nella YAML configurazione, interval specifica la frequenza della scansione ricorrente e può essere compresa tra 30 secondi e 365 giorni. La prima di queste scansioni si verifica sempre quando si crea la pipeline. countDefinisce il numero totale di istanze di scansione.

La seguente configurazione imposta una scansione ricorrente, con un ritardo di 12 ore tra le scansioni:

scan: scheduling: interval: PT12H count: 4 buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

Amazon S3 come destinazione

Per scrivere dati da una pipeline di OpenSearch ingestione a un bucket S3, usa il blueprint S3 preconfigurato per creare una pipeline con un sink S3. Questa pipeline indirizza i dati selettivi verso un sink e invia simultaneamente tutti i dati per l'archiviazione in S3. OpenSearch Per ulteriori informazioni, consulta Utilizzo dei blueprint per creare una pipeline.

Quando crei il tuo sink S3, puoi specificare la formattazione preferita tra una varietà di codec sink. Ad esempio, se desideri scrivere dati in formato colonnare, scegli il codec Parquet o Avro. Se preferite un formato basato su righe, scegliete o ND-. JSON JSON Per scrivere dati su S3 in uno schema specifico, puoi anche definire uno schema in linea all'interno dei codec sink usando il formato Avro.

L'esempio seguente definisce uno schema in linea in un sink S3:

- s3: codec: parquet: schema: > { "type" : "record", "namespace" : "org.vpcFlowLog.examples", "name" : "VpcFlowLog", "fields" : [ { "name" : "version", "type" : "string"}, { "name" : "srcport", "type": "int"}, { "name" : "dstport", "type": "int"}, { "name" : "start", "type": "int"}, { "name" : "end", "type": "int"}, { "name" : "protocol", "type": "int"}, { "name" : "packets", "type": "int"}, { "name" : "bytes", "type": "int"}, { "name" : "action", "type": "string"}, { "name" : "logStatus", "type" : "string"} ] }

Quando definisci questo schema, specifica un superset di tutte le chiavi che potrebbero essere presenti nei diversi tipi di eventi che la pipeline invia a un sink.

Ad esempio, se in un evento è possibile che manchi una chiave, aggiungete quella chiave allo schema con un null valore. Le dichiarazioni di valori nulli consentono allo schema di elaborare dati non uniformi (laddove alcuni eventi abbiano queste chiavi e altri no). Quando negli eventi in entrata sono presenti queste chiavi, i relativi valori vengono scritti nei sink.

Questa definizione dello schema funge da filtro che consente solo l'invio di chiavi definite ai sink e rimuove le chiavi non definite dagli eventi in arrivo.

Puoi anche utilizzare include_keys e exclude_keys nel tuo sink per filtrare i dati che vengono indirizzati ad altri sink. Questi due filtri si escludono a vicenda, quindi puoi utilizzarne solo uno alla volta nello schema. Inoltre, non è possibile utilizzarli all'interno di schemi definiti dall'utente.

Per creare pipeline con tali filtri, utilizza il blueprint del filtro sink preconfigurato. Per ulteriori informazioni, consulta Utilizzo dei blueprint per creare una pipeline.

Account multiplo Amazon S3 come fonte

Puoi concedere l'accesso a più account con Amazon S3 in modo che le pipeline di OpenSearch Ingestion possano accedere ai bucket S3 in un altro account come fonte. Per abilitare l'accesso a più account, consulta la sezione relativa alla concessione delle autorizzazioni per i bucket tra più account nella Amazon S3 User Guide. Dopo aver concesso l'accesso, assicurati che il tuo ruolo di pipeline disponga delle autorizzazioni richieste.

Quindi, puoi creare una YAML configurazione utilizzando bucket_owners per abilitare l'accesso tra account a un bucket Amazon S3 come origine:

s3-pipeline: source: s3: notification_type: "sqs" codec: csv: delimiter: "," quote_character: "\"" detect_header: True sqs: queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue" bucket_owners: my-bucket-01: 123456789012 my-bucket-02: 999999999999 compression: "gzip"