Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Menggunakan Apache Hudi dengan Apache Flink
Apache Hudi adalah kerangka kerja manajemen data sumber terbuka dengan operasi tingkat rekaman seperti menyisipkan, memperbarui, meningkatkan, dan menghapus yang dapat Anda gunakan untuk menyederhanakan manajemen data dan pengembangan pipa data. Ketika dikombinasikan dengan manajemen data yang efisien di Amazon S3, Hudi memungkinkan Anda menelan dan memperbarui data secara real time. Hudi mempertahankan metadata dari semua operasi yang Anda jalankan pada dataset, sehingga semua tindakan tetap atom dan konsisten.
Apache Hudi tersedia di Amazon EKS dengan Apache Flink dengan EMR EMR Amazon merilis 7.2.0 dan lebih tinggi. Lihat langkah-langkah berikut untuk mempelajari cara memulai dan mengirimkan pekerjaan Apache Hudi.
Kirim pekerjaan Apache Hudi
Lihat langkah-langkah berikut untuk mempelajari cara mengirimkan pekerjaan Apache Hudi.
-
Buat database AWS Glue bernama
default
.aws glue create-database --database-input "{\"Name\":\"default\"}"
-
Ikuti SQLContoh Operator Flink Kubernetes
untuk membangun file. flink-sql-runner.jar
-
Buat SQL skrip Hudi seperti berikut ini.
CREATE CATALOG hudi_glue_catalog WITH ( 'type' = 'hudi', 'mode' = 'hms', 'table.external' = 'true', 'default-database' = 'default', 'hive.conf.dir' = '/glue/confs/hive/conf/', 'catalog.path' = 's3://
<hudi-example-bucket>
/FLINK_HUDI/warehouse/' ); USE CATALOG hudi_glue_catalog; CREATE DATABASE IF NOT EXISTS hudi_db; use hudi_db; CREATE TABLE IF NOT EXISTS hudi-flink-example-table( uuid VARCHAR(20), name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20) ) PARTITIONED BY (`partition`) WITH ( 'connector' = 'hudi', 'path' = 's3://<hudi-example-bucket>
/hudi-flink-example-table', 'hive_sync.enable' = 'true', 'hive_sync.mode' = 'glue', 'hive_sync.table' = 'hudi-flink-example-table', 'hive_sync.db' = 'hudi_db', 'compaction.delta_commits' = '1', 'hive_sync.partition_fields' = 'partition', 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'table.type' = 'COPY_ON_WRITE' ); EXECUTE STATEMENT SET BEGIN INSERT INTO hudi-flink-example-table VALUES ('id1','Alex',23,TIMESTAMP '1970-01-01 00:00:01','par1'), ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'), ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'), ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'), ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'), ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'), ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'), ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4'); END; -
Unggah SQL skrip Hudi Anda dan
flink-sql-runner.jar
file ke lokasi S3. -
Dalam
FlinkDeployments
YAML file Anda, aturhudi.enabled
ketrue
.spec: flinkConfiguration: hudi.enabled: "true"
-
Buat YAML file untuk menjalankan konfigurasi Anda. File contoh ini diberi nama
hudi-write.yaml
.apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: hudi-write-example spec: flinkVersion: v1_18 flinkConfiguration: taskmanager.numberOfTaskSlots: "2" hudi.enabled: "true" executionRoleArn: "
<JobExecutionRole>
" emrReleaseLabel: "emr-7.3.0-flink-latest" jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: local:///opt/flink/usrlib/flink-sql-runner.jar args: ["/opt/flink/scripts/hudi-write.sql"] parallelism: 1 upgradeMode: stateless podTemplate: spec: initContainers: - name: flink-sql-script-download args: - s3 - cp - s3://<s3_location>
/hudi-write.sql - /flink-scripts image: amazon/aws-cli:latest imagePullPolicy: Always resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /flink-scripts name: flink-scripts - name: flink-sql-runner-download args: - s3 - cp - s3://<s3_location>
/flink-sql-runner.jar - /flink-artifacts image: amazon/aws-cli:latest imagePullPolicy: Always resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /flink-artifacts name: flink-artifact containers: - name: flink-main-container volumeMounts: - mountPath: /opt/flink/scripts name: flink-scripts - mountPath: /opt/flink/usrlib name: flink-artifact volumes: - emptyDir: {} name: flink-scripts - emptyDir: {} name: flink-artifact -
Kirim pekerjaan Flink Hudi ke operator Flink Kubernetes.
kubectl apply -f hudi-write.yaml