Memproses Data Terlebih Dulu Menggunakan Fungsi Lambda - Panduan Pengembang Amazon Kinesis Data Analytics SQL untuk Aplikasi

Setelah mempertimbangkan dengan cermat, kami memutuskan untuk menghentikan Amazon Kinesis Data Analytics SQL untuk aplikasi dalam dua langkah:

1. Mulai 15 Oktober 2025, Anda tidak akan dapat membuat Kinesis Data Analytics SQL baru untuk aplikasi.

2. Kami akan menghapus aplikasi Anda mulai 27 Januari 2026. Anda tidak akan dapat memulai atau mengoperasikan Amazon Kinesis Data Analytics Anda SQL untuk aplikasi. Support tidak akan lagi tersedia untuk Amazon Kinesis Data Analytics SQL sejak saat itu. Untuk informasi selengkapnya, lihat Amazon Kinesis Data Analytics SQL untuk penghentian Aplikasi.

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Memproses Data Terlebih Dulu Menggunakan Fungsi Lambda

catatan

Setelah 12 September 2023, Anda tidak akan dapat membuat aplikasi baru menggunakan Kinesis Data Firehose sebagai sumber jika Anda belum menggunakan Kinesis Data Analytics untuk SQL. Untuk informasi selengkapnya, lihat Batasan-batasan.

Jika data dalam aliran Anda memerlukan konversi format, transformasi, pengayaan, atau pemfilteran, Anda dapat memproses data menggunakan fungsi terlebih dahulu. AWS Lambda Anda dapat melakukan ini sebelum kode SQL aplikasi Anda mengeksekusi atau sebelum aplikasi Anda membuat skema dari aliran data Anda.

Menggunakan fungsi Lambda untuk memproses catatan terlebih dulu berguna dalam skenario berikut:

  • Mengubah catatan dari format lain (seperti KPL atau GZIP) ke dalam format yang dapat dianalisis oleh Kinesis Data Analytics. Kinesis Data Analytics saat ini mendukung format data JSON atau CSV.

  • Memperluas data ke dalam format yang lebih mudah diakses untuk operasi seperti agregasi atau deteksi anomali. Misalnya, jika beberapa nilai data disimpan bersama-sama dalam string, Anda dapat memperluas data ke dalam kolom terpisah.

  • Pengayaan data dengan layanan Amazon lainnya, seperti ekstrapolasi atau koreksi kesalahan.

  • Menerapkan transformasi string yang kompleks untuk mencatat bidang.

  • Penyaringan data untuk membersihkan data.

Menggunakan Fungsi Lambda untuk Memproses Catatan Terlebih Dulu

Saat membuat aplikasi Kinesis Data Analytics, Anda mengaktifkan prapemrosesan Lambda di halaman Connect to a Source (Sambungkan ke Sumber).

Untuk menggunakan fungsi Lambda untuk memproses catatan terlebih dulu di aplikasi Kinesis Data Analytics
  1. Masuk ke AWS Management Console dan buka Layanan Terkelola untuk konsol Apache Flink di https://console.aws.amazon.com/kinesisanalytics.

  2. Di halaman Connect to a Source (Sambungkan ke Sumber) untuk aplikasi Anda, pilih Enabled (Aktifkan) di bagian Catan prapemrosesan dengan AWS Lambda.

  3. Untuk menggunakan fungsi Lambda yang sudah Anda buat, pilih fungsi di daftar menurun Fungsi Lambda.

  4. Untuk membuat fungsi Lambda baru dari salah satu templat prapemrosesan Lambda, pilih template dari daftar menurun. Selanjutnya pilih View <template name> in Lambda (Lihat <template name> di Lambda) untuk mengedit fungsi.

  5. Untuk membuat fungsi Lambda baru, pilih Create new (Buat baru). Untuk informasi tentang membuat fungsi Lambda, lihat Membuat Fungsi HelloWorld Lambda dan Menjelajahi Konsol di Panduan Pengembang.AWS Lambda

  6. Pilih versi fungsi Lambda yang akan digunakan. Untuk menggunakan versi terbaru, pilih $LATEST.

