Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Mengembangkan konsumen Perpustakaan Klien Kinesis di Node.js
catatan
Kinesis Client Library (KCL) versi 1.x dan 2.x sudah usang. Kami merekomendasikan migrasi ke KCL versi 3.x, yang menawarkan peningkatan kinerja dan fitur baru. Untuk dokumentasi KCL terbaru dan panduan migrasi, lihatGunakan Perpustakaan Klien Kinesis.
Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas Node.js.
KCL adalah perpustakaan Java; dukungan untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. MultiLangDaemon Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan bahasa KCL selain Java. Oleh karena itu, jika Anda menginstal KCL untuk Node.js dan menulis aplikasi konsumen Anda sepenuhnya di Node.js, Anda masih memerlukan Java diinstal pada sistem Anda karena. MultiLangDaemon Selanjutnya, MultiLangDaemon memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda, misalnya, AWS Wilayah yang terhubung dengannya. Untuk informasi lebih lanjut tentang MultiLangDaemon on GitHub, buka halaman MultiLangDaemon proyek KCL
Untuk mengunduh Node.js KCL dari GitHub, buka Perpustakaan Klien Kinesis (
Unduhan Kode Sampel
Ada dua contoh kode yang tersedia untuk KCL di Node.js:
-
Digunakan di bagian berikut untuk menggambarkan dasar-dasar membangun aplikasi konsumen KCL di Node.js.
-
Sedikit lebih maju dan menggunakan skenario dunia nyata, setelah Anda membiasakan diri dengan kode sampel dasar. Sampel ini tidak dibahas di sini tetapi memiliki file README dengan informasi lebih lanjut.
Anda harus menyelesaikan tugas-tugas berikut saat menerapkan aplikasi konsumen KCL di Node.js:
Menerapkan prosesor rekaman
Konsumen paling sederhana yang mungkin menggunakan KCL untuk Node.js harus mengimplementasikan recordProcessor
fungsi, yang pada gilirannya berisi fungsiinitialize
,processRecords
, danshutdown
. Sampel menyediakan implementasi yang dapat Anda gunakan sebagai titik awal (lihatsample_kcl_app.js
).
function recordProcessor() { // return an object that implements initialize, processRecords and shutdown functions.}
menginisialisasi
KCL memanggil initialize
fungsi ketika prosesor rekaman dimulai. Prosesor rekaman ini hanya memproses ID pecahan yang diteruskan sebagaiinitializeInput.shardId
, dan biasanya, kebalikannya juga benar (pecahan ini hanya diproses oleh prosesor rekaman ini). Namun, konsumen Anda harus memperhitungkan kemungkinan bahwa catatan data dapat diproses lebih dari satu kali. Ini karena Kinesis Data Streams memiliki setidaknya sekali semantik, artinya setiap catatan data dari pecahan diproses setidaknya satu kali oleh pekerja di konsumen Anda. Untuk informasi lebih lanjut tentang kasus di mana pecahan tertentu dapat diproses oleh lebih dari satu pekerja, lihatGunakan resharding, scaling, dan parallel processing untuk mengubah jumlah pecahan.
initialize: function(initializeInput, completeCallback)
processRecords
KCL memanggil fungsi ini dengan input yang berisi daftar catatan data dari pecahan yang ditentukan ke fungsi. initialize
Prosesor rekaman yang Anda terapkan memproses data dalam catatan ini sesuai dengan semantik konsumen Anda. Misalnya, pekerja mungkin melakukan transformasi pada data dan kemudian menyimpan hasilnya di bucket Amazon Simple Storage Service (Amazon S3).
processRecords: function(processRecordsInput, completeCallback)
Selain data itu sendiri, catatan juga berisi nomor urut dan kunci partisi, yang dapat digunakan pekerja saat memproses data. Misalnya, pekerja dapat memilih bucket S3 untuk menyimpan data berdasarkan nilai kunci partisi. record
Kamus mengekspos pasangan kunci-nilai berikut untuk mengakses data catatan, nomor urut, dan kunci partisi:
record.data
record.sequenceNumber
record.partitionKey
Perhatikan bahwa data tersebut dikodekan oleh Base64.
Dalam sampel dasar, fungsi processRecords
memiliki kode yang menunjukkan bagaimana seorang pekerja dapat mengakses data rekaman, nomor urut, dan kunci partisi.
Kinesis Data Streams membutuhkan prosesor rekaman untuk melacak catatan yang telah diproses dalam pecahan. KCL menangani pelacakan ini dengan checkpointer
objek yang dilewatkan sebagaiprocessRecordsInput.checkpointer
. Prosesor rekaman Anda memanggil checkpointer.checkpoint
fungsi untuk memberi tahu KCL seberapa jauh perkembangannya dalam memproses catatan di pecahan. Jika pekerja gagal, KCL menggunakan informasi ini saat Anda memulai ulang pemrosesan pecahan sehingga berlanjut dari catatan olahan terakhir yang diketahui.
Untuk operasi split atau merge, KCL tidak mulai memproses pecahan baru sampai prosesor untuk pecahan asli dipanggil checkpoint
untuk memberi sinyal bahwa semua pemrosesan pada pecahan asli selesai.
Jika Anda tidak meneruskan nomor urut ke checkpoint
fungsi, KCL mengasumsikan bahwa panggilan ke checkpoint
berarti bahwa semua catatan telah diproses, hingga catatan terakhir yang diteruskan ke prosesor rekaman. Oleh karena itu, prosesor rekaman harus memanggil checkpoint
hanya setelah memproses semua catatan dalam daftar yang diteruskan ke sana. Prosesor rekaman tidak perlu memanggil checkpoint
setiap panggilan keprocessRecords
. Prosesor dapat, misalnya, memanggil setiap panggilan ketiga, atau beberapa peristiwa checkpoint
di luar prosesor rekaman Anda, seperti layanan verifikasi/validasi khusus yang telah Anda terapkan.
Anda dapat secara opsional menentukan nomor urut yang tepat dari catatan sebagai parameter untukcheckpoint
. Dalam hal ini, KCL mengasumsikan bahwa semua catatan telah diproses hingga catatan itu saja.
Aplikasi sampel dasar menunjukkan panggilan sesederhana mungkin ke checkpointer.checkpoint
fungsi tersebut. Anda dapat menambahkan logika checkpointing lain yang Anda butuhkan untuk konsumen Anda pada titik ini dalam fungsi.
penonaktifan
KCL memanggil shutdown
fungsi baik saat pemrosesan berakhir (shutdownInput.reason
isTERMINATE
) atau pekerja tidak lagi merespons (shutdownInput.reason
isZOMBIE
).
shutdown: function(shutdownInput, completeCallback)
Pemrosesan berakhir ketika prosesor rekaman tidak menerima catatan lebih lanjut dari pecahan, karena pecahan dipecah atau digabungkan, atau aliran dihapus.
KCL juga meneruskan shutdownInput.checkpointer
objek keshutdown
. Jika alasan shutdown adalahTERMINATE
, Anda harus memastikan bahwa prosesor rekaman telah selesai memproses catatan data apa pun, dan kemudian memanggil checkpoint
fungsi pada antarmuka ini.
Ubah properti konfigurasi
Sampel memberikan nilai default untuk properti konfigurasi. Anda dapat mengganti salah satu properti ini dengan nilai Anda sendiri (lihat sample.properties
di sampel dasar).
Nama aplikasi
KCL memerlukan aplikasi yang unik di antara aplikasi Anda, dan di antara tabel Amazon DynamoDB di Wilayah yang sama. Ini menggunakan nilai konfigurasi nama aplikasi dengan cara berikut:
-
Semua pekerja yang terkait dengan nama aplikasi ini diasumsikan bekerja sama pada aliran yang sama. Pekerja ini dapat didistribusikan pada beberapa contoh. Jika Anda menjalankan instance tambahan dari kode aplikasi yang sama, tetapi dengan nama aplikasi yang berbeda, KCL memperlakukan instance kedua sebagai aplikasi yang sepenuhnya terpisah yang juga beroperasi pada aliran yang sama.
-
KCL membuat tabel DynamoDB dengan nama aplikasi dan menggunakan tabel untuk mempertahankan informasi status (seperti pos pemeriksaan dan pemetaan pecahan pekerja) untuk aplikasi. Setiap aplikasi memiliki tabel DynamoDB sendiri. Untuk informasi selengkapnya, lihat Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL.
Siapkan kredensial
Anda harus membuat AWS kredensil Anda tersedia untuk salah satu penyedia kredensi dalam rantai penyedia kredensi default. Anda dapat menggunakan AWSCredentialsProvider
properti untuk menetapkan penyedia kredensial. sample.properties
File harus membuat kredensyal Anda tersedia untuk salah satu penyedia kredensyal dalam rantai penyedia kredensi default. Jika Anda menjalankan konsumen di EC2 instans Amazon, sebaiknya Anda mengonfigurasi instans dengan peran IAM. AWS kredensil yang mencerminkan izin yang terkait dengan peran IAM ini tersedia untuk aplikasi pada instance melalui metadata instance-nya. Ini adalah cara paling aman untuk mengelola kredensil untuk aplikasi konsumen yang berjalan pada sebuah EC2 instance.
Contoh berikut mengkonfigurasi KCL untuk memproses aliran data Kinesis bernama kclnodejssample
menggunakan prosesor rekaman yang disediakan di: sample_kcl_app.js
# The Node.js executable script executableName = node sample_kcl_app.js # The name of an Amazon Kinesis stream to process streamName = kclnodejssample # Unique KCL application name applicationName = kclnodejssample # Use default AWS credentials provider chain AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Read from the beginning of the stream initialPositionInStream = TRIM_HORIZON