Ikhtisar fitur pipeline di Amazon OpenSearch Ingestion - OpenSearch Layanan Amazon

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Ikhtisar fitur pipeline di Amazon OpenSearch Ingestion

Amazon OpenSearch Ingestion menyediakan saluran pipa, yang terdiri dari sumber, buffer, nol atau lebih prosesor, dan satu atau lebih sink. Saluran pipa konsumsi ditenagai oleh Data Prepper sebagai mesin data. Untuk ikhtisar berbagai komponen pipa, lihatKonsep utama.

Bagian berikut memberikan gambaran umum tentang beberapa fitur yang paling umum digunakan di Amazon OpenSearch Ingestion.

catatan

Ini bukan daftar lengkap fitur yang tersedia untuk saluran pipa. Untuk dokumentasi komprehensif dari semua fungsi pipeline yang tersedia, lihat dokumentasi Data Prepper. Perhatikan bahwa OpenSearch Ingestion menempatkan beberapa kendala pada plugin dan opsi yang dapat Anda gunakan. Untuk informasi selengkapnya, lihat Plugin dan opsi yang didukung untuk saluran Amazon OpenSearch Ingestion.

Buffering persisten

Buffer persisten menyimpan data Anda dalam buffer berbasis disk di beberapa Availability Zone untuk menambah daya tahan pada data Anda. Anda dapat menggunakan buffering persisten untuk menyerap data untuk semua sumber berbasis push yang didukung tanpa perlu menyiapkan buffer mandiri. Ini termasuk HTTP dan OpenTelemetry sumber untuk log, jejak, dan metrik.

Untuk mengaktifkan buffering persisten, pilih Aktifkan buffer persisten saat membuat atau memperbarui pipeline. Untuk informasi lebih lanjut, lihatMembuat saluran pipa Amazon OpenSearch Ingestion. OpenSearch Ingestion secara otomatis menentukan kapasitas buffering yang diperlukan berdasarkan Unit OpenSearch Komputasi Tertelan (Ingestion OCUs) yang Anda tentukan untuk pipeline.

Jika Anda mengaktifkan buffering persisten untuk pipeline, ukuran payload permintaan maksimum default adalah 10 MB untuk sumber HTTP, dan 4 MB untuk OpenTelemetry sumber. Untuk sumber HTTP, Anda dapat meningkatkan ukuran payload permintaan maksimum hingga 20 MB. Ukuran payload permintaan adalah ukuran dari seluruh permintaan HTTP yang biasanya berisi beberapa peristiwa. Setiap peristiwa tertentu tidak boleh melebihi 3,5 MB. Jika Anda tidak mengaktifkan buffering persisten, Anda tidak akan dapat mengubah ukuran muatan maksimum menjadi 20 MB.

Untuk saluran pipa dengan buffering persisten diaktifkan, unit pipa yang dikonfigurasi dibagi antara unit komputasi dan unit buffer. Jika pipeline menggunakan prosesor intensif CPU (grok, nilai kunci dan/atau string terpisah) maka unit yang ditentukan dibagi dalam rasio buffer 1:1 untuk menghitung jika tidak mereka dibagi dalam rasio 3:1. Ada bias terhadap penyediaan lebih banyak unit komputasi dengan masing-masing rasio ini.

Sebagai contoh:

  • Pipa dengan grok dan 2 unit maks - 1 unit komputasi dan 1 unit penyangga

  • Pipa dengan grok dan 5 unit maks - 3 unit komputasi dan 2 unit penyangga

  • Pipa tanpa prosesor dan 2 unit maks - 1 unit komputasi dan 1 unit penyangga

  • Pipa tanpa prosesor dan 4 unit maks - 1 unit komputasi dan 3 unit penyangga

  • Pipa dengan grok dan 5 unit maks - 2 unit komputasi dan 3 unit penyangga

Secara default, pipeline menggunakan file Kunci milik AWS untuk mengenkripsi data buffer. Pipeline ini tidak memerlukan izin tambahan untuk peran pipeline. Sebagai alternatif, Anda dapat menentukan kunci terkelola pelanggan dan menambahkan izin IAM berikut ke peran pipeline:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "KeyAccess", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKeyWithoutPlaintext" ], "Resource": "arn:aws:kms:{region}:{aws-account-id}:key/1234abcd-12ab-34cd-56ef-1234567890ab" } ] }

Untuk informasi selengkapnya, lihat Kunci terkelola pelanggan di Panduan AWS Key Management Service Pengembang.

catatan

Jika Anda menonaktifkan buffering persisten, pipeline Anda akan diperbarui untuk berjalan sepenuhnya pada buffering dalam memori.

