Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Kembangkan konsumen dengan KCL di Jawa
Prasyarat
Sebelum Anda mulai menggunakan KCL 3.x, pastikan Anda memiliki yang berikut:
-
Java Development Kit (JDK) 8 atau lebih baru
-
AWS SDK for Java 2.x
-
Maven atau Gradle untuk manajemen ketergantungan
KCL mengumpulkan metrik pemanfaatan CPU seperti pemanfaatan CPU dari host komputasi yang dijalankan pekerja untuk menyeimbangkan beban untuk mencapai tingkat pemanfaatan sumber daya yang merata di seluruh pekerja. Untuk mengaktifkan KCL mengumpulkan metrik pemanfaatan CPU dari pekerja, Anda harus memenuhi prasyarat berikut:
Amazon Elastic Compute Cloud(Amazon EC2)
-
Sistem operasi Anda harus OS Linux.
-
Anda harus mengaktifkan IMDSv2dalam EC2 contoh Anda.
Amazon Elastic Container Service (Amazon ECS) di Amazon EC2
-
Sistem operasi Anda harus OS Linux.
-
Anda harus mengaktifkan titik akhir metadata tugas ECS versi 4.
-
Versi agen penampung Amazon ECS Anda harus 1.39.0 atau yang lebih baru.
Amazon ECS aktif AWS Fargate
-
Anda harus mengaktifkan titik akhir metadata tugas Fargate versi 4. Jika Anda menggunakan platform Fargate versi 1.4.0 atau yang lebih baru, ini diaktifkan secara default.
-
Platform Fargate versi 1.4.0 atau yang lebih baru.
Layanan Amazon Elastic Kubernetes (Amazon EKS) di Amazon EC2
-
Sistem operasi Anda harus OS Linux.
Amazon EKS di AWS Fargate
-
Platform Fargate 1.3.0 atau yang lebih baru.
penting
Jika KCL tidak dapat mengumpulkan metrik pemanfaatan CPU dari pekerja, KCL akan kembali menggunakan throughput per pekerja untuk menetapkan sewa dan menyeimbangkan beban di seluruh pekerja di armada. Untuk informasi selengkapnya, lihat Bagaimana KCL memberikan sewa kepada pekerja dan menyeimbangkan beban.
Instal dan tambahkan dependensi
Jika Anda menggunakan Maven, tambahkan dependensi berikut ke file Anda. pom.xml
Pastikan Anda mengganti 3.xx ke versi KCL terbaru.
<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>
Jika Anda menggunakan Gradle, tambahkan berikut ini ke build.gradle
file Anda. Pastikan Anda mengganti 3.xx ke versi KCL terbaru.
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
Anda dapat memeriksa versi terbaru KCL di Repositori Pusat Maven
Implementasikan konsumen
Aplikasi konsumen KCL terdiri dari komponen-komponen kunci berikut:
RecordProcessor
RecordProcessor adalah komponen inti di mana logika bisnis Anda untuk memproses catatan aliran data Kinesis berada. Ini mendefinisikan bagaimana aplikasi Anda memproses data yang diterimanya dari aliran Kinesis.
Tanggung jawab utama:
-
Inisialisasi pemrosesan untuk pecahan
-
Memproses kumpulan catatan dari aliran Kinesis
-
Pemrosesan shutdown untuk pecahan (misalnya, ketika pecahan pecah atau bergabung, atau sewa diserahkan ke host lain)
-
Tangani pos pemeriksaan untuk melacak kemajuan
Berikut ini menunjukkan contoh implementasi:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.*; import software.amazon.kinesis.processor.ShardRecordProcessor; public class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; @Override public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()) ); // Checkpoint periodically processRecordsInput.checkpointer().checkpoint(); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting.", t); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } }
Berikut ini adalah penjelasan rinci dari setiap metode yang digunakan dalam contoh:
inisialisasi (InitializationInputinitializationInput)
-
Tujuan: Siapkan sumber daya atau status yang diperlukan untuk memproses catatan.
-
Ketika disebut: Sekali, ketika KCL menetapkan pecahan ke prosesor rekaman ini.
-
Poin kunci:
-
initializationInput.shardId()
: ID pecahan yang akan ditangani prosesor ini. -
initializationInput.extendedSequenceNumber()
: Nomor urut untuk mulai memproses dari.
-
ProcessRecords () ProcessRecordsInput processRecordsInput
-
Tujuan: Memproses catatan yang masuk dan kemajuan pos pemeriksaan opsional.
-
Ketika disebut: Berulang kali, selama prosesor rekaman memegang sewa untuk pecahan.
-
Poin kunci:
-
processRecordsInput.records()
: Daftar catatan untuk diproses. -
processRecordsInput.checkpointer()
: Digunakan untuk memeriksa kemajuan. -
Pastikan Anda menangani pengecualian apa pun selama pemrosesan untuk mencegah KCL gagal.
-
Metode ini harus idempoten, karena catatan yang sama dapat diproses lebih dari sekali dalam beberapa skenario, seperti data yang belum diperiksa sebelum pekerja yang tidak terduga mogok atau restart.
-
Selalu siram data buffer sebelum checkpointing untuk memastikan konsistensi data.
-
LeaseLost () LeaseLostInput leaseLostInput
-
Tujuan: Bersihkan sumber daya khusus untuk memproses pecahan ini.
-
Ketika itu disebut: Ketika Scheduler lain mengambil alih sewa untuk pecahan ini.
-
Poin kunci:
-
Checkpointing tidak diperbolehkan dalam metode ini.
-
shardEnded () ShardEndedInput shardEndedInput
-
Tujuan: Selesaikan pemrosesan untuk pecahan dan pos pemeriksaan ini.
-
Saat disebut: Ketika pecahan terbelah atau bergabung, menunjukkan semua data untuk pecahan ini telah diproses.
-
Poin kunci:
-
shardEndedInput.checkpointer()
: Digunakan untuk melakukan pemeriksaan akhir. -
Checkpointing dalam metode ini adalah wajib untuk menyelesaikan pemrosesan.
-
Gagal menyiram data dan pos pemeriksaan di sini dapat mengakibatkan hilangnya data atau pemrosesan duplikat saat pecahan dibuka kembali.
-
ShutdownRequested () ShutdownRequestedInput shutdownRequestedInput
-
Tujuan: Pos pemeriksaan dan bersihkan sumber daya saat KCL dimatikan.
-
Ketika dipanggil: Ketika KCL dimatikan, misalnya, ketika aplikasi dihentikan).
-
Poin kunci:
-
shutdownRequestedInput.checkpointer()
: Digunakan untuk melakukan checkpointing sebelum shutdown. -
Pastikan Anda menerapkan checkpointing dalam metode sehingga kemajuan disimpan sebelum aplikasi berhenti.
-
Kegagalan untuk menyiram data dan pos pemeriksaan di sini dapat mengakibatkan hilangnya data atau pemrosesan ulang catatan saat aplikasi dimulai ulang.
-
penting
KCL 3.x memastikan lebih sedikit pemrosesan ulang data saat sewa diserahkan dari satu pekerja ke pekerja lain dengan pos pemeriksaan sebelum pekerja sebelumnya dimatikan. Jika Anda tidak menerapkan logika checkpointing dalam shutdownRequested()
metode, Anda tidak akan melihat manfaat ini. Pastikan Anda telah menerapkan logika checkpointing di dalam metode. shutdownRequested()
RecordProcessorFactory
RecordProcessorFactory bertanggung jawab untuk membuat RecordProcessor instance baru. KCL menggunakan pabrik ini untuk membuat yang baru RecordProcessor untuk setiap pecahan yang perlu diproses aplikasi.
Tanggung jawab utama:
-
Buat RecordProcessor instance baru sesuai permintaan
-
Pastikan masing-masing RecordProcessor diinisialisasi dengan benar
Berikut ini adalah contoh implementasi:
import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } }
Dalam contoh ini, pabrik membuat yang baru SampleRecordProcessor setiap kali shardRecordProcessor () dipanggil. Anda dapat memperluas ini untuk memasukkan logika inisialisasi yang diperlukan.
Penjadwal
Scheduler adalah komponen tingkat tinggi yang mengoordinasikan semua aktivitas aplikasi KCL. Ini bertanggung jawab atas orkestrasi keseluruhan pemrosesan data.
Tanggung jawab utama:
-
Mengelola siklus hidup RecordProcessors
-
Menangani manajemen sewa untuk pecahan
-
Mengkoordinasikan pos pemeriksaan
-
Menyeimbangkan beban pemrosesan pecahan di beberapa pekerja aplikasi Anda
-
Menangani sinyal shutdown dan penghentian aplikasi yang anggun
Scheduler biasanya dibuat dan dimulai di Aplikasi Utama. Anda dapat memeriksa contoh implementasi Scheduler di bagian berikut, Aplikasi Konsumen Utama.
Aplikasi Konsumen Utama
Aplikasi Konsumen Utama mengikat semua komponen bersama-sama. Ini bertanggung jawab untuk menyiapkan konsumen KCL, menciptakan klien yang diperlukan, mengonfigurasi Scheduler, dan mengelola siklus hidup aplikasi.
Tanggung jawab utama:
-
Mengatur klien AWS layanan (Kinesis, DynamoDB,) CloudWatch
-
Konfigurasikan aplikasi KCL
-
Buat dan mulai Scheduler
-
Menangani shutdown aplikasi
Berikut ini adalah contoh implementasi:
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import java.util.UUID; public class SampleConsumer { private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; public SampleConsumer(String streamName, Region region) { this.streamName = streamName; this.region = region; this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } public void run() { DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoDbAsyncClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); } public static void main(String[] args) { String streamName = "your-stream-name"; // replace with your stream name Region region = Region.US_EAST_1; // replace with your region new SampleConsumer(streamName, region).run(); } }
KCL menciptakan konsumen Enhanced Fan-out (EFO) dengan throughput khusus secara default. Untuk informasi selengkapnya tentang Enhanced Fan-out, lihat. Kembangkan konsumen fan-out yang disempurnakan dengan throughput khusus Jika Anda memiliki kurang dari 2 konsumen atau tidak memerlukan penundaan propagasi baca di bawah 200 ms, Anda harus mengatur konfigurasi berikut di objek scheduler untuk menggunakan konsumen throughput bersama:
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
Kode berikut adalah contoh pembuatan objek penjadwal yang menggunakan konsumen throughput bersama:
Impor:
import software.amazon.kinesis.retrieval.polling.PollingConfig;
Kode:
Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) );/