Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Menggunakan pipeline OpenSearch Ingestion dengan Amazon S3
Dengan OpenSearch Ingestion, Anda dapat menggunakan Amazon S3 sebagai sumber atau sebagai tujuan. Saat Anda menggunakan Amazon S3 sebagai sumber, Anda mengirim data ke pipeline OpenSearch Ingestion. Saat Anda menggunakan Amazon S3 sebagai tujuan, Anda menulis data dari pipeline OpenSearch Ingestion ke satu atau beberapa bucket S3.
Amazon S3 sebagai sumber
Ada dua cara Anda dapat menggunakan Amazon S3 sebagai sumber untuk memproses data — dengan SQSpemrosesan S3 dan dengan pemindaian terjadwal.
Gunakan SQS pemrosesan S3 saat Anda memerlukan pemindaian file yang hampir real-time setelah ditulis ke S3. Anda dapat mengonfigurasi bucket Amazon S3 untuk memunculkan peristiwa kapan saja objek disimpan atau dimodifikasi di dalam bucket. Gunakan pemindaian terjadwal satu kali atau berulang untuk mengumpulkan data proses dalam bucket S3.
Prasyarat
catatan
Jika bucket S3 yang digunakan sebagai sumber dalam pipeline OpenSearch Ingestion berbeda Akun AWS, Anda juga perlu mengaktifkan izin baca lintas akun di bucket. Hal ini memungkinkan pipeline untuk membaca dan memproses data. Untuk mengaktifkan izin lintas akun, lihat Pemilik Bucket yang memberikan izin bucket lintas akun di Panduan Pengguna Amazon S3.
Jika bucket S3 Anda ada di beberapa akun, gunakan petabucket_owners
. Sebagai contoh, lihat Akses S3 lintas akun
Untuk mengatur SQS pemrosesan S3-, Anda juga perlu melakukan langkah-langkah berikut:
-
Aktifkan pemberitahuan acara di bucket S3 dengan SQS antrian sebagai tujuan.
Langkah 1: Konfigurasikan peran pipeline
Tidak seperti plugin sumber lain yang mendorong data ke pipeline, plugin sumber S3
Oleh karena itu, agar pipeline dapat dibaca dari S3, Anda harus menentukan peran dalam konfigurasi sumber S3 pipeline yang memiliki akses ke bucket S3 dan antrian Amazon. SQS Pipeline akan mengambil peran ini untuk membaca data dari antrian.
catatan
Peran yang Anda tentukan dalam konfigurasi sumber S3 haruslah peran pipeline. Oleh karena itu, peran pipeline Anda harus berisi dua kebijakan izin terpisah—satu untuk menulis ke wastafel, dan satu lagi untuk menarik dari sumber S3. Anda harus menggunakan yang sama sts_role_arn
di semua komponen pipa.
Kebijakan contoh berikut menunjukkan izin yang diperlukan untuk menggunakan S3 sebagai sumber:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action":[ "s3:ListBucket", "s3:GetBucketLocation", "s3:GetObject" ], "Resource": "arn:aws:s3:::
my-bucket
/*" }, { "Effect":"Allow", "Action":"s3:ListAllMyBuckets", "Resource":"arn:aws:s3:::*" }, { "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:ChangeMessageVisibility" ], "Resource": "arn:aws:sqs:us-west-2
:{account-id}
:MyS3EventSqsQueue
" } ] }
Anda harus melampirkan izin ini ke IAM peran yang Anda tentukan dalam sts_role_arn
opsi dalam konfigurasi plugin sumber S3:
version: "2" source: s3: ... aws: ... sts_role_arn: arn:aws:iam::
{account-id}
:role/pipeline-role
processor: ... sink: - opensearch: ...
Langkah 2: Buat pipa
Setelah menyiapkan izin, Anda dapat mengonfigurasi pipeline OpenSearch Ingestion tergantung pada kasus penggunaan Amazon S3.
S3- pengolahan SQS
Untuk mengatur SQS pemrosesan S3-, konfigurasikan pipeline Anda untuk menentukan S3 sebagai sumber dan siapkan notifikasi AmazonSQS:
version: "2" s3-pipeline: source: s3: notification_type: "sqs" codec: newline: null sqs: queue_url: "https://sqs.
us-east-1
.amazonaws.com/{account-id}
/ingestion-queue
" compression: "none" aws: region: "us-east-1
" # IAM role that the pipeline assumes to read data from the queue. This role must be the same as the pipeline role. sts_role_arn: "arn:aws:iam::{account-id}
:role/pipeline-role
" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint
.us-east-1
.es.amazonaws.com"] index: "index-name
" aws: # IAM role that the pipeline assumes to access the domain sink sts_role_arn: "arn:aws:iam::{account-id}
:role/pipeline-role
" region: "us-east-1
"
Jika Anda mengamati CPU pemanfaatan rendah saat memproses file kecil di Amazon S3, pertimbangkan untuk meningkatkan throughput dengan memodifikasi nilai opsi. workers
Untuk informasi selengkapnya, lihat opsi konfigurasi plugin S3
Pemindaian terjadwal
Untuk menyiapkan pemindaian terjadwal, konfigurasikan pipeline Anda dengan jadwal pada tingkat pemindaian yang berlaku untuk semua bucket S3 Anda, atau di tingkat bucket. Jadwal tingkat ember atau konfigurasi interval pemindaian selalu menimpa konfigurasi tingkat pemindaian.
Anda dapat mengonfigurasi pemindaian terjadwal dengan pemindaian satu kali, yang ideal untuk migrasi data, atau pemindaian berulang, yang ideal untuk pemrosesan batch.
Untuk mengonfigurasi pipeline agar dapat dibaca dari Amazon S3, gunakan cetak biru Amazon S3 yang telah dikonfigurasi sebelumnya. Anda dapat mengedit scan
bagian konfigurasi pipeline untuk memenuhi kebutuhan penjadwalan Anda. Untuk informasi selengkapnya, lihat Menggunakan cetak biru untuk membuat pipeline.
Pemindaian satu kali
Pemindaian terjadwal satu kali berjalan sekali. Dalam YAML konfigurasi Anda, Anda dapat menggunakan start_time
dan end_time
untuk menentukan kapan Anda ingin objek dalam ember dipindai. Atau, Anda dapat menggunakan range
untuk menentukan interval waktu relatif terhadap waktu saat ini yang Anda inginkan objek dalam ember untuk dipindai.
Misalnya, rentang yang diatur untuk PT4H
memindai semua file yang dibuat dalam empat jam terakhir. Untuk mengonfigurasi pemindaian satu kali untuk menjalankan kedua kalinya, Anda harus menghentikan dan memulai ulang pipa. Jika Anda tidak memiliki rentang yang dikonfigurasi, Anda juga harus memperbarui waktu mulai dan berakhir.
Konfigurasi berikut menyiapkan pemindaian satu kali untuk semua bucket dan semua objek di bucket tersebut:
version: "2" log-pipeline: source: s3: codec: csv: compression: "none" aws: region: "
us-east-1
" sts_role_arn: "arn:aws:iam::{account-id
}:role/pipeline-role
" acknowledgments: true scan: buckets: - bucket: name:my-bucket-1
filter: include_prefix: -Objects1
/ exclude_suffix: - .jpeg - .png - bucket: name:my-bucket-2
key_prefix: include: -Objects2
/ exclude_suffix: - .jpeg - .png delete_s3_objects_on_read: false processor: - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint
.us-east-1
.es.amazonaws.com"] index: "index-name
" aws: sts_role_arn: "arn:aws:iam::{account-id
}:role/pipeline-role
" region: "us-east-1
" dlq: s3: bucket: "my-bucket-1
" region: "us-east-1
" sts_role_arn: "arn:aws:iam::{account-id
}:role/pipeline-role
"
Konfigurasi berikut mengatur pemindaian satu kali untuk semua bucket selama jendela waktu yang ditentukan. Ini berarti bahwa S3 hanya memproses objek-objek dengan waktu pembuatan yang termasuk dalam jendela ini.
scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: name:
my-bucket-1
filter: include: -Objects1
/ exclude_suffix: - .jpeg - .png - bucket: name:my-bucket-2
filter: include: -Objects2
/ exclude_suffix: - .jpeg - .png
Konfigurasi berikut mengatur pemindaian satu kali pada tingkat pemindaian dan tingkat bucket. Waktu mulai dan berakhir pada tingkat bucket menimpa waktu mulai dan berakhir pada tingkat pemindaian.
scan: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z buckets: - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name:
my-bucket-1
filter: include: -Objects1
/ exclude_suffix: - .jpeg - .png - bucket: start_time: 2023-01-21T18:00:00.000Z end_time: 2023-04-21T18:00:00.000Z name:my-bucket-2
filter: include: -Objects2
/ exclude_suffix: - .jpeg - .png
Menghentikan pipa menghilangkan referensi yang sudah ada sebelumnya tentang objek apa yang telah dipindai oleh pipa sebelum berhenti. Jika pipa pemindaian tunggal dihentikan, itu akan memindai ulang semua objek lagi setelah dimulai, bahkan jika mereka sudah dipindai. Jika Anda perlu menghentikan pipa pemindaian tunggal, disarankan Anda mengubah jendela waktu Anda sebelum memulai pipa lagi.
Jika Anda perlu memfilter objek berdasarkan waktu mulai dan waktu akhir, menghentikan dan memulai pipeline Anda adalah satu-satunya pilihan. Jika Anda tidak perlu memfilter berdasarkan waktu mulai dan waktu akhir, Anda dapat memfilter objek berdasarkan nama. Flitering dengan nama tidak mengharuskan Anda untuk berhenti dan memulai pipeline Anda. Untuk melakukan ini, gunakan include_prefix
danexclude_suffix
.
Pemindaian berulang
Pemindaian terjadwal berulang menjalankan pemindaian bucket S3 yang Anda tentukan secara berkala dan terjadwal. Anda hanya dapat mengonfigurasi interval ini pada tingkat pemindaian karena konfigurasi tingkat bucket individual tidak didukung.
Dalam YAML konfigurasi Anda, interval
menentukan frekuensi pemindaian berulang, dan bisa antara 30 detik dan 365 hari. Yang pertama dari pemindaian ini selalu terjadi ketika Anda membuat pipa. count
Mendefinisikan jumlah total instance pemindaian.
Konfigurasi berikut mengatur pemindaian berulang, dengan penundaan 12 jam antara pemindaian:
scan: scheduling: interval: PT12H count: 4 buckets: - bucket: name:
my-bucket-1
filter: include: -Objects1
/ exclude_suffix: - .jpeg - .png - bucket: name:my-bucket-2
filter: include: -Objects2
/ exclude_suffix: - .jpeg - .png
Amazon S3 sebagai tujuan
Untuk menulis data dari pipeline OpenSearch Ingestion ke bucket S3, gunakan cetak biru S3 yang telah dikonfigurasi sebelumnya untuk membuat pipeline dengan sink S3.
Saat Anda membuat wastafel S3, Anda dapat menentukan pemformatan pilihan Anda dari berbagai codec wastafel
Contoh berikut mendefinisikan skema inline di wastafel S3:
- s3: codec: parquet: schema: > { "type" : "record", "namespace" : "org.vpcFlowLog.examples", "name" : "VpcFlowLog", "fields" : [ { "name" : "version", "type" : "string"}, { "name" : "srcport", "type": "int"}, { "name" : "dstport", "type": "int"}, { "name" : "start", "type": "int"}, { "name" : "end", "type": "int"}, { "name" : "protocol", "type": "int"}, { "name" : "packets", "type": "int"}, { "name" : "bytes", "type": "int"}, { "name" : "action", "type": "string"}, { "name" : "logStatus", "type" : "string"} ] }
Saat Anda menentukan skema ini, tentukan superset dari semua kunci yang mungkin ada di berbagai jenis peristiwa yang dikirim pipeline Anda ke wastafel.
Misalnya, jika suatu peristiwa memiliki kemungkinan kunci hilang, tambahkan kunci itu dalam skema Anda dengan null
nilai. Deklarasi nilai nol memungkinkan skema untuk memproses data yang tidak seragam (di mana beberapa peristiwa memiliki kunci ini dan yang lainnya tidak). Ketika peristiwa yang masuk memang memiliki kunci-kunci ini, nilainya ditulis ke sink.
Definisi skema ini bertindak sebagai filter yang hanya memungkinkan kunci yang ditentukan dikirim ke sink, dan menjatuhkan kunci yang tidak ditentukan dari peristiwa yang masuk.
Anda juga dapat menggunakan include_keys
dan exclude_keys
di wastafel Anda untuk memfilter data yang diarahkan ke sink lain. Kedua filter ini saling eksklusif, jadi Anda hanya dapat menggunakan satu per satu dalam skema Anda. Selain itu, Anda tidak dapat menggunakannya dalam skema yang ditentukan pengguna.
Untuk membuat saluran pipa dengan filter semacam itu, gunakan cetak biru filter wastafel yang telah dikonfigurasi sebelumnya. Untuk informasi selengkapnya, lihat Menggunakan cetak biru untuk membuat pipeline.
Akun lintas Amazon S3 sebagai sumber
Anda dapat memberikan akses di seluruh akun dengan Amazon S3 sehingga pipeline OpenSearch Ingestion dapat mengakses bucket S3 di akun lain sebagai sumber. Untuk mengaktifkan akses lintas akun, lihat Pemilik Bucket yang memberikan izin bucket lintas akun di Panduan Pengguna Amazon S3. Setelah Anda memberikan akses, pastikan bahwa peran pipeline Anda memiliki izin yang diperlukan.
Kemudian, Anda dapat membuat YAML konfigurasi menggunakan bucket_owners
untuk mengaktifkan akses lintas akun ke bucket Amazon S3 sebagai sumber:
s3-pipeline: source: s3: notification_type: "sqs" codec: csv: delimiter: "," quote_character: "\"" detect_header: True sqs: queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue" bucket_owners: my-bucket-01: 123456789012 my-bucket-02: 999999999999 compression: "gzip"