Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Pemrosesan multi-aliran dengan KCL
Bagian ini menjelaskan perubahan KCL yang diperlukan yang memungkinkan Anda membuat aplikasi KCL konsumen yang dapat memproses lebih dari satu aliran data secara bersamaan.
penting
-
Pemrosesan multi-aliran hanya didukung di KCL 2.3 atau yang lebih baru.
-
Pemrosesan multi-aliran tidak didukung untuk KCL konsumen yang ditulis dalam bahasa non-Java yang berjalan dengan.
multilangdaemon
-
Pemrosesan multi-aliran tidak didukung dalam versi KCL 1.x apa pun.
-
MultistreamTracker antarmuka
-
Untuk membangun aplikasi konsumen yang dapat memproses beberapa aliran pada saat yang sama, Anda harus menerapkan antarmuka baru yang disebut MultistreamTracker
. Antarmuka ini mencakup streamConfigList
metode yang mengembalikan daftar aliran data dan konfigurasinya untuk diproses oleh aplikasi KCL konsumen. Perhatikan bahwa aliran data yang sedang diproses dapat diubah selama runtime aplikasi konsumen.streamConfigList
disebut secara berkala oleh KCL untuk mempelajari tentang perubahan aliran data untuk diproses. -
Yang
streamConfigList
mengisi StreamConfigdaftar.
package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
-
InitialPositionInStreamExtended
BidangStreamIdentifier
dan wajib, sementaraconsumerArn
bersifat opsional. Anda harus menyediakanconsumerArn
satu-satunya jika Anda menggunakan KCL untuk menerapkan aplikasi konsumen fan-out yang disempurnakan. -
Untuk informasi selengkapnya
StreamIdentifier
, lihat https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java #L129. Untuk membuat StreamIdentifier
, kami sarankan Anda membuat instance multistream daristreamArn
dan yang tersedia di KCL 2.5.0 ataustreamCreationEpoch
yang lebih baru. Di KCL v2.3 dan v2.4, yang tidak mendukungstreamArm
, buat instance multistream dengan menggunakan format.account-id:StreamName:streamCreationTimestamp
Format ini akan usang dan tidak lagi didukung dimulai dengan rilis utama berikutnya. -
MultistreamTracker juga mencakup strategi untuk menghapus sewa aliran lama di tabel sewa (). formerStreamsLeases DeletionStrategy Perhatikan bahwa strategi CANNOT diubah selama runtime aplikasi konsumen. Untuk informasi lebih lanjut, lihat https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b//amazon-kinesis-client
.java. src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy
-
Atau Anda dapat menginisialisasi ConfigsBuilder dengan MultiStreamTracker
jika Anda ingin menerapkan aplikasi KCL konsumen yang memproses beberapa aliran secara bersamaan.
* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
Dengan dukungan multi-stream yang diterapkan untuk aplikasi KCL konsumen Anda, setiap baris tabel sewa aplikasi sekarang berisi ID pecahan dan nama aliran dari beberapa aliran data yang diproses aplikasi ini.
-
Ketika dukungan multi-stream untuk aplikasi KCL konsumen Anda diimplementasikan, akan leaseKey mengambil struktur berikut:
account-id:StreamName:streamCreationTimestamp:ShardId
. Misalnya,111111111:multiStreamTest-1:12345:shardId-000000000336
.
penting
Ketika aplikasi KCL konsumen Anda yang ada dikonfigurasi untuk memproses hanya satu aliran data, leaseKey
(yang merupakan kunci partisi untuk tabel sewa) adalah ID pecahan. Jika Anda mengkonfigurasi ulang aplikasi KCL konsumen yang ada untuk memproses beberapa aliran data, itu merusak tabel sewa Anda, karena leaseKey
strukturnya harus sebagai berikut: account-id:StreamName:StreamCreationTimestamp:ShardId
untuk mendukung multi-aliran.