Verwenden einer OpenSearch Ingestion-Pipeline mit Amazon S3 - OpenSearch Amazon-Dienst

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwenden einer OpenSearch Ingestion-Pipeline mit Amazon S3

Mit OpenSearch Ingestion können Sie Amazon S3 als Quelle oder als Ziel verwenden. Wenn Sie Amazon S3 als Quelle verwenden, senden Sie Daten an eine OpenSearch Ingestion-Pipeline. Wenn Sie Amazon S3 als Ziel verwenden, schreiben Sie Daten aus einer OpenSearch Ingestion-Pipeline in einen oder mehrere S3-Buckets.

Amazon S3 als Quelle

Es gibt zwei Möglichkeiten, Amazon S3 als Quelle für die Datenverarbeitung zu verwenden: mit SQSS3-Verarbeitung und mit geplanten Scans.

Verwenden Sie die SQS S3-Verarbeitung, wenn Sie Dateien fast in Echtzeit scannen möchten, nachdem sie in S3 geschrieben wurden. Sie können Amazon S3 S3-Buckets so konfigurieren, dass sie jedes Mal ein Ereignis auslösen, wenn ein Objekt im Bucket gespeichert oder geändert wird. Verwenden Sie einen einmaligen oder wiederkehrenden geplanten Scan, um Daten in einem S3-Bucket stapelweise zu verarbeiten.

Voraussetzungen

Um Amazon S3 als Quelle für eine OpenSearch Ingestion-Pipeline sowohl für einen geplanten Scan als auch für die SQS S3-Verarbeitung zu verwenden, erstellen Sie zunächst einen S3-Bucket.

Anmerkung

Wenn sich der S3-Bucket, der als Quelle in der OpenSearch Ingestion-Pipeline verwendet wird, in einem anderen befindet AWS-Konto, müssen Sie auch kontoübergreifende Leseberechtigungen für den Bucket aktivieren. Dadurch kann die Pipeline die Daten lesen und verarbeiten. Informationen zum Aktivieren kontoübergreifender Berechtigungen finden Sie unter Bucket-Besitzer, der kontoübergreifende Bucket-Berechtigungen erteilt im Amazon S3 S3-Benutzerhandbuch.

Wenn sich Ihre S3-Buckets in mehreren Konten befinden, verwenden Sie eine Map. bucket_owners Ein Beispiel finden Sie in der Dokumentation unter Kontoübergreifender S3-Zugriff. OpenSearch

Um die SQS S3-Verarbeitung einzurichten, müssen Sie außerdem die folgenden Schritte ausführen:

Schritt 1: Konfigurieren Sie die Pipeline-Rolle

Im Gegensatz zu anderen Quell-Plugins, die Daten in eine Pipeline übertragen, verfügt das S3-Quell-Plug-In über eine lesebasierte Architektur, bei der die Pipeline Daten aus der Quelle bezieht.

Damit eine Pipeline aus S3 lesen kann, müssen Sie daher eine Rolle in der S3-Quellkonfiguration der Pipeline angeben, die Zugriff sowohl auf den S3-Bucket als auch auf die SQS Amazon-Warteschlange hat. Die Pipeline übernimmt diese Rolle, um Daten aus der Warteschlange zu lesen.

Anmerkung

Die Rolle, die Sie in der S3-Quellkonfiguration angeben, muss die Pipeline-Rolle sein. Daher muss Ihre Pipeline-Rolle zwei separate Berechtigungsrichtlinien enthalten — eine zum Schreiben in eine Senke und eine zum Abrufen aus der S3-Quelle. Sie müssen dasselbe sts_role_arn in allen Pipeline-Komponenten verwenden.