Saat Anda memilih atau membuat fungsi Lambda untuk prapemrosesan catatan, catatan diproses terlebih dulu sebelum kode SQL aplikasi Anda mengeksekusi atau aplikasi Anda menghasilkan skema dari catatan.

Izin Prapemrosesan Lambda

Untuk menggunakan prapemrosesan Lambda, IAM role aplikasi memerlukan kebijakan izin berikut:

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

Metrik Prapemrosesan Lambda

Anda dapat menggunakan Amazon CloudWatch untuk memantau jumlah pemanggilan Lambda, byte yang diproses, keberhasilan dan kegagalan, dan sebagainya. Untuk informasi tentang CloudWatch metrik yang dipancarkan oleh preprocessing Kinesis Data Analytics Lambda, lihat Metrik Amazon Kinesis Analytics.

Menggunakan AWS Lambda dengan Perpustakaan Produser Kinesis

Kinesis Producer Library (KPL) menggabungkan catatan kecil yang diformat pengguna ke dalam catatan yang lebih besar hingga 1 MB untuk memanfaatkan throughput Amazon Kinesis Data Streams dengan lebih baik. Kinesis Client Library (KCL) untuk Java mendukung pemisahkan catatan-catatan ini. Namun, Anda harus menggunakan modul khusus untuk mendeagregasi catatan saat Anda menggunakan AWS Lambda sebagai konsumen aliran Anda.

Untuk mendapatkan kode dan instruksi proyek yang diperlukan, lihat Modul Deagregasi Perpustakaan Produser Kinesis untuk selanjutnya. AWS LambdaGitHub Anda dapat menggunakan komponen dalam proyek ini untuk memproses data serial KPL AWS Lambda dalam Java, Node.js, dan Python. Anda juga dapat menggunakan komponen ini sebagai bagian dari aplikasi KCL multibahasa.

Model Data Input Peristiwa Prapemrosesan Data/Model Respons Catatan

Untuk memproses catatan terlebih dulu, fungsi Lambda Anda harus sesuai dengan data input peristiwa yang diperlukan dan model respon catatan.

Model Data Input Peristiwa

Kinesis Data Analytics terus membaca data dari aliran data Kinesis atau aliran pengiriman Firehose Anda. Untuk setiap batch catatan yang diambil, layanan mengelola bagaimana setiap batch akan diteruskan ke fungsi Lambda Anda. Fungsi Anda menerima daftar catatan sebagai input. Dalam fungsi Anda, Anda mengulangi melalui daftar dan menerapkan logika bisnis Anda untuk memenuhi persyaratan prapemrosesan Anda (seperti konversi format data atau pengayaan).

Model input ke fungsi preprocessing Anda sedikit berbeda, tergantung pada apakah data diterima dari aliran data Kinesis atau aliran pengiriman Firehose.

Jika sumbernya adalah aliran pengiriman Firehose, model data input peristiwa adalah sebagai berikut:

Model Data Permintaan Selang Api Data Kinesis

Bidang Deskripsi
invocationId Id invokasi Lambda (GUID acak).
applicationArn Amazon Resource Name (ARN) aplikasi Kinesis Data Analytics
streamArn ARN aliran pengiriman
catatan
Bidang Deskripsi
recordId ID catatan (GUID acak)
kinesisFirehoseRecordMetadata
Bidang Deskripsi
approximateArrivalTimestamp Perkiraan waktu kedatangan catatan aliran pengiriman
data Muatan catatan sumber berkode Base64

Contoh berikut menunjukkan input dari aliran pengiriman Firehose:

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

Jika sumbernya adalah Kinesis data stream, model data input peristiwa adalah sebagai berikut:

Model Data Permintaan Aliran Kinesis

Bidang Deskripsi
invocationId Id invokasi Lambda (GUID acak).
applicationArn ARN aplikasi Kinesis Data Analytics
streamArn ARN aliran pengiriman
catatan
Bidang Deskripsi
recordId ID catatan berdasarkan nomor urut catatan Kinesis
kinesisStreamRecordMetadata
Bidang Deskripsi
sequenceNumber Nomor urut dari catatan aliran Kinesis
partitionKey Kunci partisi dari catatan aliran Kinesis
shardId ShardId dari catatan aliran Kinesis
approximateArrivalTimestamp Perkiraan waktu kedatangan catatan aliran pengiriman
data Muatan catatan sumber berkode Base64

