透過 OpenSearch 擷取,您可以使用 Amazon S3 作為來源或目的地。當您使用 Amazon S3 作為來源時,會將資料傳送至 OpenSearch 擷取管道。當您使用 Amazon S3 作為目的地時,您可以將資料從 OpenSearch 擷取管道寫入一或多個 S3 儲存貯體。

Amazon S3 作為來源

有兩種方式可以使用 Amazon S3 作為來源來處理資料:使用 S3 SQS處理排程掃描。

當您要求在檔案寫入 S3 後進行近乎即時的檔案掃描時,請使用 S3 SQS處理。您可以將 Amazon S3 儲存貯體設定為在儲存貯體中存放或修改物件時提出事件。使用一次性或重複排程掃描,在 S3 儲存貯體中批次處理資料。


若要使用 Amazon S3 作為排程掃描或 S3 SQS處理 OpenSearch 之擷取管道的來源,請先建立 S3 儲存貯體 。


如果在擷取管道中用作來源 OpenSearch 的 S3 儲存貯體位於不同的 中 AWS 帳戶,您也需要在儲存貯體上啟用跨帳戶讀取許可。這可讓管道讀取和處理資料。若要啟用跨帳戶許可,請參閱 Amazon S3 使用者指南 中的授予跨帳戶儲存貯體許可的儲存貯體擁有者

如果您的 S3 儲存貯體位於多個帳戶中,請使用bucket_owners地圖。如需範例,請參閱 文件中的 OpenSearch跨帳戶 S3 存取

若要設定 S3-SQS 處理,您也需要執行下列步驟:

  1. 建立 Amazon SQS佇列

  2. 在 S3 儲存貯體上啟用事件通知,並將SQS佇列作為目的地。

步驟 1:設定管道角色

與其他將資料推送至管道的來源外掛程式不同,S3 來源外掛程式具有讀取型架構,其中管道會從來源提取資料。

因此,若要從 S3 讀取管道,您必須在管道的 S3 來源組態中指定角色,該角色可存取 S3 儲存貯體和 Amazon SQS佇列。管道將擔任此角色,以便從佇列讀取資料。


您在 S3 來源組態中指定的角色必須是管道角色 。因此,您的管道角色必須包含兩個單獨的許可政策:一個用於寫入接收端,另一個用於從 S3 來源提取。您必須在所有管道元件sts_role_arn中使用相同的 。

