Mengembangkan konsumen kustom dengan throughput bersama menggunakan AWS SDK for Java - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Mengembangkan konsumen kustom dengan throughput bersama menggunakan AWS SDK for Java

Salah satu metode untuk mengembangkan Kinesis Data Streams kustom yang dibagikan oleh konsumen adalah dengan menggunakan Amazon Kinesis Data Streams. APIs Bagian ini menjelaskan penggunaan Kinesis APIs Data Streams AWS SDK dengan for Java. Contoh kode Java di bagian ini menunjukkan bagaimana melakukan KDS API operasi dasar, dan dibagi secara logis berdasarkan jenis operasi.

Contoh-contoh ini tidak mewakili kode siap produksi. Mereka tidak memeriksa semua kemungkinan pengecualian atau memperhitungkan semua kemungkinan pertimbangan keamanan atau kinerja.

Anda dapat memanggil Kinesis APIs Data Streams menggunakan bahasa pemrograman lain yang berbeda. Untuk informasi selengkapnya tentang semua yang tersedia AWS SDKs, lihat Mulai Mengembangkan dengan Amazon Web Services.

penting

Metode yang direkomendasikan untuk mengembangkan Kinesis Data Streams kustom yang dibagikan oleh konsumen adalah dengan menggunakan Kinesis Client Library (). KCL KCLmembantu Anda mengkonsumsi dan memproses data dari aliran data Kinesis dengan mengurus banyak tugas kompleks yang terkait dengan komputasi terdistribusi. Untuk informasi selengkapnya, lihat Mengembangkan Konsumen Kustom dengan Menggunakan KCL Throughput Bersama.

Dapatkan data dari aliran

Kinesis APIs Data Streams getShardIterator menyertakan dan metode getRecords yang dapat Anda panggil untuk mengambil catatan dari aliran data. Ini adalah model tarik, di mana kode Anda menarik catatan data langsung dari pecahan aliran data.

penting

Kami menyarankan Anda menggunakan dukungan prosesor rekaman yang disediakan oleh KCL untuk mengambil catatan dari aliran data Anda. Ini adalah model push, di mana Anda menerapkan kode yang memproses data. KCLMengambil catatan data dari aliran data dan mengirimkannya ke kode aplikasi Anda. Selain itu, KCL menyediakan fungsionalitas failover, recovery, dan load balancing. Untuk informasi selengkapnya, lihat Mengembangkan Konsumen Kustom dengan Menggunakan KCL Throughput Bersama.

Namun, dalam beberapa kasus Anda mungkin lebih suka menggunakan Kinesis APIs Data Streams. Misalnya, untuk menerapkan alat khusus untuk memantau atau men-debug aliran data Anda.

penting

Kinesis Data Streams mendukung perubahan pada periode retensi rekaman data aliran data Anda. Untuk informasi selengkapnya, lihat Ubah periode retensi data.

Gunakan iterator shard

Anda mengambil catatan dari aliran pada basis per-shard. Untuk setiap pecahan, dan untuk setiap batch catatan yang Anda ambil dari pecahan itu, Anda harus mendapatkan iterator shard. Iterator shard digunakan dalam getRecordsRequest objek untuk menentukan pecahan dari mana catatan akan diambil. Jenis yang terkait dengan iterator shard menentukan titik dalam pecahan dari mana catatan harus diambil (lihat nanti di bagian ini untuk lebih jelasnya). Sebelum Anda dapat bekerja dengan iterator shard, Anda harus mengambil shard. Untuk informasi selengkapnya, lihat Daftar pecahan.

Dapatkan iterator shard awal menggunakan metode inigetShardIterator. Dapatkan iterator pecahan untuk kumpulan catatan tambahan menggunakan getNextShardIterator metode getRecordsResult objek yang dikembalikan oleh metode. getRecords Sebuah iterator shard berlaku selama 5 menit. Jika Anda menggunakan iterator shard saat valid, Anda mendapatkan yang baru. Setiap iterator shard tetap valid selama 5 menit, bahkan setelah digunakan.

Untuk mendapatkan iterator shard awal, buat instance GetShardIteratorRequest dan teruskan ke metode. getShardIterator Untuk mengkonfigurasi permintaan, tentukan stream dan ID shard. Untuk informasi tentang cara mendapatkan aliran di AWS akun Anda, lihatDaftar aliran. Untuk informasi tentang cara mendapatkan pecahan dalam aliran, lihatDaftar pecahan.

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

Kode sampel ini menentukan TRIM_HORIZON sebagai tipe iterator ketika mendapatkan iterator shard awal. Jenis iterator ini berarti bahwa catatan harus dikembalikan dimulai dengan catatan pertama ditambahkan ke shard—daripada dimulai dengan catatan yang paling baru ditambahkan, juga dikenal sebagai tip. Berikut ini adalah jenis iterator yang mungkin:

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

Untuk informasi lebih lanjut, lihat ShardIteratorType.

Beberapa tipe iterator mengharuskan Anda menentukan nomor urut selain jenisnya; misalnya:

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

Setelah Anda mendapatkan catatan menggunakangetRecords, Anda bisa mendapatkan nomor urut untuk catatan dengan memanggil getSequenceNumber metode catatan.

