Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Verwenden einer OpenSearch Ingestion-Pipeline mit Amazon S3
Mit OpenSearch Ingestion können Sie Amazon S3 als Quelle oder als Ziel verwenden. Wenn Sie Amazon S3 als Quelle verwenden, senden Sie Daten an eine OpenSearch Ingestion-Pipeline. Wenn Sie Amazon S3 als Ziel verwenden, schreiben Sie Daten aus einer OpenSearch Ingestion-Pipeline in einen oder mehrere S3-Buckets.
Amazon S3 als Quelle
Es gibt zwei Möglichkeiten, Amazon S3 als Quelle für die Datenverarbeitung zu verwenden: mit SQSS3-Verarbeitung und mit geplanten Scans.
Verwenden Sie die SQS S3-Verarbeitung, wenn Sie Dateien fast in Echtzeit scannen möchten, nachdem sie in S3 geschrieben wurden. Sie können Amazon S3 S3-Buckets so konfigurieren, dass sie jedes Mal ein Ereignis auslösen, wenn ein Objekt im Bucket gespeichert oder geändert wird. Verwenden Sie einen einmaligen oder wiederkehrenden geplanten Scan, um Daten in einem S3-Bucket stapelweise zu verarbeiten.
Themen
Voraussetzungen
Um Amazon S3 als Quelle für eine OpenSearch Ingestion-Pipeline sowohl für einen geplanten Scan als auch für die SQS S3-Verarbeitung zu verwenden, erstellen Sie zunächst einen S3-Bucket.
Anmerkung
Wenn sich der S3-Bucket, der als Quelle in der OpenSearch Ingestion-Pipeline verwendet wird, in einem anderen befindet AWS-Konto, müssen Sie auch kontoübergreifende Leseberechtigungen für den Bucket aktivieren. Dadurch kann die Pipeline die Daten lesen und verarbeiten. Informationen zum Aktivieren kontoübergreifender Berechtigungen finden Sie unter Bucket-Besitzer, der kontoübergreifende Bucket-Berechtigungen erteilt im Amazon S3 S3-Benutzerhandbuch.
Wenn sich Ihre S3-Buckets in mehreren Konten befinden, verwenden Sie eine Map. bucket_owners
Ein Beispiel finden Sie in der Dokumentation unter Kontoübergreifender S3-Zugriff
Um die SQS S3-Verarbeitung einzurichten, müssen Sie außerdem die folgenden Schritte ausführen:
-
Aktivieren Sie Ereignisbenachrichtigungen im S3-Bucket mit der SQS Warteschlange als Ziel.
Schritt 1: Konfigurieren Sie die Pipeline-Rolle
Im Gegensatz zu anderen Quell-Plugins, die Daten in eine Pipeline übertragen, verfügt das S3-Quell-Plug-In
Damit eine Pipeline aus S3 lesen kann, müssen Sie daher eine Rolle in der S3-Quellkonfiguration der Pipeline angeben, die Zugriff sowohl auf den S3-Bucket als auch auf die SQS Amazon-Warteschlange hat. Die Pipeline übernimmt diese Rolle, um Daten aus der Warteschlange zu lesen.
Anmerkung
Die Rolle, die Sie in der S3-Quellkonfiguration angeben, muss die Pipeline-Rolle sein. Daher muss Ihre Pipeline-Rolle zwei separate Berechtigungsrichtlinien enthalten — eine zum Schreiben in eine Senke und eine zum Abrufen aus der S3-Quelle. Sie müssen dasselbe sts_role_arn
in allen Pipeline-Komponenten verwenden.
Die folgende Beispielrichtlinie zeigt die erforderlichen Berechtigungen für die Verwendung von S3 als Quelle:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action":[ "s3:ListBucket", "s3:GetBucketLocation", "s3:GetObject" ], "Resource": "arn:aws:s3:::
my-bucket
/*" }, { "Effect":"Allow", "Action":"s3:ListAllMyBuckets", "Resource":"arn:aws:s3:::*" }, { "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:ChangeMessageVisibility" ], "Resource": "arn:aws:sqs:us-west-2
:{account-id}
:MyS3EventSqsQueue
" } ] }
Sie müssen diese Berechtigungen an die IAM Rolle anhängen, die Sie in der sts_role_arn
Option in der Konfiguration des S3-Quell-Plug-ins angeben:
version: "2" source: s3: ... aws: ... sts_role_arn: arn:aws:iam::
{account-id}
:role/pipeline-role
processor: ... sink: - opensearch: ...
Schritt 2: Erstellen Sie die Pipeline
Nachdem Sie Ihre Berechtigungen eingerichtet haben, können Sie je nach Ihrem Amazon OpenSearch S3-Anwendungsfall eine Ingestion-Pipeline konfigurieren.
S3-Verarbeitung SQS
Um die SQS S3-Verarbeitung einzurichten, konfigurieren Sie Ihre Pipeline so, dass sie S3 als Quelle angibt, und richten Sie SQS Amazon-Benachrichtigungen ein:
version: "2" s3-pipeline: source: s3: notification_type: "sqs" codec: newline: null sqs: queue_url: "https://sqs.
us-east-1
.amazonaws.com/{account-id}
/ingestion-queue
" compression: "none" aws: region: "us-east-1
" # IAM role that the pipeline assumes to read data from the queue. This role must be the same as the pipeline role. sts_role_arn: "arn:aws:iam::{account-id}
:role/pipeline-role
" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint
.us-east-1
.es.amazonaws.com"] index: "index-name
" aws: # IAM role that the pipeline assumes to access the domain sink sts_role_arn: "arn:aws:iam::{account-id}
:role/pipeline-role
" region: "us-east-1
"
Wenn Sie bei der Verarbeitung kleiner Dateien auf Amazon S3 eine geringe CPU Auslastung feststellen, sollten Sie erwägen, den Durchsatz zu erhöhen, indem Sie den Wert der workers
Option ändern. Weitere Informationen finden Sie in den Konfigurationsoptionen des S3-Plug-ins
Geplanter Scan
Um einen geplanten Scan einzurichten, konfigurieren Sie Ihre Pipeline mit einem Zeitplan auf Scanebene, der für alle Ihre S3-Buckets gilt, oder auf Bucket-Ebene. Ein Zeitplan auf Bucket-Ebene oder eine Konfiguration mit Scan-Intervallen überschreibt immer eine Konfiguration auf Scan-Ebene.
Sie können geplante Scans entweder mit einem einmaligen Scan konfigurieren, der sich ideal für die Datenmigration eignet, oder mit einem wiederkehrenden Scan, der sich ideal für die Stapelverarbeitung eignet.
Verwenden Sie die vorkonfigurierten Amazon S3-Blueprints, um Ihre Pipeline für das Lesen aus Amazon S3 zu konfigurieren. Sie können den scan
Teil Ihrer Pipeline-Konfiguration bearbeiten, um Ihre Planungsanforderungen zu erfüllen. Weitere Informationen finden Sie unter Verwenden von Blueprints zum Erstellen einer Pipeline.
Einmaliger Scan
Ein einmaliger geplanter Scan wird einmal ausgeführt. In Ihrer YAML Konfiguration können Sie mit einem start_time
und angebenend_time
, wann die Objekte im Bucket gescannt werden sollen. Alternativ können range
Sie das Zeitintervall im Verhältnis zur aktuellen Uhrzeit angeben, in dem die Objekte im Bucket gescannt werden sollen.
Zum Beispiel ein Bereich, der so eingestellt ist, dass alle Dateien PT4H
gescannt werden, die in den letzten vier Stunden erstellt wurden. Um einen einmaligen Scan so zu konfigurieren, dass er ein zweites Mal ausgeführt wird, müssen Sie die Pipeline beenden und neu starten. Wenn Sie keinen Bereich konfiguriert haben, müssen Sie auch die Start- und Endzeiten aktualisieren.
Die folgende Konfiguration richtet einen einmaligen Scan für alle Buckets und alle Objekte in diesen Buckets ein:
version: "2" log-pipeline: source: s3: codec: csv: compression: "none" aws: region: "
us-east-1
" sts_role_arn: "arn:aws:iam::{account-id
}:role/pipeline-role
" acknowledgments: true scan: buckets: - bucket: name:my-bucket-1
filter: include_prefix: -Objects1
/ exclude_suffix: - .jpeg - .png - bucket: name:my-bucket-2
key_prefix: include: -Objects2
/ exclude_suffix: - .jpeg - .png delete_s3_objects_on_read: false processor: - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint
.us-east-1
.es.amazonaws.com"] index: "index-name
" aws: sts_role_arn: "arn:aws:iam::{account-id
}:role/pipeline-role
" region: "us-east-1
" dlq: s3: bucket: "my-bucket-1
" region: "us-east-1
" sts_role_arn: "arn:aws:iam::{account-id
}:role/pipeline-role
"
Die folgende Konfiguration richtet einen einmaligen Scan für alle Buckets während eines bestimmten Zeitfensters ein. Das bedeutet, dass S3 nur die Objekte verarbeitet, deren Erstellungszeiten in dieses Fenster fallen.
scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: name:
my-bucket-1
filter: include: -Objects1
/ exclude_suffix: - .jpeg - .png - bucket: name:my-bucket-2
filter: include: -Objects2
/ exclude_suffix: - .jpeg - .png
Die folgende Konfiguration richtet einen einmaligen Scan sowohl auf Scan- als auch auf Bucket-Ebene ein. Start- und Endzeiten auf Bucket-Ebene haben Vorrang vor Start- und Endzeiten auf Scan-Ebene.
scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name:
my-bucket-1
filter: include: -Objects1
/ exclude_suffix: - .jpeg - .png - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name:my-bucket-2
filter: include: -Objects2
/ exclude_suffix: - .jpeg - .png
Beim Stoppen einer Pipeline werden alle bereits vorhandenen Verweise darauf entfernt, welche Objekte vor dem Stopp von der Pipeline gescannt wurden. Wenn eine einzelne Scan-Pipeline gestoppt wird, werden alle Objekte nach dem Start erneut gescannt, auch wenn sie bereits gescannt wurden. Wenn Sie eine einzelne Scan-Pipeline beenden müssen, empfiehlt es sich, Ihr Zeitfenster zu ändern, bevor Sie die Pipeline erneut starten.
Wenn Sie Objekte nach Start- und Endzeit filtern müssen, ist das Stoppen und Starten der Pipeline die einzige Option. Wenn Sie nicht nach Start- und Endzeit filtern müssen, können Sie Objekte nach Namen filtern. Um nach Namen zu filtern, müssen Sie Ihre Pipeline nicht beenden und starten. Verwenden include_prefix
Sie dazu und. exclude_suffix
Wiederkehrender Scan
Bei einem wiederkehrenden geplanten Scan werden Ihre angegebenen S3-Buckets in regelmäßigen, geplanten Intervallen gescannt. Sie können diese Intervalle nur auf Scanebene konfigurieren, da einzelne Konfigurationen auf Bucket-Ebene nicht unterstützt werden.
In Ihrer YAML Konfiguration interval
gibt der die Häufigkeit des wiederkehrenden Scans an und kann zwischen 30 Sekunden und 365 Tagen liegen. Der erste dieser Scans erfolgt immer, wenn Sie die Pipeline erstellen. Das count
definiert die Gesamtzahl der Scan-Instanzen.
Die folgende Konfiguration richtet einen wiederkehrenden Scan mit einer Verzögerung von 12 Stunden zwischen den Scans ein:
scan: scheduling: interval: PT12H count: 4 buckets: - bucket: name:
my-bucket-1
filter: include: -Objects1
/ exclude_suffix: - .jpeg - .png - bucket: name:my-bucket-2
filter: include: -Objects2
/ exclude_suffix: - .jpeg - .png
Amazon S3 als Ziel
Um Daten aus einer OpenSearch Ingestion-Pipeline in einen S3-Bucket zu schreiben, verwenden Sie den vorkonfigurierten S3-Blueprint, um eine Pipeline mit einer S3-Senke zu erstellen.
Wenn Sie Ihre S3-Senke erstellen, können Sie Ihre bevorzugte Formatierung anhand einer Vielzahl von Senken-Codecs
Das folgende Beispiel definiert ein Inline-Schema in einer S3-Senke:
- s3: codec: parquet: schema: > { "type" : "record", "namespace" : "org.vpcFlowLog.examples", "name" : "VpcFlowLog", "fields" : [ { "name" : "version", "type" : "string"}, { "name" : "srcport", "type": "int"}, { "name" : "dstport", "type": "int"}, { "name" : "start", "type": "int"}, { "name" : "end", "type": "int"}, { "name" : "protocol", "type": "int"}, { "name" : "packets", "type": "int"}, { "name" : "bytes", "type": "int"}, { "name" : "action", "type": "string"}, { "name" : "logStatus", "type" : "string"} ] }
Wenn Sie dieses Schema definieren, geben Sie eine Obermenge aller Schlüssel an, die in den verschiedenen Ereignistypen vorhanden sein könnten, die Ihre Pipeline an eine Senke übermittelt.
Wenn bei einem Ereignis beispielsweise die Möglichkeit besteht, dass ein Schlüssel fehlt, fügen Sie diesen Schlüssel mit einem null
Wert in Ihr Schema ein. Nullwertdeklarationen ermöglichen es dem Schema, ungleichmäßige Daten zu verarbeiten (wobei einige Ereignisse diese Schlüssel haben und andere nicht). Wenn bei eingehenden Ereignissen diese Schlüssel vorhanden sind, werden ihre Werte in Senken geschrieben.
Diese Schemadefinition fungiert als Filter, der nur das Senden definierter Schlüssel an Senken ermöglicht und undefinierte Schlüssel aus eingehenden Ereignissen löscht.
Sie können auch include_keys
und exclude_keys
in Ihrer Senke verwenden, um Daten zu filtern, die an andere Senken weitergeleitet werden. Diese beiden Filter schließen sich gegenseitig aus, sodass Sie in Ihrem Schema jeweils nur einen Filter verwenden können. Darüber hinaus können Sie sie nicht in benutzerdefinierten Schemas verwenden.
Verwenden Sie den vorkonfigurierten Senkenfilter-Blueprint, um Pipelines mit solchen Filtern zu erstellen. Weitere Informationen finden Sie unter Verwenden von Blueprints zum Erstellen einer Pipeline.
Amazon S3 Cross-Konto als Quelle
Sie können bei Amazon S3 kontenübergreifenden Zugriff gewähren, sodass OpenSearch Ingestion-Pipelines auf S3-Buckets in einem anderen Konto als Quelle zugreifen können. Informationen zum Aktivieren des kontoübergreifenden Zugriffs finden Sie unter Bucket-Besitzer, der kontoübergreifende Bucket-Berechtigungen erteilt im Amazon S3 S3-Benutzerhandbuch. Nachdem Sie den Zugriff gewährt haben, stellen Sie sicher, dass Ihre Pipeline-Rolle über die erforderlichen Berechtigungen verfügt.
Anschließend können Sie eine YAML Konfiguration erstellen, die bucket_owners
den kontoübergreifenden Zugriff auf einen Amazon S3 S3-Bucket als Quelle ermöglicht:
s3-pipeline: source: s3: notification_type: "sqs" codec: csv: delimiter: "," quote_character: "\"" detect_header: True sqs: queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue" bucket_owners: my-bucket-01: 123456789012 my-bucket-02: 999999999999 compression: "gzip"