Penyediaan buffering persisten

Amazon OpenSearch Ingestion melacak data yang ditulis ke wastafel dan secara otomatis melanjutkan penulisan dari titik pemeriksaan terakhir yang berhasil jika ada pemadaman di wastafel atau masalah lain yang mencegah data berhasil ditulis. Tidak ada layanan atau komponen tambahan yang diperlukan untuk buffer persisten selain unit OpenSearch komputasi minimum dan maksimum (OCU) yang ditetapkan untuk pipeline. Ketika buffering persisten dihidupkan, setiap OCU Ingestion sekarang mampu memberikan buffering persisten bersama dengan kemampuannya yang ada untuk menelan, mengubah, dan merutekan data. Setelah Anda mengaktifkan buffering persisten, data disimpan dalam buffer selama 72 jam. Amazon OpenSearch Ingestion secara dinamis mengalokasikan buffer dari alokasi minimum dan maksimum yang Anda tentukan untuk pipeline. OCUs

Jumlah Ingestion yang OCUs digunakan untuk buffering persisten dihitung secara dinamis berdasarkan sumber data, transformasi pada data streaming, dan sink tempat data ditulis. Karena sebagian dari Ingestion OCUs sekarang berlaku untuk buffering persisten, untuk mempertahankan throughput konsumsi yang sama untuk pipeline Anda, Anda perlu meningkatkan konsumsi minimum dan maksimum saat mengaktifkan buffering persisten OCUs . Jumlah OCUs yang Anda butuhkan dengan buffering persisten tergantung pada sumber yang Anda konsumsi data dari dan juga pada jenis pemrosesan yang Anda lakukan pada data. Tabel berikut menunjukkan jumlah OCUs yang Anda butuhkan dengan buffering persisten dengan berbagai sumber dan prosesor.

Memisahkan

Anda dapat mengonfigurasi pipeline OpenSearch Ingestion untuk membagi peristiwa masuk menjadi sub-pipeline, memungkinkan Anda melakukan berbagai jenis pemrosesan pada acara masuk yang sama.

Contoh pipeline berikut membagi peristiwa yang masuk menjadi dua sub-pipeline. Setiap sub-pipeline menggunakan prosesornya sendiri untuk memperkaya dan memanipulasi data, dan kemudian mengirimkan data ke indeks yang berbeda. OpenSearch

version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"

Mengikat

Anda dapat menghubungkan beberapa sub-pipeline bersama-sama untuk melakukan pemrosesan dan pengayaan data dalam potongan. Dengan kata lain, Anda dapat memperkaya acara yang masuk dengan kemampuan pemrosesan tertentu dalam satu sub-pipa, kemudian mengirimkannya ke sub-pipa lain untuk pengayaan tambahan dengan prosesor yang berbeda, dan akhirnya mengirimkannya ke wastafelnya. OpenSearch

Dalam contoh berikut, log_pipeline sub-pipeline memperkaya peristiwa log masuk dengan satu set prosesor, kemudian mengirimkan acara ke indeks bernama. OpenSearch enriched_logs Pipeline mengirimkan peristiwa yang sama ke log_advanced_pipeline sub-pipeline, yang memprosesnya dan mengirimkannya ke OpenSearch indeks yang berbeda bernamaenriched_advanced_logs.

version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"

Antrean surat mati

Antrian huruf mati (DLQs) adalah tujuan untuk peristiwa yang gagal ditulis oleh pipa ke wastafel. Di OpenSearch Ingestion, Anda harus menentukan bucket Amazon S3 dengan izin tulis yang sesuai untuk digunakan sebagai DLQ. Anda dapat menambahkan konfigurasi DLQ ke setiap wastafel di dalam pipa. Saat pipeline menemukan kesalahan penulisan, ia membuat objek DLQ di bucket S3 yang dikonfigurasi. Objek DLQ ada dalam file JSON sebagai array peristiwa gagal.

Pipeline menulis peristiwa ke DLQ ketika salah satu dari kondisi berikut terpenuhi:

  • max_retriesUntuk OpenSearch wastafel sudah habis. OpenSearch Tertelan membutuhkan minimal 16 untuk opsi ini.

  • Peristiwa ditolak oleh wastafel karena kondisi kesalahan.

Konfigurasi

Untuk mengonfigurasi antrian huruf mati untuk sub-pipeline, tentukan dlq opsi dalam konfigurasi wastafel: opensearch

apache-log-pipeline: ... sink: opensearch: dlq: s3: bucket: "my-dlq-bucket" key_path_prefix: "dlq-files" region: "us-west-2" sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"

