Menggunakan pemfilteran acara dengan sumber acara Apache Kafka yang dikelola sendiri - AWS Lambda

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menggunakan pemfilteran acara dengan sumber acara Apache Kafka yang dikelola sendiri

Anda dapat menggunakan pemfilteran peristiwa untuk mengontrol rekaman mana dari aliran atau antrian yang dikirim Lambda ke fungsi Anda. Untuk informasi umum tentang cara kerja penyaringan acara, lihatKontrol peristiwa mana yang dikirim Lambda ke fungsi Anda.

Bagian ini berfokus pada penyaringan acara untuk sumber acara Apache Kafka yang dikelola sendiri.

Dasar-dasar penyaringan acara Apache Kafka yang dikelola sendiri

Misalkan produser menulis pesan ke topik di cluster Apache Kafka yang dikelola sendiri, baik dalam JSON format yang valid atau sebagai string biasa. Contoh catatan akan terlihat seperti berikut, dengan pesan dikonversi ke string yang dikodekan Base64 di bidang. value

{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[] } ] }

Misalkan produser Apache Kafka Anda menulis pesan ke topik Anda dalam format berikutJSON.

{ "device_ID": "AB1234", "session":{ "start_time": "yyyy-mm-ddThh:mm:ss", "duration": 162 } }

Anda dapat menggunakan value kunci untuk memfilter catatan. Misalkan Anda ingin memfilter hanya catatan-catatan di mana device_ID dimulai dengan huruf AB. FilterCriteriaObjeknya adalah sebagai berikut.

{ "Filters": [ { "Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }" } ] }

Untuk kejelasan tambahan, berikut adalah nilai filter yang Pattern diperluas secara polosJSON.

{ "value": { "device_ID": [ { "prefix": "AB" } ] } }

Anda dapat menambahkan filter menggunakan konsol, AWS CLI atau AWS SAM templat.

Console

Untuk menambahkan filter ini menggunakan konsol, ikuti instruksi Melampirkan kriteria filter ke pemetaan sumber peristiwa (konsol) dan masukkan string berikut untuk kriteria Filter.

{ "value" : { "device_ID" : [ { "prefix": "AB" } ] } }
AWS CLI

Untuk membuat pemetaan sumber peristiwa baru dengan kriteria filter ini menggunakan AWS Command Line Interface (AWS CLI), jalankan perintah berikut.

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"}]}'

Untuk menambahkan kriteria filter ini ke pemetaan sumber peristiwa yang ada, jalankan perintah berikut.

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"}]}'
AWS SAM

Untuk menambahkan filter ini menggunakan AWS SAM, tambahkan cuplikan berikut ke YAML template untuk sumber acara Anda.

FilterCriteria: Filters: - Pattern: '{ "value" : { "device_ID" : [ { "prefix": "AB" } ] } }'

Dengan Apache Kafka yang dikelola sendiri, Anda juga dapat memfilter catatan di mana pesannya adalah string biasa. Misalkan Anda ingin mengabaikan pesan-pesan di mana string adalah “kesalahan”. FilterCriteriaObjek akan terlihat sebagai berikut.

{ "Filters": [ { "Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }" } ] }

Untuk kejelasan tambahan, berikut adalah nilai filter yang Pattern diperluas secara polosJSON.

{ "value": [ { "anything-but": [ "error" ] } ] }

Anda dapat menambahkan filter menggunakan konsol, AWS CLI atau AWS SAM templat.

Console

Untuk menambahkan filter ini menggunakan konsol, ikuti instruksi Melampirkan kriteria filter ke pemetaan sumber peristiwa (konsol) dan masukkan string berikut untuk kriteria Filter.

{ "value" : [ { "anything-but": [ "error" ] } ] }
AWS CLI

Untuk membuat pemetaan sumber peristiwa baru dengan kriteria filter ini menggunakan AWS Command Line Interface (AWS CLI), jalankan perintah berikut.

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'

Untuk menambahkan kriteria filter ini ke pemetaan sumber peristiwa yang ada, jalankan perintah berikut.

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'
AWS SAM

Untuk menambahkan filter ini menggunakan AWS SAM, tambahkan cuplikan berikut ke YAML template untuk sumber acara Anda.

FilterCriteria: Filters: - Pattern: '{ "value" : [ { "anything-but": [ "error" ] } ] }'

Pesan Apache Kafka yang dikelola sendiri harus berupa UTF -8 string yang dikodekan, baik string biasa atau dalam format. JSON Itu karena Lambda menerjemahkan array byte Kafka menjadi UTF -8 sebelum menerapkan kriteria filter. Jika pesan Anda menggunakan pengkodean lain, seperti UTF -16 atauASCII, atau jika format pesan tidak cocok dengan format, Lambda FilterCriteria hanya memproses filter metadata. Tabel berikut merangkum perilaku spesifik:

Format pesan masuk Format pola filter untuk properti pesan Tindakan yang dihasilkan

Tali polos

Tali polos

Filter Lambda berdasarkan kriteria filter Anda.

Tali polos

Tidak ada pola filter untuk properti data

Filter Lambda (hanya pada properti metadata lainnya) berdasarkan kriteria filter Anda.

Tali polos

Valid JSON

Filter Lambda (hanya pada properti metadata lainnya) berdasarkan kriteria filter Anda.

Valid JSON

Tali polos

Filter Lambda (hanya pada properti metadata lainnya) berdasarkan kriteria filter Anda.

Valid JSON

Tidak ada pola filter untuk properti data

Filter Lambda (hanya pada properti metadata lainnya) berdasarkan kriteria filter Anda.

Valid JSON

Valid JSON

Filter Lambda berdasarkan kriteria filter Anda.

String yang tidak dikodekan UTF -8

JSON, string polos, atau tidak ada pola

Filter Lambda (hanya pada properti metadata lainnya) berdasarkan kriteria filter Anda.