Implementasikan konsumen - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Implementasikan konsumen

Aplikasi konsumen secara Tutorial: Memproses data stok real-time menggunakan KPL dan KCL 1.x terus menerus memproses aliran perdagangan saham yang Anda buatImplementasikan produsen. Ini kemudian menghasilkan saham paling populer yang dibeli dan dijual setiap menit. Aplikasi ini dibangun di atas Kinesis Client Library (KCL), yang melakukan banyak pekerjaan berat yang umum untuk aplikasi konsumen. Untuk informasi selengkapnya, lihat Mengembangkan KCL 1.x konsumen.

Lihat kode sumber dan tinjau informasi berikut.

StockTradesProcessor kelas

Kelas utama konsumen, disediakan untuk Anda, yang melakukan tugas-tugas berikut:

  • Membaca nama aplikasi, aliran, dan Wilayah, diteruskan sebagai argumen.

  • Membaca kredensi dari. ~/.aws/credentials

  • Menciptakan sebuah RecordProcessorFactory instance yang melayani instance dariRecordProcessor, diimplementasikan oleh sebuah StockTradeRecordProcessor instance.

  • Membuat KCL pekerja dengan RecordProcessorFactory instance dan konfigurasi standar termasuk nama aliran, kredensil, dan nama aplikasi.

  • Pekerja membuat utas baru untuk setiap pecahan (ditetapkan ke instance konsumen ini), yang terus menerus melakukan loop untuk membaca catatan dari Kinesis Data Streams. Kemudian memanggil RecordProcessor instance untuk memproses setiap batch catatan yang diterima.

StockTradeRecordProcessor kelas

Implementasi RecordProcessor instance, yang pada gilirannya mengimplementasikan tiga metode yang diperlukan:initialize,processRecords, danshutdown.

Seperti namanya, initialize dan shutdown digunakan oleh Perpustakaan Klien Kinesis untuk memberi tahu prosesor rekaman kapan harus siap untuk mulai menerima catatan dan kapan harus berhenti menerima catatan, masing-masing, sehingga dapat melakukan pengaturan khusus aplikasi dan tugas penghentian. Kode untuk ini disediakan untuk Anda. Pemrosesan utama terjadi dalam processRecords metode, yang pada gilirannya digunakan processRecord untuk setiap catatan. Metode terakhir ini disediakan sebagai sebagian besar kode kerangka kosong untuk Anda terapkan pada langkah berikutnya, di mana dijelaskan lebih lanjut.

Yang juga perlu diperhatikan adalah penerapan metode dukungan untukprocessRecord:reportStats, danresetStats, yang kosong dalam kode sumber asli.

processRecordsMetode ini diterapkan untuk Anda, dan melakukan langkah-langkah berikut:

  • Untuk setiap catatan yang diteruskan, panggil processRecord saja.

  • Jika setidaknya 1 menit telah berlalu sejak laporan terakhir, panggilanreportStats(), yang mencetak statistik terbaru, dan kemudian resetStats() yang menghapus statistik sehingga interval berikutnya hanya mencakup catatan baru.

  • Menetapkan waktu pelaporan berikutnya.

  • Jika setidaknya 1 menit telah berlalu sejak pos pemeriksaan terakhir, hubungi. checkpoint()

  • Menetapkan waktu checkpointing berikutnya.

Metode ini menggunakan interval 60 detik untuk tingkat pelaporan dan pos pemeriksaan. Untuk informasi selengkapnya tentang pos pemeriksaan, lihat. Informasi tambahan tentang konsumen

StockStats kelas

Kelas ini menyediakan retensi data dan pelacakan statistik untuk saham paling populer dari waktu ke waktu. Kode ini disediakan untuk Anda dan berisi metode berikut:

  • addStockTrade(StockTrade): Menyuntikkan yang diberikan StockTrade ke dalam statistik yang sedang berjalan.

  • toString(): Mengembalikan statistik dalam string diformat.

Kelas ini melacak saham paling populer dengan menjaga hitungan berjalan dari jumlah total perdagangan untuk setiap saham dan jumlah maksimum. Ini memperbarui jumlah ini setiap kali perdagangan saham tiba.

Tambahkan kode ke metode StockTradeRecordProcessor kelas, seperti yang ditunjukkan pada langkah-langkah berikut.

Untuk mengimplementasikan konsumen
  1. Terapkan processRecord metode dengan membuat instance StockTrade objek berukuran benar dan menambahkan data catatan ke dalamnya, mencatat peringatan jika ada masalah.

    StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array()); if (trade == null) { LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey()); return; } stockStats.addStockTrade(trade);
  2. Menerapkan reportStats metode sederhana. Jangan ragu untuk memodifikasi format output ke preferensi Anda.

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. Akhirnya, implementasikan resetStats metode, yang menciptakan stockStats instance baru.

    stockStats = new StockStats();
