搭配 OpenSearch Amazon S3 使用擷取管道 - Amazon OpenSearch Service

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

搭配 OpenSearch Amazon S3 使用擷取管道

透過 OpenSearch 擷取,您可以使用 Amazon S3 作為來源或目的地。當您使用 Amazon S3 作為來源時,會將資料傳送至 OpenSearch 擷取管道。當您使用 Amazon S3 作為目的地時,您可以將資料從 OpenSearch 擷取管道寫入一或多個 S3 儲存貯體。

Amazon S3 作為來源

有兩種方式可以使用 Amazon S3 作為來源來處理資料:使用 S3 SQS處理排程掃描。

當您要求在檔案寫入 S3 後進行近乎即時的檔案掃描時,請使用 S3 SQS處理。您可以將 Amazon S3 儲存貯體設定為在儲存貯體中存放或修改物件時提出事件。使用一次性或重複排程掃描,在 S3 儲存貯體中批次處理資料。

必要條件

若要使用 Amazon S3 作為排程掃描或 S3 SQS處理 OpenSearch 之擷取管道的來源,請先建立 S3 儲存貯體 。

注意

如果在擷取管道中用作來源 OpenSearch 的 S3 儲存貯體位於不同的 中 AWS 帳戶,您也需要在儲存貯體上啟用跨帳戶讀取許可。這可讓管道讀取和處理資料。若要啟用跨帳戶許可,請參閱 Amazon S3 使用者指南 中的授予跨帳戶儲存貯體許可的儲存貯體擁有者

如果您的 S3 儲存貯體位於多個帳戶中,請使用bucket_owners地圖。如需範例,請參閱 文件中的 OpenSearch跨帳戶 S3 存取

若要設定 S3-SQS 處理,您也需要執行下列步驟:

  1. 建立 Amazon SQS佇列

  2. 在 S3 儲存貯體上啟用事件通知,並將SQS佇列作為目的地。

步驟 1:設定管道角色

與其他將資料推送至管道的來源外掛程式不同,S3 來源外掛程式具有讀取型架構,其中管道會從來源提取資料。

因此,若要從 S3 讀取管道,您必須在管道的 S3 來源組態中指定角色,該角色可存取 S3 儲存貯體和 Amazon SQS佇列。管道將擔任此角色,以便從佇列讀取資料。

注意

您在 S3 來源組態中指定的角色必須是管道角色 。因此,您的管道角色必須包含兩個單獨的許可政策:一個用於寫入接收端,另一個用於從 S3 來源提取。您必須在所有管道元件sts_role_arn中使用相同的 。

