Amazon S3 OpenSearch での取り込みパイプラインの使用 - Amazon OpenSearch サービス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon S3 OpenSearch での取り込みパイプラインの使用

OpenSearch 取り込みでは、Amazon S3 をソースまたは送信先として使用できます。Amazon S3 をソースとして使用する場合は、 OpenSearch 取り込みパイプラインにデータを送信します。Amazon S3 を送信先として使用する場合は、取り込みパイプラインから 1 OpenSearch つ以上の S3 バケットにデータを書き込みます。

ソースとしての Amazon S3

Amazon S3 をソースとして使用してデータを処理する方法は 2 つあります。S3SQS 処理スケジュールされたスキャンです。

S3SQS に書き込まれたファイルのほぼリアルタイムのスキャンが必要な場合は、S3 処理を使用します。バケット内でオブジェクトが保存または変更されるたびにイベントを発生させるように Amazon S3 バケットを設定できます。S3 バケット内のデータをバッチ処理するには、1 回限りのスケジュールされたスキャンまたは反復的なスケジュールされたスキャンを使用します。

前提条件

スケジュールされたスキャンまたは S3 処理の両方の取り込みパイプラインのソースとして Amazon S3SQS を使用するには、まず S3 バケット を作成します。 OpenSearch

注記

取り込みパイプラインのソースとして使用される S3 OpenSearch バケットが別の にある場合は AWS アカウント、バケットに対してクロスアカウント読み取りアクセス許可を有効にする必要もあります。これにより、パイプラインはデータを読み取って処理できるようになります。クロスアカウントアクセス許可を有効にするには、「Amazon S3 ユーザーガイド」の「バケット所有者がクロスアカウントのバケットのアクセス許可を付与する」を参照してください。

S3 バケットが複数のアカウントにある場合は、bucket_ownersマップを使用します。例については、 ドキュメントの「クロスアカウント S3 アクセス OpenSearch」を参照してください。

S3SQS 処理をセットアップするには、次のステップも実行する必要があります。

  1. Amazon SQSキュー を作成します

  2. SQS キューを送信先として S3 バケットでイベント通知を有効にします

ステップ 1: パイプラインロールを設定する

パイプラインにデータをプッシュする他のソースプラグインとは異なり、S3 ソースプラグインは、パイプラインがソースからデータを取得する読み取りベースのアーキテクチャを使用しています。

したがって、パイプラインが S3 から読み取るには、パイプラインの S3 S3S3 バケットと Amazon SQSキューの両方にアクセスできるロールを指定する必要があります。キューからデータを読み取るために、パイプラインはこのロールを引き受けます。

注記

S3 ソース設定内で指定するロールは、パイプラインロールである必要があります。したがって、パイプラインロールには 2 つの異なるアクセス許可ポリシーを含める必要があります。1 つはシンクに書き込むポリシーで、もう 1 つは S3 ソースから取得するポリシーです。すべてのパイプラインコンポーネントで同じ sts_role_arn を使用する必要があります。

