Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Apache Hudi mit Apache Flink verwenden
Apache Hudi ist ein Open-Source-Datenmanagement-Framework mit Operationen auf Datensatzebene wie Einfügen, Aktualisieren, Hochsetzen und Löschen, mit denen Sie die Datenverwaltung und die Entwicklung von Datenleitungen vereinfachen können. In Kombination mit effizientem Datenmanagement in Amazon S3 können Sie mit Hudi Daten in Echtzeit aufnehmen und aktualisieren. Hudi verwaltet Metadaten aller Operationen, die Sie mit dem Datensatz ausführen, sodass alle Aktionen atomar und konsistent bleiben.
Apache Hudi ist bei Amazon EMR EKS mit Apache Flink mit EMR Amazon-Versionen 7.2.0 und höher verfügbar. In den folgenden Schritten erfahren Sie, wie Sie loslegen und Apache Hudi-Jobs einreichen können.
Reichen Sie einen Apache Hudi-Job ein
In den folgenden Schritten erfahren Sie, wie Sie einen Apache Hudi-Job einreichen.
-
Erstellen Sie eine AWS Glue-Datenbank mit dem Namen
default
.aws glue create-database --database-input "{\"Name\":\"default\"}"
-
Folgen Sie dem Flink SQL Kubernetes-Operator-Beispiel, um die Datei
zu erstellen. flink-sql-runner.jar
-
Erstellen Sie ein SQL Hudi-Skript wie das Folgende.
CREATE CATALOG hudi_glue_catalog WITH ( 'type' = 'hudi', 'mode' = 'hms', 'table.external' = 'true', 'default-database' = 'default', 'hive.conf.dir' = '/glue/confs/hive/conf/', 'catalog.path' = 's3://
<hudi-example-bucket>
/FLINK_HUDI/warehouse/' ); USE CATALOG hudi_glue_catalog; CREATE DATABASE IF NOT EXISTS hudi_db; use hudi_db; CREATE TABLE IF NOT EXISTS hudi-flink-example-table( uuid VARCHAR(20), name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20) ) PARTITIONED BY (`partition`) WITH ( 'connector' = 'hudi', 'path' = 's3://<hudi-example-bucket>
/hudi-flink-example-table', 'hive_sync.enable' = 'true', 'hive_sync.mode' = 'glue', 'hive_sync.table' = 'hudi-flink-example-table', 'hive_sync.db' = 'hudi_db', 'compaction.delta_commits' = '1', 'hive_sync.partition_fields' = 'partition', 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'table.type' = 'COPY_ON_WRITE' ); EXECUTE STATEMENT SET BEGIN INSERT INTO hudi-flink-example-table VALUES ('id1','Alex',23,TIMESTAMP '1970-01-01 00:00:01','par1'), ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'), ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'), ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'), ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'), ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'), ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'), ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4'); END; -
Laden Sie Ihr SQL Hudi-Skript und die
flink-sql-runner.jar
Datei an einen S3-Speicherort hoch. -
Stellen Sie in Ihrer
FlinkDeployments
YAML Dateihudi.enabled
auftrue
ein.spec: flinkConfiguration: hudi.enabled: "true"
-
Erstellen Sie eine YAML Datei, um Ihre Konfiguration auszuführen. Diese Beispieldatei trägt den Namen
hudi-write.yaml
.apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: hudi-write-example spec: flinkVersion: v1_18 flinkConfiguration: taskmanager.numberOfTaskSlots: "2" hudi.enabled: "true" executionRoleArn: "
<JobExecutionRole>
" emrReleaseLabel: "emr-7.5.0-flink-latest" jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: local:///opt/flink/usrlib/flink-sql-runner.jar args: ["/opt/flink/scripts/hudi-write.sql"] parallelism: 1 upgradeMode: stateless podTemplate: spec: initContainers: - name: flink-sql-script-download args: - s3 - cp - s3://<s3_location>
/hudi-write.sql - /flink-scripts image: amazon/aws-cli:latest imagePullPolicy: Always resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /flink-scripts name: flink-scripts - name: flink-sql-runner-download args: - s3 - cp - s3://<s3_location>
/flink-sql-runner.jar - /flink-artifacts image: amazon/aws-cli:latest imagePullPolicy: Always resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /flink-artifacts name: flink-artifact containers: - name: flink-main-container volumeMounts: - mountPath: /opt/flink/scripts name: flink-scripts - mountPath: /opt/flink/usrlib name: flink-artifact volumes: - emptyDir: {} name: flink-scripts - emptyDir: {} name: flink-artifact -
Senden Sie einen Flink Hudi-Job an den Flink Kubernetes-Operator.
kubectl apply -f hudi-write.yaml