下列範例政策顯示使用 S3 作為來源的必要許可:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action":[ "s3:ListBucket", "s3:GetBucketLocation", "s3:GetObject" ], "Resource": "arn:aws:s3:::my-bucket/*" }, { "Effect":"Allow", "Action":"s3:ListAllMyBuckets", "Resource":"arn:aws:s3:::*" }, { "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:ChangeMessageVisibility" ], "Resource": "arn:aws:sqs:us-west-2:{account-id}:MyS3EventSqsQueue" } ] }

您必須將這些許可連接到您在 S3 來源外掛程式組態中的 sts_role_arn選項中指定的IAM角色:

version: "2" source: s3: ... aws: ... sts_role_arn: arn:aws:iam::{account-id}:role/pipeline-role processor: ... sink: - opensearch: ...

步驟 2:建立管道

設定許可後,您可以根據 Amazon S3 使用案例設定 OpenSearch 擷取管道。

S3-SQS 處理

若要設定 S3-SQS 處理,請設定管道以指定 S3 作為來源,並設定 Amazon SQS通知:

version: "2" s3-pipeline: source: s3: notification_type: "sqs" codec: newline: null sqs: queue_url: "https://sqs.us-east-1.amazonaws.com/{account-id}/ingestion-queue" compression: "none" aws: region: "us-east-1" # IAM role that the pipeline assumes to read data from the queue. This role must be the same as the pipeline role. sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: # IAM role that the pipeline assumes to access the domain sink sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1"

如果您在 Amazon S3 上處理小型檔案時觀察到低CPU使用率,請考慮透過修改 workers 選項的值來增加輸送量。如需詳細資訊,請參閱 S3 外掛程式組態選項

排程掃描

若要設定排程掃描,請在適用於所有 S3 儲存貯體的掃描層級或儲存貯體層級,使用排程來設定管道。儲存貯體層級排程或掃描間隔組態一律會覆寫掃描層級組態。

您可以使用適合資料遷移的一次性掃描 或適合批次處理的週期性掃描 來設定排程掃描。

若要將管道設定為從 Amazon S3 讀取,請使用預先設定的 Amazon S3 藍圖。您可以編輯管道組態scan的部分,以符合排程需求。如需詳細資訊,請參閱使用藍圖建立管道

一次性掃描

一次性排程掃描會執行一次。在YAML組態中,您可以使用 start_timeend_time 來指定何時掃描儲存貯體中的物件。或者,您可以使用 range 來指定相對於目前時間的時間間隔,以便掃描儲存貯體中的物件。

例如,設定為PT4H掃描過去四小時內建立的所有檔案的範圍。若要設定一次性掃描以再次執行,您必須停止並重新啟動管道。如果您未設定範圍,也必須更新開始和結束時間。

下列組態會針對這些儲存貯體中的所有儲存貯體和所有物件設定一次性掃描:

version: "2" log-pipeline: source: s3: codec: csv: compression: "none" aws: region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" acknowledgments: true scan: buckets: - bucket: name: my-bucket-1 filter: include_prefix: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 key_prefix: include: - Objects2/ exclude_suffix: - .jpeg - .png delete_s3_objects_on_read: false processor: - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1" dlq: s3: bucket: "my-bucket-1" region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

下列組態會設定指定時段內所有儲存貯體的一次性掃描。這表示 S3 只會處理建立時間落在此時段內的物件。

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

下列組態會在掃描層級和儲存貯體層級設定一次性掃描。儲存貯體層級的開始和結束時間覆寫掃描層級的開始和結束時間。

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

停止管道會移除管道在停止之前已掃描哪些物件的任何現有參考。如果單一掃描管道停止,它將在啟動後重新掃描所有物件,即使它們已經掃描。如果您需要停止單一掃描管道,建議您在再次啟動管道之前變更時間範圍。

如果您需要依開始時間和結束時間篩選物件,則停止和啟動管道是唯一的選項。如果您不需要依開始時間和結束時間篩選,則可以依名稱篩選物件。依名稱轉譯不需要您停止和啟動管道。若要這麼做,請使用 include_prefixexclude_suffix

定期掃描

定期排程掃描會定期執行指定 S3 儲存貯體的掃描。您只能在掃描層級設定這些間隔,因為不支援個別儲存貯體層級組態。

在您的YAML組態中, 會interval指定週期性掃描的頻率,並且可以介於 30 秒到 365 天之間。當您建立管道時,這些掃描中的第一個一律會發生。count 定義掃描執行個體的總數。

下列組態會設定重複掃描,掃描之間延遲 12 小時:

scan: scheduling: interval: PT12H count: 4 buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

Amazon S3 作為目的地

若要將資料從 OpenSearch 擷取管道寫入 S3 儲存貯體,請使用預先設定的 S3 藍圖,以使用 S3 接收器 建立管道。此管道會將選擇性資料路由至 OpenSearch 接收器,並同時傳送所有資料以供 S3 中的封存。如需詳細資訊,請參閱使用藍圖建立管道

建立 S3 接收器時,您可以從各種接收器轉碼器 指定偏好的格式。例如,如果您想要以資料欄格式寫入資料,請選擇 Parquet 或 Avro 轉碼器。如果您偏好以資料列為基礎的格式,請選擇 JSON或 ND-JSON。若要在指定的結構描述中將資料寫入 S3,您也可以使用 Avro 格式 定義接收器轉碼器中的內嵌結構描述。

下列範例定義 S3 接收器中的內嵌結構描述:

- s3: codec: parquet: schema: > { "type" : "record", "namespace" : "org.vpcFlowLog.examples", "name" : "VpcFlowLog", "fields" : [ { "name" : "version", "type" : "string"}, { "name" : "srcport", "type": "int"}, { "name" : "dstport", "type": "int"}, { "name" : "start", "type": "int"}, { "name" : "end", "type": "int"}, { "name" : "protocol", "type": "int"}, { "name" : "packets", "type": "int"}, { "name" : "bytes", "type": "int"}, { "name" : "action", "type": "string"}, { "name" : "logStatus", "type" : "string"} ] }

當您定義此結構描述時,請指定管道交付至接收端的不同類型事件中可能存在的所有金鑰的超級集合。

例如,如果事件可能遺失索引鍵,請在結構描述中將該索引鍵加上 null值。Null 值宣告允許結構描述處理不均勻的資料 (其中某些事件具有這些金鑰,而其他則否)。當傳入事件確實存在這些索引鍵時,其值會寫入接收器。

此結構描述定義可做為篩選條件,僅允許將定義的金鑰傳送至接收端,並從傳入事件捨棄未定義的金鑰。

您也可以在 接收器exclude_keys中使用 include_keys和 來篩選路由至其他 接收器的資料。這兩個篩選條件是互斥的,因此您一次只能在結構描述中使用一個。此外,您無法在使用者定義的結構描述中使用它們。

若要使用此類篩選條件建立管道,請使用預先設定的水槽篩選條件藍圖。如需詳細資訊,請參閱使用藍圖建立管道

Amazon S3 跨帳戶作為來源

您可以使用 Amazon S3 跨帳戶授予存取權,以便擷取管道可以存取另一個帳戶中的 S3 OpenSearch 儲存貯體作為來源。若要啟用跨帳戶存取,請參閱 Amazon S3 使用者指南 中的授予跨帳戶儲存貯體許可的儲存貯體擁有者。授予存取權之後,請確定您的管道角色具有必要的許可。

然後,您可以使用 建立YAML組態bucket_owners,以啟用 Amazon S3 儲存貯體的跨帳戶存取作為來源:

s3-pipeline: source: s3: notification_type: "sqs" codec: csv: delimiter: "," quote_character: "\"" detect_header: True sqs: queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue" bucket_owners: my-bucket-01: 123456789012 my-bucket-02: 999999999999 compression: "gzip"