File yang ditulis ke DLQ S3 ini akan memiliki pola penamaan berikut:

dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}

Untuk informasi selengkapnya, lihat Dead-Letter Queues (DLQ).

Untuk instruksi untuk mengonfigurasi sts_role_arn peran, lihatMenulis ke antrian surat mati.

Contoh

Perhatikan contoh file DLQ berikut:

dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343

Berikut adalah contoh data yang gagal ditulis ke wastafel, dan dikirim ke bucket DLQ S3 untuk analisis lebih lanjut:

Record_0 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "sample log" timestamp "2023-04-14T10:36:01.070Z" Record_1 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "another sample log" timestamp "2023-04-14T10:36:01.071Z"

Manajemen indeks

Amazon OpenSearch Ingestion memiliki banyak kemampuan manajemen indeks, termasuk yang berikut ini.

Membuat indeks

Anda dapat menentukan nama indeks di wastafel pipa dan OpenSearch Ingestion membuat indeks saat menyediakan pipeline. Jika indeks sudah ada, pipeline menggunakannya untuk mengindeks peristiwa yang masuk. Jika Anda menghentikan dan memulai ulang pipeline, atau jika Anda memperbarui konfigurasi YAML-nya, pipeline akan mencoba membuat indeks baru jika belum ada. Pipeline tidak akan pernah bisa menghapus indeks.

Contoh berikut sink membuat dua indeks saat pipeline disediakan:

sink: - opensearch: index: apache_logs - opensearch: index: nginx_logs

Menghasilkan nama dan pola indeks

Anda dapat menghasilkan nama indeks dinamis dengan menggunakan variabel dari bidang peristiwa yang masuk. Dalam konfigurasi sink, gunakan format string${} untuk memberi sinyal interpolasi string, dan gunakan penunjuk JSON untuk mengekstrak bidang dari peristiwa. Pilihan untuk index_type adalah custom ataumanagement_disabled. Karena index_type default untuk OpenSearch domain dan custom management_disabled untuk koleksi OpenSearch Tanpa Server, itu dapat dibiarkan tidak disetel.

Misalnya, pipeline berikut memilih metadataType bidang dari peristiwa yang masuk untuk menghasilkan nama indeks.

pipeline: ... sink: opensearch: index: "metadata-${metadataType}"

Konfigurasi berikut terus menghasilkan indeks baru setiap hari atau setiap jam.

pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd}" pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"

Nama indeks juga bisa berupa string biasa dengan pola tanggal-waktu sebagai akhiran, seperti. my-index-%{yyyy.MM.dd} Ketika sink mengirim data ke OpenSearch, itu menggantikan pola tanggal-waktu dengan waktu UTC dan membuat indeks baru untuk setiap hari, seperti. my-index-2022.01.25 Untuk informasi lebih lanjut, lihat DateTimeFormatterkelas.

Nama indeks ini juga dapat berupa string yang diformat (dengan atau tanpa akhiran pola tanggal-waktu), seperti. my-${index}-name Ketika wastafel mengirim data ke OpenSearch, itu menggantikan "${index}" bagian dengan nilai dalam acara yang sedang diproses. Jika formatnya"${index1/index2/index3}", itu menggantikan bidang index1/index2/index3 dengan nilainya dalam acara tersebut.

Menghasilkan dokumen IDs

Pipeline dapat menghasilkan ID dokumen saat mengindeks dokumen ke OpenSearch. Ini dapat menyimpulkan dokumen ini IDs dari bidang dalam peristiwa yang masuk.

Contoh ini menggunakan uuid bidang dari peristiwa yang masuk untuk menghasilkan ID dokumen.

pipeline: ... sink: opensearch: index_type: custom index: "metadata-${metadataType}-%{yyyy.MM.dd}" "document_id": "uuid"

Dalam contoh berikut, prosesor Tambah entri menggabungkan bidang uuid dan other_field dari peristiwa yang masuk untuk menghasilkan ID dokumen.

createTindakan ini memastikan bahwa dokumen dengan identik IDs tidak ditimpa. Pipeline menjatuhkan dokumen duplikat tanpa percobaan ulang atau acara DLQ. Ini adalah harapan yang masuk akal bagi penulis pipeline yang menggunakan tindakan ini, karena tujuannya adalah untuk menghindari memperbarui dokumen yang ada.

pipeline: ... processor: - add_entries: entries: - key: "my_doc_id_field" format: "${uuid}-${other_field}" sink: - opensearch: ... action: "create" document_id: "my_doc_id"

