Amazon Managed Service untuk Apache Flink sebelumnya dikenal sebagai Amazon Kinesis Data Analytics untuk Apache Flink.
Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Pertahankan praktik terbaik untuk Layanan Terkelola untuk aplikasi Apache Flink
Bagian ini berisi informasi dan rekomendasi untuk mengembangkan Layanan Terkelola yang stabil dan berkinerja untuk aplikasi Apache Flink.
Topik
- Minimalkan ukuran uber JAR
- Toleransi kesalahan: titik pemeriksaan dan titik simpan
- Versi konektor yang tidak didukung
- Performa dan paralelisme
- Pengaturan paralelisme per operator
- Pencatatan log
- Pengkodean
- Mengelola kredensi
- Membaca dari sumber dengan sedikit pecahan/partisi
- Interval refresh notebook Studio
- Performa optimum notebook Studio
- Bagaimana strategi watermark dan pecahan idle memengaruhi jendela waktu
- Tetapkan a UUID untuk semua operator
- Tambahkan ServiceResourceTransformer ke plugin Maven shade
Minimalkan ukuran uber JAR
Java/Scala application must be packaged in an uber (super/fat) JAR dan sertakan semua dependensi tambahan yang diperlukan yang belum disediakan oleh runtime. Namun, ukuran uber JAR mempengaruhi waktu mulai dan memulai ulang aplikasi dan dapat JAR menyebabkan melebihi batas 512 MB.
Untuk mengoptimalkan waktu penerapan, uber Anda tidak JAR boleh menyertakan yang berikut:
-
Setiap dependensi yang disediakan oleh runtime seperti yang diilustrasikan dalam contoh berikut. Mereka harus memiliki
provided
ruang lingkup dalam POM file ataucompileOnly
dalam konfigurasi Gradle Anda. -
Setiap dependensi yang digunakan untuk pengujian saja, misalnya JUnit atau Mockito. Mereka harus memiliki
test
ruang lingkup dalam POM file atautestImplementation
dalam konfigurasi Gradle Anda. -
Dependensi apa pun yang sebenarnya tidak digunakan oleh aplikasi Anda.
-
Setiap data statis atau metadata yang diperlukan oleh aplikasi Anda. Data statis harus dimuat oleh aplikasi saat runtime, misalnya dari datastore atau dari Amazon S3.
-
Lihat file POM contoh
ini untuk detail tentang pengaturan konfigurasi sebelumnya.
Dependensi yang disediakan
Managed Service for Apache Flink runtime menyediakan sejumlah dependensi. Dependensi ini tidak boleh dimasukkan dalam fat JAR dan harus memiliki provided
ruang lingkup dalam POM file atau secara eksplisit dikecualikan dalam konfigurasi. maven-shade-plugin
Salah satu dependensi ini yang termasuk dalam fat JAR diabaikan saat runtime, tetapi meningkatkan ukuran JAR penambahan overhead selama penerapan.
Dependensi disediakan oleh runtime, dalam runtime versi 1.18, 1.19, dan 1.20:
-
org.apache.flink:flink-core
-
org.apache.flink:flink-java
-
org.apache.flink:flink-streaming-java
-
org.apache.flink:flink-scala_2.12
-
org.apache.flink:flink-table-runtime
-
org.apache.flink:flink-table-planner-loader
-
org.apache.flink:flink-json
-
org.apache.flink:flink-connector-base
-
org.apache.flink:flink-connector-files
-
org.apache.flink:flink-clients
-
org.apache.flink:flink-runtime-web
-
org.apache.flink:flink-metrics-code
-
org.apache.flink:flink-table-api-java
-
org.apache.flink:flink-table-api-bridge-base
-
org.apache.flink:flink-table-api-java-bridge
-
org.apache.logging.log4j:log4j-slf4j-impl
-
org.apache.logging.log4j:log4j-api
-
org.apache.logging.log4j:log4j-core
-
org.apache.logging.log4j:log4j-1.2-api
Selain itucom.amazonaws:aws-kinesisanalytics-runtime:1.2.0
, perpustakaan yang digunakan untuk mengambil properti runtime aplikasi di Managed Service for Apache Flink juga disediakan.
Semua dependensi yang disediakan oleh runtime harus menggunakan rekomendasi berikut untuk tidak memasukkannya ke dalam uber: JAR
-
Di Maven (
pom.xml
) dan SBT (build.sbt
), gunakanprovided
ruang lingkup. -
Di Gradle (
build.gradle
), gunakancompileOnly
konfigurasi.
Ketergantungan apa pun yang disediakan secara tidak sengaja disertakan dalam uber JAR akan diabaikan saat runtime karena pemuatan kelas induk-pertama Apache Flink. Untuk informasi lebih lanjut, lihat parent-first-patterns
Konektor
Sebagian besar konektor, kecuali FileSystem konektor, yang tidak termasuk dalam runtime harus disertakan dalam POM file dengan cakupan default (compile
).
Rekomendasi lainnya
Sebagai aturan, uber Apache Flink Anda yang JAR disediakan untuk Managed Service for Apache Flink harus berisi kode minimum yang diperlukan untuk menjalankan aplikasi. Menyertakan dependensi yang menyertakan kelas sumber, kumpulan data pengujian, atau status bootstrap tidak boleh disertakan dalam toples ini. Jika sumber daya statis perlu ditarik saat runtime, pisahkan masalah ini menjadi sumber daya seperti Amazon S3. Contohnya termasuk bootstraps status atau model inferensi.
Luangkan waktu untuk mempertimbangkan pohon ketergantungan mendalam Anda dan hapus dependensi non-runtime.
Meskipun Managed Service untuk Apache Flink mendukung ukuran jar 512MB, ini harus dilihat sebagai pengecualian aturan. Apache Flink saat ini mendukung ukuran jar ~ 104MB melalui konfigurasi defaultnya, dan itu harus menjadi ukuran target maksimum dari toples yang dibutuhkan.
Toleransi kesalahan: titik pemeriksaan dan titik simpan
Gunakan pos pemeriksaan dan savepoint untuk menerapkan toleransi kesalahan dalam Layanan Terkelola untuk aplikasi Apache Flink Anda. Ingat hal berikut saat mengembangkan dan memelihara aplikasi Anda:
Kami menyarankan agar Anda tetap mengaktifkan checkpointing untuk aplikasi Anda. Checkpointing memberikan toleransi kesalahan untuk aplikasi Anda selama pemeliharaan terjadwal, dan dalam kasus kegagalan tak terduga karena masalah layanan, kegagalan dependensi aplikasi, dan masalah lainnya. Untuk informasi tentang pemeliharaan terjadwal, lihat Mengelola tugas pemeliharaan untuk Managed Service untuk Apache Flink.
Set ApplicationSnapshotConfiguration:: SnapshotsEnabled ke
false
selama pengembangan aplikasi atau pemecahan masalah. Snapshot dibuat selama setiap aplikasi berhenti, yang dapat menyebabkan masalah jika aplikasi dalam keadaan tidak sehat atau tidak berkinerja. AturSnapshotsEnabled
ketrue
setelah aplikasi dalam produksi dan stabil.catatan
Sebaiknya aplikasi Anda membuat snapshot beberapa kali sehari untuk memulai ulang dengan benar menggunakan data status yang benar. Frekuensi yang benar untuk snapshot Anda bergantung pada logika bisnis aplikasi Anda. Sering mengambil snapshot memungkinkan Anda memulihkan data yang lebih baru, tetapi meningkatkan biaya dan membutuhkan lebih banyak sumber daya sistem.
Untuk informasi tentang pemantauan waktu henti aplikasi, lihat .
Untuk informasi selengkapnya tentang penerapan toleransi kegagalan, lihat Menerapkan toleransi kesalahan.
Versi konektor yang tidak didukung
Dari Apache Flink versi 1.15 atau yang lebih baru, Managed Service for Apache Flink secara otomatis mencegah aplikasi memulai atau memperbarui jika mereka menggunakan versi konektor Kinesis yang tidak didukung yang dibundel ke dalam aplikasi. JARs Saat memutakhirkan ke Managed Service untuk Apache Flink versi 1.15 atau yang lebih baru, pastikan Anda menggunakan konektor Kinesis terbaru. Ini adalah versi apa pun yang sama dengan atau lebih baru dari versi 1.15.2. Semua versi lain tidak didukung oleh Managed Service untuk Apache Flink karena mereka dapat menyebabkan masalah konsistensi atau kegagalan dengan fitur Stop with Savepoint, mencegah operasi berhenti/pembaruan bersih. Untuk mempelajari lebih lanjut tentang kompatibilitas konektor di Amazon Managed Service untuk versi Apache Flink, lihat konektor Apache Flink.
Performa dan paralelisme
Aplikasi Anda dapat diskalakan untuk memenuhi tingkat throughput apa pun dengan menyetel paralelisme aplikasi Anda, dan menghindari perangkap performa. Ingat hal berikut saat mengembangkan dan memelihara aplikasi Anda:
Verifikasi bahwa semua sumber aplikasi dan sink Anda ditetapkan dengan cukup dan tidak dibatasi. Jika sumber dan wastafel adalah AWS layanan lain, pantau layanan tersebut menggunakan CloudWatch.
Untuk aplikasi dengan paralelisme yang sangat tinggi, periksa apakah tingkat paralelisme yang tinggi diterapkan pada semua operator dalam aplikasi. Secara default, Apache Flink menerapkan paralelisme aplikasi yang sama untuk semua operator dalam grafik aplikasi. Ini dapat menyebabkan masalah penyediaan pada sumber atau sink, atau pun hambatan dalam pemrosesan data operator. Anda dapat mengubah paralelisme setiap operator dalam kode dengan. setParallelism
Pahami arti pengaturan paralelisme untuk operatori dalam aplikasi Anda. Jika Anda mengubah paralelisme untuk operator, Anda mungkin tidak dapat memulihkan aplikasi dari snapshot yang dibuat ketika operator memiliki paralelisme yang tidak kompatibel dengan pengaturan saat ini. Untuk informasi selengkapnya tentang pengaturan paralelisme operator, lihat Mengatur paralelisme maksimum untuk operator secara eksplisit
.
Untuk informasi selengkapnya tentang penerapan penskalaan, lihat Menerapkan penskalaan aplikasi.
Pengaturan paralelisme per operator
Secara default, semua operator memiliki paralelisme yang ditetapkan pada tingkat aplikasi. Anda dapat mengganti paralelisme dari satu operator menggunakan using. DataStream API .setParallelism(x)
Anda dapat mengatur paralelisme operator ke paralelisme apa pun yang sama atau lebih rendah dari paralelisme aplikasi.
Jika memungkinkan, tentukan paralelisme operator sebagai fungsi dari paralelisme aplikasi. Dengan cara ini, paralelisme operator akan bervariasi dengan paralelisme aplikasi. Jika Anda menggunakan penskalaan otomatis, misalnya, semua operator akan memvariasikan paralelisme mereka dalam proporsi yang sama:
int appParallelism = env.getParallelism(); ... ...ops.setParalleism(appParallelism/2);
Dalam beberapa kasus, Anda mungkin ingin mengatur paralelisme operator ke konstanta. Misalnya, mengatur paralelisme sumber Aliran Kinesis ke jumlah pecahan. Dalam kasus ini, Anda harus mempertimbangkan untuk meneruskan paralelisme operator sebagai parameter konfigurasi aplikasi, untuk mengubahnya tanpa mengubah kode, jika Anda perlu, misalnya, untuk mengubah aliran sumber.
Pencatatan log
Anda dapat memantau kinerja dan kondisi kesalahan aplikasi Anda menggunakan CloudWatch Log. Ingat hal berikut saat mengonfigurasi pencatatan untuk aplikasi Anda:
Aktifkan CloudWatch pencatatan untuk aplikasi sehingga masalah runtime apa pun dapat di-debug.
Jangan buat entri log untuk setiap catatan yang diproses dalam aplikasi. Ini menyebabkan hambatan parah selama pemrosesan dan dapat menyebabkan tekanan balik dalam pemrosesan data.
Buat CloudWatch alarm untuk memberi tahu Anda ketika aplikasi Anda tidak berjalan dengan benar. Untuk informasi selengkapnya, lihat
Untuk informasi selengkapnya tentang penerapan pencatatan, lihat .
Pengkodean
Anda dapat membuat aplikasi Anda berfungsi dan stabil menggunakan praktik pemrograman yang direkomendasikan. Ingat hal berikut saat menulis kode aplikasi:
Jangan gunakan
system.exit()
dalam kode aplikasi Anda, baik dalam metodemain
aplikasi Anda atau dalam fungsi yang ditetapkan pengguna. Jika Anda ingin menonaktifkan aplikasi Anda dari dalam kode, lempar pengecualian yang berasal dariException
atauRuntimeException
, yang berisi pesan tentang apa yang salah dengan aplikasi.Catat hal berikut tentang bagaimana layanan menangani pengecualian ini:
Jika pengecualian dilemparkan dari metode
main
aplikasi Anda, layanan akan membungkusnya dalamProgramInvocationException
saat transisi aplikasi ke statusRUNNING
, dan manajer tugas akan gagal mengirimkan tugas.Jika pengecualian dilemparkan dari fungsi yang ditetapkan pengguna, manajer tugas akan gagal tugas dan memulai ulang, serta detail pengecualian akan ditulis ke log pengecualian.
Pertimbangkan untuk menaungi JAR file aplikasi Anda dan dependensi yang disertakan. Bayangan direkomendasikan ketika ada potensi konflik dalam nama paket antara aplikasi Anda dan runtime Apache Flink. Jika terjadi konflik, log aplikasi Anda mungkin berisi pengecualian tipe
java.util.concurrent.ExecutionException
. Untuk informasi selengkapnya tentang shading JAR file aplikasi Anda, lihat Apache MavenShade Plugin.
Mengelola kredensi
Anda tidak boleh memanggang kredensi jangka panjang apa pun ke dalam aplikasi produksi (atau lainnya). Kredensi jangka panjang kemungkinan diperiksa ke dalam sistem kontrol versi dan dapat dengan mudah hilang. Sebagai gantinya, Anda dapat mengaitkan peran ke Layanan Terkelola untuk aplikasi Apache Flink dan memberikan hak istimewa untuk peran tersebut. Aplikasi Flink yang sedang berjalan kemudian dapat mengambil kredensil sementara dengan hak istimewa masing-masing dari lingkungan. Jika otentikasi diperlukan untuk layanan yang tidak terintegrasi secara native denganIAM, misalnya, database yang memerlukan nama pengguna dan kata sandi untuk otentikasi, Anda harus mempertimbangkan untuk menyimpan rahasia di Secrets Manager AWS .
Banyak layanan AWS asli mendukung otentikasi:
Amazon MSK — https://github.com/aws/aws-msk-iam-auth/# using-the-amazon-msk
- library-for-iam-authentication Amazon Elasticsearch Service — .java AmazonElasticsearchSink
Amazon S3 - bekerja di luar kotak pada Layanan Terkelola untuk Apache Flink
Membaca dari sumber dengan sedikit pecahan/partisi
Saat membaca dari Apache Kafka atau Aliran Data Kinesis, mungkin ada ketidakcocokan antara paralelisme aliran (yaitu, jumlah partisi untuk Kafka dan jumlah pecahan untuk Kinesis) dan paralelisme aplikasi. Dengan desain yang naif, paralelisme aplikasi tidak dapat berskala melampaui paralelisme aliran: Setiap subtugas operator sumber hanya dapat membaca dari 1 atau lebih piringan/partisi. Itu berarti untuk aliran dengan hanya 2 pecahan dan aplikasi dengan paralelisme 8, bahwa hanya dua subtugas yang benar-benar memakan dari aliran dan 6 subtugas tetap menganggur. Ini secara substansional dapat membatasi throughput aplikasi, khususnya jika deserialisasi mahal dan dilakukan oleh sumber (yang merupakan default).
Untuk mengurangi efek ini, Anda dapat menskalakan aliran. Tapi itu mungkin tidak selalu diinginkan atau mungkin. Atau, Anda dapat merestrukturisasi sumber sehingga tidak melakukan serialisasi apa pun dan hanya meneruskan. byte[]
Anda kemudian dapat menyeimbangkan kembali
Interval refresh notebook Studio
Jika Anda mengubah interval refresh hasil paragraf, atur ke nilai yang setidaknya 1000 milidetik.
Performa optimum notebook Studio
Kami menguji dengan pernyataan berikut dan mendapat performa terbaik saat events-per-second
yang dikalikan dengan number-of-keys
berada di bawah 25.000.000. Ini adalah untuk events-per-second
di bawah 150.000.
SELECT key, sum(value) FROM key-values GROUP BY key
Bagaimana strategi watermark dan pecahan idle memengaruhi jendela waktu
Saat membaca peristiwa dari Apache Kafka dan Kinesis Data Streams, sumber dapat mengatur waktu acara berdasarkan atribut aliran. Dalam kasus Kinesis, waktu acara sama dengan perkiraan waktu kedatangan peristiwa. Tetapi pengaturan waktu acara di sumber untuk acara tidak cukup bagi aplikasi Flink untuk menggunakan waktu acara. Sumber juga harus menghasilkan tanda air yang menyebarkan informasi tentang waktu acara dari sumber ke semua operator lain. Dokumentasi Flink
Secara default, stempel waktu peristiwa yang dibaca dari Kinesis diatur ke perkiraan waktu kedatangan yang ditentukan oleh Kinesis. Prasyarat tambahan untuk waktu acara untuk bekerja dalam aplikasi adalah strategi watermark.
WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(...));
Strategi watermark kemudian diterapkan ke a DataStream
with the assignTimestampsAndWatermarks
method. Ada beberapa strategi build-in yang berguna:
forMonotonousTimestamps()
hanya akan menggunakan waktu acara (perkiraan waktu kedatangan) dan secara berkala memancarkan nilai maksimum sebagai tanda air (untuk setiap subtugas tertentu)forBoundedOutOfOrderness(Duration.ofSeconds(...))
mirip dengan strategi sebelumnya, tetapi akan menggunakan waktu acara - durasi untuk pembuatan tanda air.
Ini berhasil, tetapi ada beberapa peringatan yang harus diperhatikan. Tanda air dihasilkan pada tingkat subtugas dan mengalir melalui grafik operator.
Dari dokumentasi Flink
Setiap subtugas paralel dari fungsi sumber biasanya menghasilkan tanda airnya secara independen. Tanda air ini menentukan waktu acara pada sumber paralel tertentu.
Saat tanda air mengalir melalui program streaming, mereka memajukan waktu acara di operator tempat mereka tiba. Setiap kali operator memajukan waktu acaranya, ia menghasilkan tanda air baru di hilir untuk operator penggantinya.
Beberapa operator menggunakan beberapa aliran input; serikat pekerja, misalnya, atau operator yang mengikuti fungsi keyBy (...) atau partisi (...). Waktu kejadian operator saat ini adalah minimum waktu acara aliran inputnya. Karena aliran inputnya memperbarui waktu acara mereka, begitu juga operator.
Itu berarti, jika subtugas sumber mengkonsumsi dari pecahan siaga, operator hilir tidak menerima tanda air baru dari subtugas itu dan karenanya memproses stall untuk semua operator hilir yang menggunakan jendela waktu. Untuk menghindari hal ini, pelanggan dapat menambahkan withIdleness
opsi ke strategi tanda air. Dengan opsi itu, operator mengecualikan tanda air dari subtugas upsteam idle saat menghitung waktu acara operator. Subtugas idle karenanya tidak lagi memblokir kemajuan waktu acara di operator hilir.
Namun, opsi kemalasan dengan strategi tanda air bawaan tidak akan memajukan waktu acara jika tidak ada subtugas yang membaca acara apa pun, yaitu, tidak ada acara dalam aliran. Ini menjadi sangat terlihat untuk kasus uji di mana serangkaian peristiwa terbatas dibaca dari aliran. Karena waktu acara tidak berlanjut setelah acara terakhir dibaca, jendela terakhir (berisi acara terakhir) tidak akan pernah ditutup.
Ringkasan
withIdleness
pengaturan tidak akan menghasilkan tanda air baru jika pecahan menganggur, itu hanya akan mengecualikan tanda air terakhir yang dikirim oleh subtugas idle dari perhitungan tanda air min di operator hilirdengan strategi tanda air bawaan, jendela terbuka terakhir tidak akan pernah ditutup (kecuali acara baru yang memajukan tanda air akan dikirim, tetapi itu menciptakan jendela baru yang kemudian tetap terbuka)
bahkan ketika waktu diatur oleh aliran Kinesis, peristiwa kedatangan terlambat masih dapat terjadi jika satu pecahan dikonsumsi lebih cepat daripada yang lain (misalnya, selama inisialisasi aplikasi atau saat menggunakan
TRIM_HORIZON
di mana semua pecahan yang ada dikonsumsi secara paralel mengabaikan hubungan orang tua/anak mereka)withIdleness
pengaturan strategi tanda air tampaknya menghentikan pengaturan khusus sumber Kinesis untuk pecahan siaga(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS
Contoh
Aplikasi berikut membaca dari aliran dan membuat jendela sesi berdasarkan waktu acara.
Properties consumerConfig = new Properties(); consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"); FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("...", new SimpleStringSchema(), consumerConfig); WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(15)); env.addSource(consumer) .assignTimestampsAndWatermarks(s) .map(new MapFunction<String, Long>() { @Override public Long map(String s) throws Exception { return Long.parseLong(s); } }) .keyBy(l -> 0l) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .process(new ProcessWindowFunction<Long, Object, Long, TimeWindow>() { @Override public void process(Long aLong, ProcessWindowFunction<Long, Object, Long, TimeWindow>.Context context, Iterable<Long>iterable, Collector<Object> collector) throws Exception { long count = StreamSupport.stream(iterable.spliterator(), false).count(); long timestamp = context.currentWatermark(); System.out.print("XXXXXXXXXXXXXX Window with " + count + " events"); System.out.println("; Watermark: " + timestamp + ", " + Instant.ofEpochMilli(timestamp)); for (Long l : iterable) { System.out.println(l); } } });
Dalam contoh berikut, 8 peristiwa ditulis ke aliran pecahan 16 (2 yang pertama dan peristiwa terakhir kebetulan mendarat di pecahan yang sama).
$ aws kinesis put-record --stream-name hp-16 --partition-key 1 --data MQ== $ aws kinesis put-record --stream-name hp-16 --partition-key 2 --data Mg== $ aws kinesis put-record --stream-name hp-16 --partition-key 3 --data Mw== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028721934184977530127978070210" } { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028795678659974022576354623682" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275134360684221592378842022114" } Wed Mar 23 11:19:57 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 4 --data NA== $ aws kinesis put-record --stream-name hp-16 --partition-key 5 --data NQ== $ date { "ShardId": "shardId-000000000010", "SequenceNumber": "49627894338570054070103749783042116732419934393936642210" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275659034489934342334017700066" } Wed Mar 23 11:20:10 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 6 --data Ng== $ date { "ShardId": "shardId-000000000001", "SequenceNumber": "49627894338369347363316974173886988345467035365375213586" } Wed Mar 23 11:20:22 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 7 --data Nw== $ date { "ShardId": "shardId-000000000008", "SequenceNumber": "49627894338525452579706688535878947299195189349725503618" } Wed Mar 23 11:20:34 CET 2022 $ sleep 60 $ aws kinesis put-record --stream-name hp-16 --partition-key 8 --data OA== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811029600823255837371928900796610" } Wed Mar 23 11:21:27 CET 2022
Masukan ini akan menghasilkan jendela sesi 5: event 1,2,3; event 4,5; event 6; event 7; event 8. Namun, program ini hanya menghasilkan 4 jendela pertama.
11:59:21,529 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 5 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 5 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,531 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 4 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 4 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:23,209 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,244 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 event: 6; timestamp: 1648030822428, 2022-03-23T10:20:22.428Z 11:59:23,377 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,405 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,581 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,586 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:24,790 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 4; timestamp: 1648030809282, 2022-03-23T10:20:09.282Z event: 3; timestamp: 1648030797697, 2022-03-23T10:19:57.697Z event: 5; timestamp: 1648030810871, 2022-03-23T10:20:10.871Z 11:59:24,907 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 7; timestamp: 1648030834105, 2022-03-23T10:20:34.105Z event: 1; timestamp: 1648030794441, 2022-03-23T10:19:54.441Z event: 2; timestamp: 1648030796122, 2022-03-23T10:19:56.122Z event: 8; timestamp: 1648030887171, 2022-03-23T10:21:27.171Z XXXXXXXXXXXXXX Window with 3 events; Watermark: 1648030809281, 2022-03-23T10:20:09.281Z 3 1 2 XXXXXXXXXXXXXX Window with 2 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 4 5 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 6 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030887170, 2022-03-23T10:21:27.170Z 7
Outputnya hanya menampilkan 4 jendela (tidak ada jendela terakhir yang berisi acara 8). Ini karena waktu acara dan strategi tanda air. Jendela terakhir tidak dapat ditutup karena dengan strategi watermark per built waktu tidak pernah maju melampaui waktu peristiwa terakhir yang telah dibaca dari aliran. Tetapi agar jendela ditutup, waktu perlu maju lebih dari 10 detik setelah acara terakhir. Dalam hal ini tanda air terakhir adalah 2022-03-23T 10:21:27.170 Z tetapi agar jendela sesi ditutup, diperlukan tanda air 10 detik dan 1 ms kemudian.
Jika withIdleness
opsi dihapus dari strategi tanda air, tidak ada jendela sesi yang akan ditutup, karena “tanda air global” dari operator jendela tidak dapat maju.
Perhatikan bahwa ketika aplikasi Flink dimulai (atau jika ada kemiringan data), beberapa pecahan dapat dikonsumsi lebih cepat daripada yang lain. Hal ini dapat menyebabkan beberapa tanda air dipancarkan terlalu dini dari subtugas (subtugas dapat memancarkan tanda air berdasarkan konten satu pecahan tanpa dikonsumsi dari pecahan lain yang dilangganannya). Cara untuk mengurangi adalah strategi watermarking yang berbeda yang menambahkan buffer keamanan (forBoundedOutOfOrderness(Duration.ofSeconds(30))
atau secara eksplisit memungkinkan acara kedatangan terlambat. (allowedLateness(Time.minutes(5))
Tetapkan a UUID untuk semua operator
Ketika Layanan Terkelola untuk Apache Flink memulai pekerjaan Flink untuk aplikasi dengan snapshot, pekerjaan Flink dapat gagal dimulai karena masalah tertentu. Salah satunya adalah ketidakcocokan ID operator. Flink mengharapkan operator eksplisit dan konsisten IDs untuk operator grafik pekerjaan Flink. Jika tidak disetel secara eksplisit, Flink otomatis membuat ID untuk operator. Ini karena Flink menggunakan operator ini IDs untuk mengidentifikasi operator secara unik dalam grafik pekerjaan dan menggunakannya untuk menyimpan status setiap operator di savepoint.
Masalah ketidakcocokan ID operator terjadi ketika Flink tidak menemukan pemetaan 1:1 antara operator grafik pekerjaan dan operator IDs yang IDs ditentukan dalam savepoint. Ini terjadi ketika operator konsisten eksplisit tidak IDs disetel dan Flink otomatis menghasilkan operator IDs yang mungkin tidak konsisten dengan setiap pembuatan grafik pekerjaan. Kemungkinan aplikasi mengalami masalah ini tinggi selama pemeliharaan berjalan. Untuk menghindari hal ini, kami menyarankan pelanggan mengatur UUID semua operator dalam kode flink. Untuk informasi selengkapnya, lihat topik Menetapkan UUID untuk semua operator di bawah Kesiapan produksi.
Tambahkan ServiceResourceTransformer ke plugin Maven shade
Flink menggunakan Java Service Provider Interfaces (SPI)
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <id>shade</id> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers combine.children="append"> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- ... --> </transformers> </configuration> </execution> </executions> </plugin>