Pemrosesan multi-aliran dengan KCL - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Pemrosesan multi-aliran dengan KCL

Bagian ini menjelaskan perubahan KCL yang diperlukan yang memungkinkan Anda membuat aplikasi KCL konsumen yang dapat memproses lebih dari satu aliran data secara bersamaan.

penting
  • Pemrosesan multi-aliran hanya didukung di KCL 2.3 atau yang lebih baru.

  • Pemrosesan multi-aliran tidak didukung untuk KCL konsumen yang ditulis dalam bahasa non-Java yang berjalan dengan. multilangdaemon

  • Pemrosesan multi-aliran tidak didukung dalam versi KCL 1.x apa pun.

  • MultistreamTracker antarmuka

    • Untuk membangun aplikasi konsumen yang dapat memproses beberapa aliran pada saat yang sama, Anda harus menerapkan antarmuka baru yang disebut MultistreamTracker. Antarmuka ini mencakup streamConfigList metode yang mengembalikan daftar aliran data dan konfigurasinya untuk diproses oleh aplikasi KCL konsumen. Perhatikan bahwa aliran data yang sedang diproses dapat diubah selama runtime aplikasi konsumen. streamConfigListdisebut secara berkala oleh KCL untuk mempelajari tentang perubahan aliran data untuk diproses.

    • Yang streamConfigList mengisi StreamConfigdaftar.

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

Atau Anda dapat menginisialisasi ConfigsBuilder dengan MultiStreamTracker jika Anda ingin menerapkan aplikasi KCL konsumen yang memproses beberapa aliran secara bersamaan.

* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • Dengan dukungan multi-stream yang diterapkan untuk aplikasi KCL konsumen Anda, setiap baris tabel sewa aplikasi sekarang berisi ID pecahan dan nama aliran dari beberapa aliran data yang diproses aplikasi ini.

  • Ketika dukungan multi-stream untuk aplikasi KCL konsumen Anda diimplementasikan, akan leaseKey mengambil struktur berikut:account-id:StreamName:streamCreationTimestamp:ShardId. Misalnya, 111111111:multiStreamTest-1:12345:shardId-000000000336.

penting

Ketika aplikasi KCL konsumen Anda yang ada dikonfigurasi untuk memproses hanya satu aliran data, leaseKey (yang merupakan kunci partisi untuk tabel sewa) adalah ID pecahan. Jika Anda mengkonfigurasi ulang aplikasi KCL konsumen yang ada untuk memproses beberapa aliran data, itu merusak tabel sewa Anda, karena leaseKey strukturnya harus sebagai berikut: account-id:StreamName:StreamCreationTimestamp:ShardId untuk mendukung multi-aliran.