record.getSequenceNumber()

Selain itu, kode yang menambahkan catatan ke aliran data bisa mendapatkan nomor urut untuk catatan tambahan dengan memanggil getSequenceNumber hasilputRecord.

lastSequenceNumber = putRecordResult.getSequenceNumber();

Anda dapat menggunakan nomor urut untuk menjamin peningkatan urutan catatan secara ketat. Untuk informasi selengkapnya, lihat contoh kode diPutRecordcontoh.

Gunakan GetRecords

Setelah Anda mendapatkan iterator shard, buat instance objek. GetRecordsRequest Tentukan iterator untuk permintaan menggunakan setShardIterator metode.

Secara opsional, Anda juga dapat mengatur jumlah catatan untuk diambil menggunakan metode inisetLimit. Jumlah catatan yang dikembalikan getRecords selalu sama dengan atau kurang dari batas ini. Jika Anda tidak menentukan batas ini, getRecords mengembalikan 10 MB catatan diambil. Kode contoh di bawah ini menetapkan batas ini menjadi 25 catatan.

Jika tidak ada catatan yang dikembalikan, itu berarti tidak ada catatan data saat ini tersedia dari pecahan ini pada nomor urut yang direferensikan oleh iterator pecahan. Dalam situasi ini, aplikasi Anda harus menunggu sejumlah waktu yang sesuai untuk sumber data untuk streaming. Kemudian cobalah untuk mendapatkan data dari pecahan lagi menggunakan iterator shard yang dikembalikan oleh panggilan sebelumnya ke. getRecords

Lewati getRecordsRequest ke getRecords metode, dan tangkap nilai yang dikembalikan sebagai getRecordsResult objek. Untuk mendapatkan catatan data, panggil getRecords metode pada getRecordsResult objek.

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

Untuk mempersiapkan panggilan laingetRecords, dapatkan iterator shard berikutnya dari. getRecordsResult

shardIterator = getRecordsResult.getNextShardIterator();

Untuk hasil terbaik, tidurlah setidaknya 1 detik (1.000 milidetik) di antara panggilan getRecords untuk menghindari melebihi batas frekuensi. getRecords

try { Thread.sleep(1000); } catch (InterruptedException e) {}

Biasanya, Anda harus memanggil getRecords dalam satu lingkaran, bahkan ketika Anda mengambil satu catatan dalam skenario pengujian. Satu panggilan ke getRecords mungkin mengembalikan daftar catatan kosong, bahkan ketika pecahan berisi lebih banyak catatan di nomor urutan selanjutnya. Ketika ini terjadi, yang NextShardIterator dikembalikan bersama dengan daftar catatan kosong mereferensikan nomor urut selanjutnya dalam pecahan, dan getRecords panggilan berturut-turut akhirnya mengembalikan catatan. Sampel berikut menunjukkan penggunaan loop.

Contoh: getRecords

Contoh kode berikut mencerminkan getRecords tips di bagian ini, termasuk membuat panggilan dalam satu lingkaran.

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

Jika Anda menggunakan Perpustakaan Klien Kinesis, mungkin akan melakukan beberapa panggilan sebelum mengembalikan data. Perilaku ini dirancang dan tidak menunjukkan masalah dengan KCL atau data Anda.

Beradaptasi dengan reshard

Jika getRecordsResult.getNextShardIterator kembalinull, ini menunjukkan bahwa pecahan pecahan atau penggabungan telah terjadi yang melibatkan pecahan ini. Pecahan ini sekarang dalam CLOSED keadaan dan Anda telah membaca semua catatan data yang tersedia dari pecahan ini.

Dalam skenario ini, Anda dapat menggunakan getRecordsResult.childShards untuk mempelajari tentang pecahan anak baru dari pecahan yang sedang diproses yang dibuat oleh split atau merge. Untuk informasi lebih lanjut, lihat ChildShard.

Dalam kasus split, dua pecahan baru keduanya memiliki parentShardId sama dengan ID pecahan pecahan yang Anda proses sebelumnya. Nilai adjacentParentShardId untuk kedua pecahan ini adalahnull.

Dalam kasus penggabungan, pecahan baru tunggal yang dibuat oleh penggabungan memiliki parentShardId sama dengan ID pecahan dari salah satu pecahan induk dan adjacentParentShardId sama dengan ID pecahan induk lainnya. Aplikasi Anda telah membaca semua data dari salah satu pecahan ini. Ini adalah pecahan yang getRecordsResult.getNextShardIterator dikembalikannull. Jika urutan data penting untuk aplikasi Anda, pastikan bahwa itu juga membaca semua data dari pecahan induk lainnya sebelum membaca data baru dari pecahan anak yang dibuat oleh penggabungan.

Jika Anda menggunakan beberapa prosesor untuk mengambil data dari aliran (katakanlah, satu prosesor per pecahan), dan pecahan pecahan atau penggabungan terjadi, sesuaikan jumlah prosesor ke atas atau ke bawah untuk beradaptasi dengan perubahan jumlah pecahan.

Untuk informasi lebih lanjut tentang resharding, termasuk diskusi tentang status pecahan — seperti —lihat. CLOSED Reshard aliran