Menangkap batch yang dibuang untuk sumber acara Apache Kafka yang dikelola sendiri - AWS Lambda

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menangkap batch yang dibuang untuk sumber acara Apache Kafka yang dikelola sendiri

Untuk menyimpan catatan pemanggilan pemetaan sumber peristiwa yang gagal, tambahkan tujuan ke pemetaan sumber peristiwa fungsi Anda. Setiap catatan yang dikirim ke tujuan adalah dokumen JSON yang berisi metadata tentang pemanggilan yang gagal. Untuk tujuan Amazon S3, Lambda juga mengirimkan seluruh catatan pemanggilan bersama dengan metadata. Anda dapat mengonfigurasi topik Amazon SNS, antrian Amazon SQS, atau bucket S3 sebagai tujuan.

Dengan tujuan Amazon S3, Anda dapat menggunakan fitur Pemberitahuan Acara Amazon S3 untuk menerima notifikasi saat objek diunggah ke bucket S3 tujuan Anda. Anda juga dapat mengonfigurasi Pemberitahuan Acara S3 untuk menjalankan fungsi Lambda lain untuk melakukan pemrosesan otomatis pada batch yang gagal.

Peran eksekusi Anda harus memiliki izin untuk tujuan:

Anda harus menerapkan titik akhir VPC untuk layanan tujuan yang gagal di dalam VPC cluster Apache Kafka Anda.

Selain itu, jika Anda mengonfigurasi kunci KMS di tujuan Anda, Lambda memerlukan izin berikut tergantung pada jenis tujuan:

Mengonfigurasi tujuan yang gagal untuk pemetaan sumber acara Apache Kafka yang dikelola sendiri

Untuk mengonfigurasi tujuan saat gagal menggunakan konsol, ikuti langkah-langkah berikut:

  1. Buka halaman Fungsi di konsol Lambda.

  2. Pilih fungsi.

  3. Di bagian Gambaran umum fungsi, pilih Tambahkan tujuan.

  4. Untuk Sumber, pilih Pemanggilan pemetaan sumber acara.

  5. Untuk pemetaan sumber peristiwa, pilih sumber peristiwa yang dikonfigurasi untuk fungsi ini.

  6. Untuk Kondisi, pilih On failure. Untuk pemanggilan pemetaan sumber peristiwa, ini adalah satu-satunya kondisi yang diterima.

  7. Untuk tipe Tujuan, pilih tipe tujuan yang Lambda kirimkan catatan pemanggilan.

  8. Untuk Tujuan, pilih sumber daya.

  9. Pilih Simpan.

Anda juga dapat mengonfigurasi tujuan pada kegagalan menggunakan file. AWS CLI Misalnya, create-event-source-mappingperintah berikut menambahkan pemetaan sumber peristiwa dengan tujuan kegagalan SQS ke: MyFunction

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

update-event-source-mappingPerintah berikut menambahkan tujuan kegagalan S3 ke sumber acara yang terkait dengan input: uuid

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Untuk menghapus tujuan, berikan string kosong sebagai argumen ke destination-config parameter:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Praktik terbaik keamanan untuk tujuan Amazon S3

Menghapus bucket S3 yang dikonfigurasi sebagai tujuan tanpa menghapus tujuan dari konfigurasi fungsi Anda dapat menimbulkan risiko keamanan. Jika pengguna lain mengetahui nama bucket tujuan Anda, mereka dapat membuat ulang bucket di bucket tersebut. Akun AWS Catatan pemanggilan yang gagal akan dikirim ke bucket mereka, yang berpotensi mengekspos data dari fungsi Anda.

Awas

Untuk memastikan bahwa catatan pemanggilan dari fungsi Anda tidak dapat dikirim ke bucket S3 di bucket lain Akun AWS, tambahkan kondisi ke peran eksekusi fungsi Anda yang membatasi s3:PutObject izin ke bucket di akun Anda.

Contoh berikut menunjukkan kebijakan IAM yang membatasi s3:PutObject izin fungsi Anda ke bucket di akun Anda. Kebijakan ini juga memberi Lambda s3:ListBucket izin yang dibutuhkan untuk menggunakan bucket S3 sebagai tujuan.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount": "111122223333" } } } ] }

Untuk menambahkan kebijakan izin ke peran eksekusi fungsi Anda menggunakan AWS Management Console atau AWS CLI, lihat instruksi dalam prosedur berikut:

Console
Untuk menambahkan kebijakan izin ke peran eksekusi fungsi (konsol)
  1. Buka halaman Fungsi di konsol Lambda.

  2. Pilih fungsi Lambda yang peran eksekusinya ingin Anda modifikasi.

  3. Di tab Konfigurasi, pilih Izin.

  4. Di tab Peran eksekusi, pilih nama Peran fungsi Anda untuk membuka halaman konsol IAM peran.

  5. Tambahkan kebijakan izin ke peran dengan melakukan hal berikut:

    1. Di panel Kebijakan izin, pilih Tambahkan izin, lalu pilih Buat kebijakan sebaris.

    2. Di Editor kebijakan, pilih JSON.

    3. Rekatkan kebijakan yang ingin Anda tambahkan ke editor (ganti JSON yang ada), lalu pilih Berikutnya.

    4. Di bawah Detail kebijakan, masukkan nama Kebijakan.

    5. Pilih Buat kebijakan.

AWS CLI
Untuk menambahkan kebijakan izin ke peran eksekusi fungsi (CLI)
  1. Buat dokumen kebijakan JSON dengan izin yang diperlukan dan simpan di direktori lokal.

  2. Gunakan perintah IAM put-role-policy CLI untuk menambahkan izin ke peran eksekusi fungsi Anda. Jalankan perintah berikut dari direktori tempat Anda menyimpan dokumen kebijakan JSON dan ganti nama peran, nama kebijakan, dan dokumen kebijakan dengan nilai Anda sendiri.

    aws iam put-role-policy \ --role-name my_lambda_role \ --policy-name LambdaS3DestinationPolicy \ --policy-document file://my_policy.json

Catatan pemanggilan contoh SNS dan SQS

Contoh berikut menunjukkan apa yang Lambda kirim ke topik SNS atau tujuan antrian SQS untuk pemanggilan sumber peristiwa Kafka yang gagal. Setiap kunci di bawah recordsInfo berisi topik dan partisi Kafka, dipisahkan oleh tanda hubung. Misalnya, untuk kuncinya"Topic-0", Topic adalah topik Kafka, dan 0 merupakan partisi. Untuk setiap topik dan partisi, Anda dapat menggunakan data offset dan stempel waktu untuk menemukan catatan pemanggilan asli.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

Catatan pemanggilan contoh tujuan S3

Untuk tujuan S3, Lambda mengirimkan seluruh catatan pemanggilan bersama dengan metadata ke tujuan. Contoh berikut menunjukkan bahwa Lambda mengirim ke tujuan bucket S3 untuk pemanggilan sumber peristiwa Kafka yang gagal. Selain semua bidang dari contoh sebelumnya untuk tujuan SQS dan SNS, payload bidang berisi catatan pemanggilan asli sebagai string JSON yang lolos.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
Tip

Sebaiknya aktifkan versi S3 di bucket tujuan Anda.