Apache Hudi mit Apache Flink verwenden - Amazon EMR

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Apache Hudi mit Apache Flink verwenden

Apache Hudi ist ein Open-Source-Datenmanagement-Framework mit Operationen auf Datensatzebene wie Einfügen, Aktualisieren, Hochsetzen und Löschen, mit denen Sie die Datenverwaltung und die Entwicklung von Datenleitungen vereinfachen können. In Kombination mit effizientem Datenmanagement in Amazon S3 können Sie mit Hudi Daten in Echtzeit aufnehmen und aktualisieren. Hudi verwaltet Metadaten aller Operationen, die Sie mit dem Datensatz ausführen, sodass alle Aktionen atomar und konsistent bleiben.

Apache Hudi ist bei Amazon EMR EKS mit Apache Flink mit EMR Amazon-Versionen 7.2.0 und höher verfügbar. In den folgenden Schritten erfahren Sie, wie Sie loslegen und Apache Hudi-Jobs einreichen können.

In den folgenden Schritten erfahren Sie, wie Sie einen Apache Hudi-Job einreichen.

  1. Erstellen Sie eine AWS Glue-Datenbank mit dem Namendefault.

    aws glue create-database --database-input "{\"Name\":\"default\"}"
  2. Folgen Sie dem Flink SQL Kubernetes-Operator-Beispiel, um die Datei zu erstellen. flink-sql-runner.jar

  3. Erstellen Sie ein SQL Hudi-Skript wie das Folgende.

    CREATE CATALOG hudi_glue_catalog WITH ( 'type' = 'hudi', 'mode' = 'hms', 'table.external' = 'true', 'default-database' = 'default', 'hive.conf.dir' = '/glue/confs/hive/conf/', 'catalog.path' = 's3://<hudi-example-bucket>/FLINK_HUDI/warehouse/' ); USE CATALOG hudi_glue_catalog; CREATE DATABASE IF NOT EXISTS hudi_db; use hudi_db; CREATE TABLE IF NOT EXISTS hudi-flink-example-table( uuid VARCHAR(20), name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20) ) PARTITIONED BY (`partition`) WITH ( 'connector' = 'hudi', 'path' = 's3://<hudi-example-bucket>/hudi-flink-example-table', 'hive_sync.enable' = 'true', 'hive_sync.mode' = 'glue', 'hive_sync.table' = 'hudi-flink-example-table', 'hive_sync.db' = 'hudi_db', 'compaction.delta_commits' = '1', 'hive_sync.partition_fields' = 'partition', 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'table.type' = 'COPY_ON_WRITE' ); EXECUTE STATEMENT SET BEGIN INSERT INTO hudi-flink-example-table VALUES ('id1','Alex',23,TIMESTAMP '1970-01-01 00:00:01','par1'), ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'), ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'), ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'), ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'), ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'), ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'), ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4'); END;
  4. Laden Sie Ihr SQL Hudi-Skript und die flink-sql-runner.jar Datei an einen S3-Speicherort hoch.

  5. Stellen Sie in Ihrer FlinkDeployments YAML Datei hudi.enabled auf true ein.

    spec: flinkConfiguration: hudi.enabled: "true"
  6. Erstellen Sie eine YAML Datei, um Ihre Konfiguration auszuführen. Diese Beispieldatei trägt den Namenhudi-write.yaml.

    apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: hudi-write-example spec: flinkVersion: v1_18 flinkConfiguration: taskmanager.numberOfTaskSlots: "2" hudi.enabled: "true" executionRoleArn: "<JobExecutionRole>" emrReleaseLabel: "emr-7.5.0-flink-latest" jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: local:///opt/flink/usrlib/flink-sql-runner.jar args: ["/opt/flink/scripts/hudi-write.sql"] parallelism: 1 upgradeMode: stateless podTemplate: spec: initContainers: - name: flink-sql-script-download args: - s3 - cp - s3://<s3_location>/hudi-write.sql - /flink-scripts image: amazon/aws-cli:latest imagePullPolicy: Always resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /flink-scripts name: flink-scripts - name: flink-sql-runner-download args: - s3 - cp - s3://<s3_location>/flink-sql-runner.jar - /flink-artifacts image: amazon/aws-cli:latest imagePullPolicy: Always resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /flink-artifacts name: flink-artifact containers: - name: flink-main-container volumeMounts: - mountPath: /opt/flink/scripts name: flink-scripts - mountPath: /opt/flink/usrlib name: flink-artifact volumes: - emptyDir: {} name: flink-scripts - emptyDir: {} name: flink-artifact
  7. Senden Sie einen Flink Hudi-Job an den Flink Kubernetes-Operator.

    kubectl apply -f hudi-write.yaml