Untuk menjalankan konsumen
  1. Jalankan produser yang Anda tulis Implementasikan produsen untuk menyuntikkan catatan perdagangan saham simulasi ke aliran Anda.

  2. Verifikasi bahwa kunci akses dan key pair rahasia yang diambil sebelumnya (saat membuat IAM pengguna) disimpan dalam file~/.aws/credentials.

  3. Jalankan StockTradesProcessor kelas dengan argumen berikut:

    StockTradesProcessor StockTradeStream us-west-2

    Perhatikan bahwa jika Anda membuat streaming di Wilayah selainus-west-2, Anda harus menentukan Wilayah tersebut di sini.

Setelah satu menit, Anda akan melihat output seperti berikut, disegarkan setiap menit setelahnya:

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

Informasi tambahan tentang konsumen

Jika Anda terbiasa dengan keunggulan Perpustakaan Klien Kinesis, dibahas di dalam Mengembangkan KCL 1.x konsumen dan di tempat lain, Anda mungkin bertanya-tanya mengapa Anda harus menggunakannya di sini. Meskipun Anda hanya menggunakan satu aliran pecahan dan satu contoh konsumen untuk memprosesnya, masih lebih mudah untuk mengimplementasikan konsumen menggunakan. KCL Bandingkan langkah-langkah implementasi kode di bagian produsen dengan konsumen, dan Anda dapat melihat kemudahan komparatif dalam menerapkan konsumen. Ini sebagian besar disebabkan oleh layanan yang KCL disediakan.

Dalam aplikasi ini, Anda fokus pada penerapan kelas prosesor rekaman yang dapat memproses catatan individu. Anda tidak perlu khawatir tentang bagaimana catatan diambil dari Kinesis Data Streams; KCL mengambil catatan dan memanggil prosesor rekaman setiap kali ada catatan baru yang tersedia. Selain itu, Anda tidak perlu khawatir tentang berapa banyak pecahan dan contoh konsumen yang ada. Jika aliran ditingkatkan, Anda tidak perlu menulis ulang aplikasi Anda untuk menangani lebih dari satu pecahan atau satu instance konsumen.

Istilah checkpointing berarti mencatat titik dalam aliran hingga catatan data yang telah dikonsumsi dan diproses sejauh ini. Jika aplikasi macet, aliran dibaca dari titik itu dan bukan dari awal aliran. Subjek checkpointing dan berbagai pola desain dan praktik terbaik untuknya berada di luar cakupan pasal ini. Namun, itu adalah sesuatu yang mungkin Anda temui di lingkungan produksi.

Seperti yang Anda pelajariImplementasikan produsen, put operasi di Kinesis API Data Streams mengambil kunci partisi sebagai input. Kinesis Data Streams menggunakan kunci partisi sebagai mekanisme untuk membagi catatan di beberapa pecahan (ketika ada lebih dari satu pecahan dalam aliran). Kunci partisi yang sama selalu merutekan ke pecahan yang sama. Hal ini memungkinkan konsumen yang memproses pecahan tertentu untuk dirancang dengan asumsi bahwa catatan dengan kunci partisi yang sama hanya dikirim ke konsumen itu, dan tidak ada catatan dengan kunci partisi yang sama berakhir di konsumen lain. Oleh karena itu, pekerja konsumen dapat mengumpulkan semua catatan dengan kunci partisi yang sama tanpa khawatir bahwa itu mungkin kehilangan data yang diperlukan.

Dalam aplikasi ini, pemrosesan catatan konsumen tidak intensif, sehingga Anda dapat menggunakan satu pecahan dan melakukan pemrosesan di utas yang sama dengan KCL utas. Namun, dalam praktiknya, pertimbangkan terlebih dahulu untuk meningkatkan jumlah pecahan. Dalam beberapa kasus, Anda mungkin ingin mengalihkan pemrosesan ke utas yang berbeda, atau menggunakan kumpulan utas jika pemrosesan rekaman Anda diharapkan intensif. Dengan cara ini, KCL dapat mengambil catatan baru lebih cepat sementara utas lainnya dapat memproses catatan secara paralel. Desain multithreaded tidak sepele dan harus didekati dengan teknik canggih, jadi meningkatkan jumlah pecahan Anda biasanya merupakan cara paling efektif untuk meningkatkan.

Langkah selanjutnya

(Opsional) Memperluas konsumen