Anda mungkin ingin menyetel ID dokumen acara ke bidang dari sub-objek. Dalam contoh berikut, plugin OpenSearch sink menggunakan sub-objek info/id untuk menghasilkan ID dokumen.

sink: - opensearch: ... document_id: info/id

Mengingat peristiwa berikut, pipeline akan menghasilkan dokumen dengan _id bidang yang disetel kejson001:

{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }

Menghasilkan perutean IDs

Anda dapat menggunakan routing_field opsi dalam plugin OpenSearch sink untuk mengatur nilai properti routing dokumen (_routing) ke nilai dari peristiwa yang masuk.

Routing mendukung sintaks pointer JSON, sehingga bidang bersarang juga tersedia, bukan hanya bidang tingkat atas.

sink: - opensearch: ... routing_field: metadata/id document_id: id

Mengingat peristiwa berikut, plugin menghasilkan dokumen dengan _routing bidang diatur keabcd:

{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }

Untuk petunjuk membuat templat indeks yang dapat digunakan pipeline selama pembuatan indeks, lihat Templat indeks.

End-to-end pengakuan

OpenSearch Penyerapan memastikan daya tahan dan keandalan data dengan melacak pengirimannya dari sumber ke bak cuci di jaringan pipa tanpa kewarganegaraan menggunakan pengakuan. end-to-end Saat ini, hanya plugin sumber S3 yang mendukung end-to-end pengakuan.

Dengan end-to-end pengakuan, plugin sumber pipeline membuat set pengakuan untuk memantau sekumpulan peristiwa. Ini menerima pengakuan positif ketika peristiwa itu berhasil dikirim ke wastafel mereka, atau pengakuan negatif ketika salah satu peristiwa tidak dapat dikirim ke wastafel mereka.

Jika terjadi kegagalan atau kerusakan komponen pipa, atau jika sumber gagal menerima pengakuan, sumber akan habis waktu dan mengambil tindakan yang diperlukan seperti mencoba kembali atau mencatat kegagalan. Jika pipeline memiliki beberapa sink atau beberapa sub-pipeline yang dikonfigurasi, pengakuan tingkat peristiwa dikirim hanya setelah acara dikirim ke semua sink di semua sub-pipeline. Jika wastafel memiliki DLQ yang dikonfigurasi, end-to-end pengakuan juga melacak peristiwa yang ditulis ke DLQ.

Untuk mengaktifkan end-to-end pengakuan, sertakan acknowledgments opsi dalam konfigurasi sumber:

s3-pipeline: source: s3: acknowledgments: true ...

Sumber tekanan balik

Pipa dapat mengalami tekanan balik saat sibuk memproses data, atau jika sink sementara turun atau lambat untuk menelan data. OpenSearch Ingestion memiliki cara yang berbeda untuk menangani tekanan balik tergantung pada plugin sumber yang digunakan pipa.

Sumber HTTP

Saluran pipa yang menggunakan plugin sumber HTTP menangani tekanan balik secara berbeda tergantung pada komponen pipa mana yang padat:

  • Buffer — Ketika buffer penuh, pipeline mulai mengembalikan status HTTP REQUEST_TIMEOUT dengan kode kesalahan 408 kembali ke titik akhir sumber. Saat buffer dibebaskan, pipeline mulai memproses peristiwa HTTP lagi.

  • Thread sumber — Ketika semua thread sumber HTTP sibuk mengeksekusi permintaan dan ukuran antrian permintaan yang belum diproses telah melebihi jumlah maksimum permintaan yang diizinkan, pipeline mulai mengembalikan status HTTP TOO_MANY_REQUESTS dengan kode kesalahan 429 kembali ke titik akhir sumber. Ketika antrian permintaan turun di bawah ukuran antrian maksimum yang diizinkan, pipeline mulai memproses permintaan lagi.

OTel sumber

Ketika buffer penuh untuk pipeline yang menggunakan OpenTelemetry sumber (OTel log, OTel metrik, dan OTel jejak), pipeline mulai mengembalikan status HTTP REQUEST_TIMEOUT dengan kode kesalahan 408 ke titik akhir sumber. Saat buffer dibebaskan, pipeline mulai memproses peristiwa lagi.

Sumber S3

Ketika buffer penuh untuk saluran pipa dengan sumber S3, saluran pipa berhenti memproses pemberitahuan SQS. Saat buffer dibebaskan, saluran pipa mulai memproses notifikasi lagi.

Jika sink mati atau tidak dapat menyerap data dan end-to-end pengakuan diaktifkan untuk sumber, pipeline berhenti memproses notifikasi SQS hingga menerima pengakuan yang berhasil dari semua sink.