Contoh berikut menunjukkan input dari Kinesis data stream:

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

Model Respons Catatan

Semua catatan yang dikembalikan dari fungsi prapemrosesan Lambda Anda (dengan ID catatan) yang dikirim ke fungsi Lambda harus dikembalikan. Catatan tersebut harus berisi parameter berikut, atau Kinesis Data Analytics menolaknya dan menganggapnya sebagai kegagalan prapemrosesan data. Bagian muatan data dari catatan dapat diubah untuk mencapai persyaratan prapemrosesan.

Model Data Respons

catatan
Bidang Deskripsi
recordId ID catatan diteruskan dari Kinesis Data Analytics ke Lambda selama invokasi. Catatan yang diubah harus berisi ID catatan yang sama. Ketidakcocokan apa pun antara ID dari catatan asli dan ID dari catatan yang diubah dianggap sebagai kegagalan prapemrosesan data.
result Status transformasi data dari catatan. Nilai yang mungkin adalah:
  • Ok: Catatan berhasil diubah. Kinesis Data Analytics menyerap catatan untuk pemrosesan SQL.

  • Dropped: Catatan sengaja dijatuhkan oleh logika pemrosesan Anda. Kinesis Data Analytics menjatuhkan catatan dari pemrosesan SQL. Bidang muatan data bersifat opsional untuk catatan Dropped.

  • ProcessingFailed: Catatan tidak dapat diubah. Kinesis Data Analytics menganggapnya tidak berhasil diproses oleh fungsi Lambda Anda dan menulis kesalahan pada aliran kesalahan. Untuk informasi selengkapnya tentang aliran kesalahan, lihat Penanganan Kesalahan. Bidang muatan data bersifat opsional untuk catatan ProcessingFailed.

data Muatan data yang diubah, setelah enkode base64. Setiap muatan data dapat berisi beberapa dokumen JSON jika format data penyerapan aplikasi adalah JSON. Atau masing-masing dapat berisi beberapa baris CSV (dengan pembatas baris yang ditentukan di setiap baris) jika format data penyerapan aplikasi adalah CSV. Layanan Kinesis Data Analytics berhasil mengurai dan memproses data dengan beberapa dokumen JSON atau baris CSV dalam muatan data yang sama.

Contoh berikut menunjukkan output dari fungsi Lambda:

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

Kegagalan Prapemrosesan Data Umum

Berikut adalah alasan umum prapemrosesan bisa gagal.

  • Tidak semua catatan (dengan ID catatan) dalam batch yang dikirim ke fungsi Lambda dikembalikan ke layanan Kinesis Data Analytics.

  • Respons kehilangan ID catatan, status, atau bidang muatan data. Bidang muatan data bersifat opsional untuk catatan Dropped atau ProcessingFailed.

  • Waktu habis fungsi Lambda tidak cukup untuk memproses data terlebih dulu.

  • Respons fungsi Lambda melebihi batas respons yang dikenakan oleh layanan AWS Lambda .

Untuk kegagalan prapemrosesan data, Kinesis Data Analytics terus mencoba kembali invokasi Lambda pada kumpulan catatan yang sama hingga berhasil. Anda dapat memantau CloudWatch metrik berikut untuk mendapatkan wawasan tentang kegagalan.

  • Aplikasi Kinesis Data Analytics MillisBehindLatest: Menunjukkan seberapa jauh aplikasi membaca dari sumber streaming.

  • Metrik InputPreprocessing CloudWatch aplikasi Kinesis Data Analytics: Menunjukkan jumlah keberhasilan dan kegagalan, di antara statistik lainnya. Untuk informasi selengkapnya, lihat Metrik Amazon Kinesis Analytics.

  • AWS Lambda CloudWatch metrik fungsi dan log.