Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Menggunakan ketersediaan tinggi (HA) untuk Operator Flink dan Aplikasi Flink
Topik ini menunjukkan cara mengonfigurasi ketersediaan tinggi dan menjelaskan cara kerjanya untuk beberapa kasus penggunaan yang berbeda. Ini termasuk ketika Anda menggunakan Job manager dan ketika Anda menggunakan kubernetes asli Flink.
Operator Flink ketersediaan tinggi
Kami mengaktifkan ketersediaan tinggi untuk Operator Flink sehingga kami dapat gagal ke Operator Flink siaga untuk meminimalkan waktu henti di loop kontrol operator jika terjadi kegagalan. Ketersediaan tinggi diaktifkan secara default dan jumlah default replika operator awal adalah 2. Anda dapat mengonfigurasi bidang replika di values.yaml
file Anda untuk bagan helm.
Bidang berikut dapat disesuaikan:
-
replicas
(opsional, defaultnya adalah 2): Menyetel nomor ini menjadi lebih besar dari 1 membuat Operator siaga lainnya dan memungkinkan pemulihan pekerjaan Anda lebih cepat. -
highAvailabilityEnabled
(opsional, defaultnya benar): Mengontrol apakah Anda ingin mengaktifkan HA. Menentukan parameter ini sebagai true memungkinkan dukungan penyebaran multi AZ, serta menetapkan parameter yang benarflink-conf.yaml
.
Anda dapat menonaktifkan HA untuk operator Anda dengan mengatur konfigurasi berikut di values.yaml
file Anda.
... imagePullSecrets: [] replicas: 1 # set this to false if you don't want HA highAvailabilityEnabled: false ...
Penyebaran multi AZ
Kami membuat pod operator di beberapa Availability Zone. Ini adalah kendala lunak, dan pod operator Anda akan dijadwalkan di AZ yang sama jika Anda tidak memiliki cukup sumber daya di AZ yang berbeda.
Menentukan replika pemimpin
Jika HA diaktifkan, replika menggunakan sewa untuk menentukan pemimpin mana dan menggunakan Sewa K8 untuk pemilihan pemimpin. JMs Anda dapat menjelaskan Sewa dan melihat bidang Identitas .Spec.Holder untuk menentukan pemimpin saat ini
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
Interaksi Flink-S3
Mengkonfigurasi kredenal akses
Pastikan bahwa Anda telah mengonfigurasi IRSA dengan IAM izin yang sesuai untuk mengakses bucket S3.
Mengambil stoples pekerjaan dari mode Aplikasi S3
Operator Flink juga mendukung pengambilan stoples aplikasi dari S3. Anda hanya menyediakan lokasi S3 untuk toples URI dalam FlinkDeployment spesifikasi Anda.
Anda juga dapat menggunakan fitur ini untuk mengunduh artefak lain seperti PyFlink skrip. Skrip Python yang dihasilkan dijatuhkan di bawah jalur. /opt/flink/usrlib/
Contoh berikut menunjukkan bagaimana menggunakan fitur ini untuk PyFlink pekerjaan. Perhatikan bidang toples URI dan args.
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: image: <YOUR CUSTOM PYFLINK IMAGE> emrReleaseLabel: "emr-6.12.0-flink-latest" flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" serviceAccount: flink jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"] parallelism: 1 upgradeMode: stateless
Konektor Flink S3
Flink dikemas dengan dua konektor S3 (tercantum di bawah). Bagian berikut membahas kapan harus menggunakan konektor mana.
Checkpointing: Konektor Presto S3
-
Setel skema S3 ke s3p://
-
Konektor yang disarankan untuk digunakan ke pos pemeriksaan ke s3. Untuk informasi selengkapnya, lihat khusus S3 dalam dokumentasi
Apache Flink.
Contoh FlinkDeployment spesifikasi:
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
Membaca dan menulis ke S3: Konektor Hadoop S3
-
Atur skema S3 ke
s3://
atau ()s3a://
-
Konektor yang direkomendasikan untuk membaca dan menulis file dari S3 (hanya konektor S3 yang mengimplementasikan antarmuka Flinks
Filesystem). -
Secara default, kami mengatur
fs.s3a.aws.credentials.provider
dalamflink-conf.yaml
file, yaitucom.amazonaws.auth.WebIdentityTokenCredentialsProvider
. Jika Anda mengganti d efaultflink-conf
sepenuhnya dan Anda berinteraksi dengan S3, pastikan untuk menggunakan penyedia ini.
Contoh FlinkDeployment spesifikasi
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ] parallelism: 2 upgradeMode: stateless
Manajer Pekerjaan Flink
Ketersediaan Tinggi (HA) untuk Penerapan Flink memungkinkan pekerjaan terus membuat kemajuan bahkan jika kesalahan sementara ditemukan dan crash Anda. JobManager Pekerjaan akan dimulai ulang tetapi dari pos pemeriksaan terakhir yang berhasil dengan HA diaktifkan. Tanpa HA diaktifkan, Kubernetes akan memulai ulang pekerjaan Anda JobManager, tetapi pekerjaan Anda akan dimulai sebagai pekerjaan baru dan akan kehilangan kemajuannya. Setelah mengonfigurasi HA, kami dapat memberi tahu Kubernetes untuk menyimpan metadata HA dalam penyimpanan persisten untuk referensi jika terjadi kegagalan sementara di JobManager dan kemudian melanjutkan pekerjaan kami dari pos pemeriksaan terakhir yang berhasil.
HA diaktifkan secara default untuk pekerjaan Flink Anda (jumlah replika disetel ke 2, yang mengharuskan Anda menyediakan lokasi penyimpanan S3 agar metadata HA tetap ada).
Konfigurasi HA
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" executionRoleArn: "<JOB EXECUTION ROLE ARN>" emrReleaseLabel: "emr-6.13.0-flink-latest" jobManager: resource: memory: "2048m" cpu: 1 replicas: 2 highAvailabilityEnabled: true storageDir: "s3://<S3 PERSISTENT STORAGE DIR>" taskManager: resource: memory: "2048m" cpu: 1
Berikut ini adalah deskripsi untuk konfigurasi HA di atas di Job Manager (didefinisikan di bawah.spec. jobManager):
-
highAvailabilityEnabled
(opsional, default adalah true): Setel ini kefalse
jika Anda tidak ingin HA diaktifkan dan tidak ingin menggunakan konfigurasi HA yang disediakan. Anda masih dapat memanipulasi bidang “replika” untuk mengonfigurasi HA secara manual. -
replicas
(opsional, defaultnya adalah 2): Menyetel nomor ini menjadi lebih besar dari 1 membuat siaga lainnya JobManagers dan memungkinkan pemulihan pekerjaan Anda lebih cepat. Jika Anda menonaktifkan HA, Anda harus mengatur jumlah replika ke 1, atau Anda akan terus mendapatkan kesalahan validasi (hanya 1 replika yang didukung jika HA tidak diaktifkan). -
storageDir
(wajib): Karena kami menggunakan jumlah replika sebagai 2 secara default, kami harus menyediakan persistenstorageDir. Saat ini bidang ini hanya menerima jalur S3 sebagai lokasi penyimpanan.
Lokalitas pod
Jika Anda mengaktifkan HA, kami juga mencoba mengkolokasi pod di AZ yang sama, yang mengarah pada peningkatan kinerja (mengurangi latensi jaringan dengan memiliki pod yang samaAZs). Ini adalah proses upaya terbaik, artinya jika Anda tidak memiliki cukup sumber daya di AZ di mana sebagian besar Pod Anda dijadwalkan, Pod yang tersisa masih akan dijadwalkan tetapi mungkin berakhir pada node di luar AZ ini.
Menentukan replika pemimpin
Jika HA diaktifkan, replika menggunakan sewa untuk menentukan pemimpin mana dan menggunakan Configmap K8s sebagai datastore untuk menyimpan metadata ini. JMs Jika Anda ingin menentukan pemimpin, Anda dapat melihat konten Configmap dan melihat kunci di org.apache.flink.k8s.leader.restserver
bawah data untuk menemukan pod K8s dengan alamat IP. Anda juga dapat menggunakan perintah bash berikut.
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n
NAMESPACE
-o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
Pekerjaan Flink - Kubernetes asli
Amazon EMR 6.13.0 dan yang lebih tinggi mendukung Kubernetes asli Flink untuk menjalankan aplikasi Flink dalam mode ketersediaan tinggi di klaster Amazon. EKS
catatan
Anda harus memiliki bucket Amazon S3 yang dibuat untuk menyimpan metadata ketersediaan tinggi saat mengirimkan pekerjaan Flink Anda. Jika Anda tidak ingin menggunakan fitur ini, Anda dapat menonaktifkannya. Ini diaktifkan secara default.
Untuk mengaktifkan fitur ketersediaan tinggi Flink, berikan parameter Flink berikut saat Anda menjalankan perintah. run-application
CLI Parameter didefinisikan di bawah contoh.
-Dhigh-availability.type=kubernetes \ -Dhigh-availability.storageDir=
S3://DOC-EXAMPLE-STORAGE-BUCKET
\ -Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \ -Dkubernetes.jobmanager.replicas=3 \ -Dkubernetes.cluster-id=example-cluster
-
Dhigh-availability.storageDir
- Bucket Amazon S3 tempat Anda ingin menyimpan metadata ketersediaan tinggi untuk pekerjaan Anda.Dkubernetes.jobmanager.replicas
— Jumlah pod Job Manager yang akan dibuat sebagai bilangan bulat lebih besar dari1
.Dkubernetes.cluster-id
— ID unik yang mengidentifikasi cluster Flink.