下列範例政策顯示使用 S3 作為來源的必要許可:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action":[ "s3:ListBucket", "s3:GetBucketLocation", "s3:GetObject" ], "Resource": "arn:aws:s3:::my-bucket/*" }, { "Effect":"Allow", "Action":"s3:ListAllMyBuckets", "Resource":"arn:aws:s3:::*" }, { "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:ChangeMessageVisibility" ], "Resource": "arn:aws:sqs:us-west-2:{account-id}:MyS3EventSqsQueue" } ] }

您必須將這些許可連接到您在 S3 來源外掛程式組態中的 sts_role_arn選項中指定的IAM角色:

version: "2" source: s3: ... aws: ... sts_role_arn: arn:aws:iam::{account-id}:role/pipeline-role processor: ... sink: - opensearch: ...

步驟 2:建立管道

設定許可後,您可以根據 Amazon S3 使用案例設定 OpenSearch 擷取管道。

S3-SQS 處理

若要設定 S3-SQS 處理,請設定管道以指定 S3 作為來源,並設定 Amazon SQS通知:

version: "2" s3-pipeline: source: s3: notification_type: "sqs" codec: newline: null sqs: queue_url: "https://sqs.us-east-1.amazonaws.com/{account-id}/ingestion-queue" compression: "none" aws: region: "us-east-1" # IAM role that the pipeline assumes to read data from the queue. This role must be the same as the pipeline role. sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: # IAM role that the pipeline assumes to access the domain sink sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1"

如果您在 Amazon S3 上處理小型檔案時觀察到低CPU使用率,請考慮透過修改 workers 選項的值來增加輸送量。如需詳細資訊,請參閱 S3 外掛程式組態選項


若要設定排程掃描,請在適用於所有 S3 儲存貯體的掃描層級或儲存貯體層級,使用排程來設定管道。儲存貯體層級排程或掃描間隔組態一律會覆寫掃描層級組態。

您可以使用適合資料遷移的一次性掃描 或適合批次處理的週期性掃描 來設定排程掃描。

若要將管道設定為從 Amazon S3 讀取,請使用預先設定的 Amazon S3 藍圖。您可以編輯管道組態scan的部分,以符合排程需求。如需詳細資訊,請參閱使用藍圖建立管道


一次性排程掃描會執行一次。在YAML組態中,您可以使用 start_timeend_time 來指定何時掃描儲存貯體中的物件。或者,您可以使用 range 來指定相對於目前時間的時間間隔,以便掃描儲存貯體中的物件。



version: "2" log-pipeline: source: s3: codec: csv: compression: "none" aws: region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" acknowledgments: true scan: buckets: - bucket: name: my-bucket-1 filter: include_prefix: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 key_prefix: include: - Objects2/ exclude_suffix: - .jpeg - .png delete_s3_objects_on_read: false processor: - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index-name" aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-east-1" dlq: s3: bucket: "my-bucket-1" region: "us-east-1" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

下列組態會設定指定時段內所有儲存貯體的一次性掃描。這表示 S3 只會處理建立時間落在此時段內的物件。

scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png


scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png


如果您需要依開始時間和結束時間篩選物件,則停止和啟動管道是唯一的選項。如果您不需要依開始時間和結束時間篩選,則可以依名稱篩選物件。依名稱轉譯不需要您停止和啟動管道。若要這麼做,請使用 include_prefixexclude_suffix


定期排程掃描會定期執行指定 S3 儲存貯體的掃描。您只能在掃描層級設定這些間隔,因為不支援個別儲存貯體層級組態。

在您的YAML組態中, 會interval指定週期性掃描的頻率,並且可以介於 30 秒到 365 天之間。當您建立管道時,這些掃描中的第一個一律會發生。count 定義掃描執行個體的總數。

下列組態會設定重複掃描,掃描之間延遲 12 小時:

scan: scheduling: interval: PT12H count: 4 buckets: - bucket: name: my-bucket-1 filter: include: - Objects1/ exclude_suffix: - .jpeg - .png - bucket: name: my-bucket-2 filter: include: - Objects2/ exclude_suffix: - .jpeg - .png

Amazon S3 作為目的地

若要將資料從 OpenSearch 擷取管道寫入 S3 儲存貯體,請使用預先設定的 S3 藍圖,以使用 S3 接收器 建立管道。此管道會將選擇性資料路由至 OpenSearch 接收器,並同時傳送所有資料以供 S3 中的封存。如需詳細資訊,請參閱使用藍圖建立管道

建立 S3 接收器時,您可以從各種接收器轉碼器 指定偏好的格式。例如,如果您想要以資料欄格式寫入資料,請選擇 Parquet 或 Avro 轉碼器。如果您偏好以資料列為基礎的格式,請選擇 JSON或 ND-JSON。若要在指定的結構描述中將資料寫入 S3,您也可以使用 Avro 格式 定義接收器轉碼器中的內嵌結構描述。

下列範例定義 S3 接收器中的內嵌結構描述:

- s3: codec: parquet: schema: > { "type" : "record", "namespace" : "org.vpcFlowLog.examples", "name" : "VpcFlowLog", "fields" : [ { "name" : "version", "type" : "string"}, { "name" : "srcport", "type": "int"}, { "name" : "dstport", "type": "int"}, { "name" : "start", "type": "int"}, { "name" : "end", "type": "int"}, { "name" : "protocol", "type": "int"}, { "name" : "packets", "type": "int"}, { "name" : "bytes", "type": "int"}, { "name" : "action", "type": "string"}, { "name" : "logStatus", "type" : "string"} ] }


例如,如果事件可能遺失索引鍵,請在結構描述中將該索引鍵加上 null值。Null 值宣告允許結構描述處理不均勻的資料 (其中某些事件具有這些金鑰,而其他則否)。當傳入事件確實存在這些索引鍵時,其值會寫入接收器。


您也可以在 接收器exclude_keys中使用 include_keys和 來篩選路由至其他 接收器的資料。這兩個篩選條件是互斥的,因此您一次只能在結構描述中使用一個。此外,您無法在使用者定義的結構描述中使用它們。


Amazon S3 跨帳戶作為來源

您可以使用 Amazon S3 跨帳戶授予存取權,以便擷取管道可以存取另一個帳戶中的 S3 OpenSearch 儲存貯體作為來源。若要啟用跨帳戶存取,請參閱 Amazon S3 使用者指南 中的授予跨帳戶儲存貯體許可的儲存貯體擁有者。授予存取權之後,請確定您的管道角色具有必要的許可。

然後,您可以使用 建立YAML組態bucket_owners,以啟用 Amazon S3 儲存貯體的跨帳戶存取作為來源:

s3-pipeline: source: s3: notification_type: "sqs" codec: csv: delimiter: "," quote_character: "\"" detect_header: True sqs: queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue" bucket_owners: my-bucket-01: 123456789012 my-bucket-02: 999999999999 compression: "gzip"