Die folgende Beispielrichtlinie zeigt die erforderlichen Berechtigungen für die Verwendung von S3 als Quelle:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action":[ "s3:ListBucket", "s3:GetBucketLocation", "s3:GetObject" ], "Resource": "arn:aws:s3:::my-bucket/*" }, { "Effect":"Allow", "Action":"s3:ListAllMyBuckets", "Resource":"arn:aws:s3:::*" }, { "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:ChangeMessageVisibility" ], "Resource": "arn:aws:sqs:us-west-2:{account-id}:MyS3EventSqsQueue" } ] }

Sie müssen diese Berechtigungen an die IAM Rolle anhängen, die Sie in der sts_role_arn Option in der Konfiguration des S3-Quell-Plug-ins angeben:

version: "2" source: s3: ... aws: ... sts_role_arn: arn:aws:iam::{account-id}:role/pipeline-role processor: ... sink: - opensearch: ...

Schritt 2: Erstellen Sie die Pipeline

Nachdem Sie Ihre Berechtigungen eingerichtet haben, können Sie je nach Ihrem Amazon OpenSearch S3-Anwendungsfall eine Ingestion-Pipeline konfigurieren.

S3-Verarbeitung SQS

Um die SQS S3-Verarbeitung einzurichten, konfigurieren Sie Ihre Pipeline so, dass sie S3 als Quelle angibt, und richten Sie SQS Amazon-Benachrichtigungen ein:

version: "2" s3-pipeline: source: s3: notification_type: "sqs" codec: newline: null sqs: queue_url: "https://sqs.us-east-1.amazonaws.com/{account-id}/ingestion-queue" compression: "none" aws: region: "us-east-1" # IAM role that the pipeline assumes to read data from the queue. This role must be the same as the pipeline role. sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: # IAM role that the pipeline assumes to access the domain sink sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1"

Wenn Sie bei der Verarbeitung kleiner Dateien auf Amazon S3 eine geringe CPU Auslastung feststellen, sollten Sie erwägen, den Durchsatz zu erhöhen, indem Sie den Wert der workers Option ändern. Weitere Informationen finden Sie in den Konfigurationsoptionen des S3-Plug-ins.

Geplanter Scan

Um einen geplanten Scan einzurichten, konfigurieren Sie Ihre Pipeline mit einem Zeitplan auf Scanebene, der für alle Ihre S3-Buckets gilt, oder auf Bucket-Ebene. Ein Zeitplan auf Bucket-Ebene oder eine Konfiguration mit Scan-Intervallen überschreibt immer eine Konfiguration auf Scan-Ebene.

Sie können geplante Scans entweder mit einem einmaligen Scan konfigurieren, der sich ideal für die Datenmigration eignet, oder mit einem wiederkehrenden Scan, der sich ideal für die Stapelverarbeitung eignet.

Verwenden Sie die vorkonfigurierten Amazon S3-Blueprints, um Ihre Pipeline für das Lesen aus Amazon S3 zu konfigurieren. Sie können den scan Teil Ihrer Pipeline-Konfiguration bearbeiten, um Ihre Planungsanforderungen zu erfüllen. Weitere Informationen finden Sie unter Verwenden von Blueprints zum Erstellen einer Pipeline.

Einmaliger Scan

Ein einmaliger geplanter Scan wird einmal ausgeführt. In Ihrer YAML Konfiguration können Sie mit einem start_time und angebenend_time, wann die Objekte im Bucket gescannt werden sollen. Alternativ können range Sie das Zeitintervall im Verhältnis zur aktuellen Uhrzeit angeben, in dem die Objekte im Bucket gescannt werden sollen.

Zum Beispiel ein Bereich, der so eingestellt ist, dass alle Dateien PT4H gescannt werden, die in den letzten vier Stunden erstellt wurden. Um einen einmaligen Scan so zu konfigurieren, dass er ein zweites Mal ausgeführt wird, müssen Sie die Pipeline beenden und neu starten. Wenn Sie keinen Bereich konfiguriert haben, müssen Sie auch die Start- und Endzeiten aktualisieren.

Die folgende Konfiguration richtet einen einmaligen Scan für alle Buckets und alle Objekte in diesen Buckets ein:

version: "2" log-pipeline: source: s3: codec: csv: compression: "none" aws: region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" acknowledgments: true scan: buckets: - bucket: name: my-bucket-1 filter: include_prefix: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 key_prefix: include: - Objects2/ exclude_suffix: - .jpeg - .png delete_s3_objects_on_read: false processor: - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1" dlq: s3: bucket: "my-bucket-1" region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

Die folgende Konfiguration richtet einen einmaligen Scan für alle Buckets während eines bestimmten Zeitfensters ein. Das bedeutet, dass S3 nur die Objekte verarbeitet, deren Erstellungszeiten in dieses Fenster fallen.

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

Die folgende Konfiguration richtet einen einmaligen Scan sowohl auf Scan- als auch auf Bucket-Ebene ein. Start- und Endzeiten auf Bucket-Ebene haben Vorrang vor Start- und Endzeiten auf Scan-Ebene.

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

Beim Stoppen einer Pipeline werden alle bereits vorhandenen Verweise darauf entfernt, welche Objekte vor dem Stopp von der Pipeline gescannt wurden. Wenn eine einzelne Scan-Pipeline gestoppt wird, werden alle Objekte nach dem Start erneut gescannt, auch wenn sie bereits gescannt wurden. Wenn Sie eine einzelne Scan-Pipeline beenden müssen, empfiehlt es sich, Ihr Zeitfenster zu ändern, bevor Sie die Pipeline erneut starten.

Wenn Sie Objekte nach Start- und Endzeit filtern müssen, ist das Stoppen und Starten der Pipeline die einzige Option. Wenn Sie nicht nach Start- und Endzeit filtern müssen, können Sie Objekte nach Namen filtern. Um nach Namen zu filtern, müssen Sie Ihre Pipeline nicht beenden und starten. Verwenden include_prefix Sie dazu und. exclude_suffix

Wiederkehrender Scan

Bei einem wiederkehrenden geplanten Scan werden Ihre angegebenen S3-Buckets in regelmäßigen, geplanten Intervallen gescannt. Sie können diese Intervalle nur auf Scanebene konfigurieren, da einzelne Konfigurationen auf Bucket-Ebene nicht unterstützt werden.

In Ihrer YAML Konfiguration interval gibt der die Häufigkeit des wiederkehrenden Scans an und kann zwischen 30 Sekunden und 365 Tagen liegen. Der erste dieser Scans erfolgt immer, wenn Sie die Pipeline erstellen. Das count definiert die Gesamtzahl der Scan-Instanzen.

Die folgende Konfiguration richtet einen wiederkehrenden Scan mit einer Verzögerung von 12 Stunden zwischen den Scans ein:

scan: scheduling: interval: PT12H count: 4 buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

Amazon S3 als Ziel

Um Daten aus einer OpenSearch Ingestion-Pipeline in einen S3-Bucket zu schreiben, verwenden Sie den vorkonfigurierten S3-Blueprint, um eine Pipeline mit einer S3-Senke zu erstellen. Diese Pipeline leitet selektive Daten an eine OpenSearch Senke weiter und sendet gleichzeitig alle Daten zur Archivierung in S3. Weitere Informationen finden Sie unter Verwenden von Blueprints zum Erstellen einer Pipeline.

Wenn Sie Ihre S3-Senke erstellen, können Sie Ihre bevorzugte Formatierung anhand einer Vielzahl von Senken-Codecs angeben. Wenn Sie beispielsweise Daten im Spaltenformat schreiben möchten, wählen Sie den Parquet- oder Avro-Codec. Wenn Sie ein zeilenbasiertes Format bevorzugen, wählen Sie oder ND-. JSON JSON Um Daten in einem bestimmten Schema nach S3 zu schreiben, können Sie mithilfe des Avro-Formats auch ein Inline-Schema innerhalb von Sink-Codecs definieren.

Das folgende Beispiel definiert ein Inline-Schema in einer S3-Senke:

- s3: codec: parquet: schema: > { "type" : "record", "namespace" : "org.vpcFlowLog.examples", "name" : "VpcFlowLog", "fields" : [ { "name" : "version", "type" : "string"}, { "name" : "srcport", "type": "int"}, { "name" : "dstport", "type": "int"}, { "name" : "start", "type": "int"}, { "name" : "end", "type": "int"}, { "name" : "protocol", "type": "int"}, { "name" : "packets", "type": "int"}, { "name" : "bytes", "type": "int"}, { "name" : "action", "type": "string"}, { "name" : "logStatus", "type" : "string"} ] }

Wenn Sie dieses Schema definieren, geben Sie eine Obermenge aller Schlüssel an, die in den verschiedenen Ereignistypen vorhanden sein könnten, die Ihre Pipeline an eine Senke übermittelt.

Wenn bei einem Ereignis beispielsweise die Möglichkeit besteht, dass ein Schlüssel fehlt, fügen Sie diesen Schlüssel mit einem null Wert in Ihr Schema ein. Nullwertdeklarationen ermöglichen es dem Schema, ungleichmäßige Daten zu verarbeiten (wobei einige Ereignisse diese Schlüssel haben und andere nicht). Wenn bei eingehenden Ereignissen diese Schlüssel vorhanden sind, werden ihre Werte in Senken geschrieben.

Diese Schemadefinition fungiert als Filter, der nur das Senden definierter Schlüssel an Senken ermöglicht und undefinierte Schlüssel aus eingehenden Ereignissen löscht.

Sie können auch include_keys und exclude_keys in Ihrer Senke verwenden, um Daten zu filtern, die an andere Senken weitergeleitet werden. Diese beiden Filter schließen sich gegenseitig aus, sodass Sie in Ihrem Schema jeweils nur einen Filter verwenden können. Darüber hinaus können Sie sie nicht in benutzerdefinierten Schemas verwenden.

Verwenden Sie den vorkonfigurierten Senkenfilter-Blueprint, um Pipelines mit solchen Filtern zu erstellen. Weitere Informationen finden Sie unter Verwenden von Blueprints zum Erstellen einer Pipeline.

Amazon S3 Cross-Konto als Quelle

Sie können bei Amazon S3 kontenübergreifenden Zugriff gewähren, sodass OpenSearch Ingestion-Pipelines auf S3-Buckets in einem anderen Konto als Quelle zugreifen können. Informationen zum Aktivieren des kontoübergreifenden Zugriffs finden Sie unter Bucket-Besitzer, der kontoübergreifende Bucket-Berechtigungen erteilt im Amazon S3 S3-Benutzerhandbuch. Nachdem Sie den Zugriff gewährt haben, stellen Sie sicher, dass Ihre Pipeline-Rolle über die erforderlichen Berechtigungen verfügt.

Anschließend können Sie eine YAML Konfiguration erstellen, die bucket_owners den kontoübergreifenden Zugriff auf einen Amazon S3 S3-Bucket als Quelle ermöglicht:

s3-pipeline: source: s3: notification_type: "sqs" codec: csv: delimiter: "," quote_character: "\"" detect_header: True sqs: queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue" bucket_owners: my-bucket-01: 123456789012 my-bucket-02: 999999999999 compression: "gzip"