次のサンプルポリシーは、S3 をソースとして使用するために必要なアクセス許可を示しています。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action":[ "s3:ListBucket", "s3:GetBucketLocation", "s3:GetObject" ], "Resource": "arn:aws:s3:::my-bucket/*" }, { "Effect":"Allow", "Action":"s3:ListAllMyBuckets", "Resource":"arn:aws:s3:::*" }, { "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:ChangeMessageVisibility" ], "Resource": "arn:aws:sqs:us-west-2:{account-id}:MyS3EventSqsQueue" } ] }

これらのアクセス許可は、S3 ソースプラグイン設定内の sts_role_arnオプションで指定したIAMロールにアタッチする必要があります。

version: "2" source: s3: ... aws: ... sts_role_arn: arn:aws:iam::{account-id}:role/pipeline-role processor: ... sink: - opensearch: ...

ステップ 2: パイプラインを作成する

アクセス許可を設定したら、Amazon OpenSearch S3 のユースケースに応じて Ingestion パイプラインを設定できます。 Amazon S3

S3 SQS処理

S3SQS 処理を設定するには、ソースとして S3 を指定し、Amazon SQS通知を設定するようにパイプラインを設定します。

version: "2" s3-pipeline: source: s3: notification_type: "sqs" codec: newline: null sqs: queue_url: "https://sqs.us-east-1.amazonaws.com/{account-id}/ingestion-queue" compression: "none" aws: region: "us-east-1" # IAM role that the pipeline assumes to read data from the queue. This role must be the same as the pipeline role. sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: # IAM role that the pipeline assumes to access the domain sink sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1"

Amazon S3 で小さなファイルを処理するときにCPU使用率が低い場合は、 workersオプションの値を変更してスループットを上げることを検討してください。 Amazon S3 詳細については、S3 プラグインの設定オプション」を参照してください。

スケジュールされたスキャン

スケジュールされたスキャンをセットアップするには、すべての S3 バケットまたはバケットレベルに適用されるスキャンレベルで、パイプラインにスケジュールを設定します。バケットレベルのスケジュールまたはスキャン間隔の設定は、常にスキャンレベルの設定を上書きします。

スケジュールれたスキャンは、データ移行に理想的な 1 回限りのスキャン、またはバッチ処理に理想的な反復的スキャンで設定できます。

Amazon S3 から読み取るようにパイプラインを設定するには、事前設定された Amazon S3 ブループリントを使用します。パイプライン設定の scan の部分は、スケジュールのニーズに合わせて編集できます。詳細については、「ブループリントを使用したパイプラインの作成」を参照してください。

1 回限りのスキャン

1 回限りのスケジュールされたスキャンは 1 回実行されます。YAML 設定では、 start_timeと を使用してend_time、バケット内のオブジェクトをスキャンするタイミングを指定できます。range を使用して、バケット内のオブジェクトをスキャンする現在の時刻に相対的な間隔を指定できます。

例えば、範囲を PT4H に設定すると、過去 4 時間に作成されたすべてのファイルがスキャンされます。1 回限りのスキャンを 2 回目に実行するように設定するには、パイプラインを停止して再起動する必要があります。範囲を設定しない場合、開始時間と終了時間も更新する必要があります。

次の設定では、すべてのバケット、およびそれらのバケット内のすべてのオブジェクトを 1 回だけスキャンするようにセットアップされます。

version: "2" log-pipeline: source: s3: codec: csv: compression: "none" aws: region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" acknowledgments: true scan: buckets: - bucket: name: my-bucket-1 filter: include_prefix: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 key_prefix: include: - Objects2/ exclude_suffix: - .jpeg - .png delete_s3_objects_on_read: false processor: - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1" dlq: s3: bucket: "my-bucket-1" region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

次の設定では、指定した時間ウィンドウですべてのバケットを 1 回だけスキャンするようにセットアップされます。したがって、S3 は、作成時間がこのウィンドウ内に収まるオブジェクトのみを処理します。

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

次の設定では、スキャンレベルとバケットレベルの両方で 1 回限りのスキャンがセットアップされます。バケットレベルの開始時間と終了時間は、スキャンレベルの開始時間と終了時間よりも優先されます。

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

パイプラインを停止すると、停止前にパイプラインによってスキャンされたオブジェクトの既存の参照が削除されます。1 つのスキャンパイプラインが停止すると、すべてのオブジェクトが既にスキャンされていても、その開始後に再スキャンされます。1 つのスキャンパイプラインを停止する必要がある場合は、パイプラインを再開する前に時間枠を変更することをお勧めします。

開始時刻と終了時刻でオブジェクトをフィルタリングする必要がある場合は、パイプラインの停止と開始が唯一のオプションです。開始時刻と終了時刻でフィルタリングする必要がない場合は、名前でオブジェクトをフィルタリングできます。名前でフィルタリングしても、パイプラインを停止して開始する必要はありません。これを行うには、 include_prefixと を使用しますexclude_suffix

反復的スキャン

反復的なスケジュールされたスキャンでは、指定した S3 バケットのスキャンがスケジュールされた間隔で定期的に実行されます。個別のバケットレベルの設定はサポートされていないため、これらの間隔はスキャンレベルでのみ設定できます。

YAML 設定では、 は繰り返しスキャンの頻度intervalを指定し、30 秒から 365 日の間を指定できます。これらのスキャンのうちの最初のスキャンは、パイプラインの作成時に実行されます。count はスキャンインスタンスの合計数を定義します。

次の設定では、12 時間のスキャン間隔での反復的スキャンがセットアップされます。

scan: scheduling: interval: PT12H count: 4 buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

送信先としての Amazon S3

取り込みパイプラインから S3 OpenSearch バケットにデータを書き込むには、事前設定された S3 ブループリントを使用して S3 シンク でパイプラインを作成します。このパイプラインは、選択的データを OpenSearch シンクにルーティングし、同時にすべてのデータを S3 にアーカイブします。詳細については、「ブループリントを使用したパイプラインの作成」を参照してください。

S3 シンクを作成するとき、さまざまなシンクコーデックから優先フォーマットを指定できます。例えば、列形式でデータを書き込む場合は、Parquet コーデックまたは Avro コーデックを選択します。行ベースの形式を使用する場合は、 JSONまたは ND- を選択しますJSON。指定したスキーマで S3 にデータを書き込むには、Avro 形式を使用してシンクコーデック内にインラインスキーマを定義することもできます。

次の例では、S3 シンクのインラインスキーマが定義されます。

- s3: codec: parquet: schema: > { "type" : "record", "namespace" : "org.vpcFlowLog.examples", "name" : "VpcFlowLog", "fields" : [ { "name" : "version", "type" : "string"}, { "name" : "srcport", "type": "int"}, { "name" : "dstport", "type": "int"}, { "name" : "start", "type": "int"}, { "name" : "end", "type": "int"}, { "name" : "protocol", "type": "int"}, { "name" : "packets", "type": "int"}, { "name" : "bytes", "type": "int"}, { "name" : "action", "type": "string"}, { "name" : "logStatus", "type" : "string"} ] }

このスキーマを定義するとき、パイプラインがシンクに配信するさまざまなタイプのイベントに存在する可能性のあるすべてのキーのスーパーセットを指定します。

例えば、あるイベントでキーが欠落している可能性がある場合は、そのキーに null 値を付けてスキーマに追加します。null 値を宣言すると、(キーを持つイベントと持たないイベントがある) 不均一なデータをスキーマで処理できます。受信イベントにこれらのキーがある場合、その値はシンクに書き込まれます。

このスキーマ定義は、定義済みのキーのみをシンクに送信し、未定義のキーを受信イベントから削除するフィルターとして機能します。

include_keysexclude_keys をシンクで使用して、他のシンクにルーティングされるデータをフィルタリングすることもできます。この 2 つのフィルターは相互に排他的であるため、スキーマでは一度に 1 つしか使用できません。また、ユーザー定義のスキーマと一緒に使用することもできません。

このようなフィルターでパイプラインを作成するには、事前設定されたシンクフィルターの設計図を使用します。詳細については、「ブループリントを使用したパイプラインの作成」を参照してください。

ソースとしての Amazon S3 クロスアカウント

Amazon S3 のアカウント間でアクセスを許可して、取り込みパイプラインが別のアカウントの OpenSearch S3 バケットにソースとしてアクセスできるようにします。クロスアカウントアクセスを有効にするには、Amazon S3 ユーザーガイド」の「バケット所有者がクロスアカウントバケットのアクセス許可を付与する」を参照してください。アクセスを許可したら、パイプラインロールに必要なアクセス許可があることを確認します。

次に、 を使用してYAML設定を作成しbucket_owners、Amazon S3 バケットへのクロスアカウントアクセスをソースとして有効にできます。

s3-pipeline: source: s3: notification_type: "sqs" codec: csv: delimiter: "," quote_character: "\"" detect_header: True sqs: queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue" bucket_owners: my-bucket-01: 123456789012 my-bucket-02: 999999999999 compression: "gzip"