Utilisation d'Apache Hudi avec Apache Flink - Amazon EMR

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation d'Apache Hudi avec Apache Flink

Apache Hudi est un framework de gestion de données open source avec des opérations de niveau enregistrement telles que l'insertion, la mise à jour, la modification et la suppression, que vous pouvez utiliser pour simplifier la gestion des données et le développement de pipelines de données. Associé à une gestion efficace des données dans Amazon S3, Hudi vous permet d'ingérer et de mettre à jour des données en temps réel. Hudi gère les métadonnées de toutes les opérations que vous exécutez sur le jeu de données, afin que toutes les actions restent atomiques et cohérentes.

Apache Hudi est disponible EMR sur Amazon EKS avec Apache Flink avec les EMR versions 7.2.0 et supérieures d'Amazon. Consultez les étapes suivantes pour savoir comment démarrer et soumettre des tâches Apache Hudi.

Consultez les étapes suivantes pour savoir comment soumettre une tâche Apache Hudi.

  1. Créez une base AWS de données Glue nomméedefault.

    aws glue create-database --database-input "{\"Name\":\"default\"}"
  2. Suivez l'SQLexemple de l'opérateur Flink Kubernetes pour créer le fichier. flink-sql-runner.jar

  3. Créez un SQL script Hudi comme suit.

    CREATE CATALOG hudi_glue_catalog WITH ( 'type' = 'hudi', 'mode' = 'hms', 'table.external' = 'true', 'default-database' = 'default', 'hive.conf.dir' = '/glue/confs/hive/conf/', 'catalog.path' = 's3://<hudi-example-bucket>/FLINK_HUDI/warehouse/' ); USE CATALOG hudi_glue_catalog; CREATE DATABASE IF NOT EXISTS hudi_db; use hudi_db; CREATE TABLE IF NOT EXISTS hudi-flink-example-table( uuid VARCHAR(20), name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20) ) PARTITIONED BY (`partition`) WITH ( 'connector' = 'hudi', 'path' = 's3://<hudi-example-bucket>/hudi-flink-example-table', 'hive_sync.enable' = 'true', 'hive_sync.mode' = 'glue', 'hive_sync.table' = 'hudi-flink-example-table', 'hive_sync.db' = 'hudi_db', 'compaction.delta_commits' = '1', 'hive_sync.partition_fields' = 'partition', 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'table.type' = 'COPY_ON_WRITE' ); EXECUTE STATEMENT SET BEGIN INSERT INTO hudi-flink-example-table VALUES ('id1','Alex',23,TIMESTAMP '1970-01-01 00:00:01','par1'), ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'), ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'), ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'), ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'), ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'), ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'), ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4'); END;
  4. Téléchargez votre SQL script Hudi et le flink-sql-runner.jar fichier dans un emplacement S3.

  5. Dans votre FlinkDeployments YAML fichier, définissez hudi.enabled surtrue.

    spec: flinkConfiguration: hudi.enabled: "true"
  6. Créez un YAML fichier pour exécuter votre configuration. Ce fichier d'exemple est nomméhudi-write.yaml.

    apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: hudi-write-example spec: flinkVersion: v1_18 flinkConfiguration: taskmanager.numberOfTaskSlots: "2" hudi.enabled: "true" executionRoleArn: "<JobExecutionRole>" emrReleaseLabel: "emr-7.5.0-flink-latest" jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: local:///opt/flink/usrlib/flink-sql-runner.jar args: ["/opt/flink/scripts/hudi-write.sql"] parallelism: 1 upgradeMode: stateless podTemplate: spec: initContainers: - name: flink-sql-script-download args: - s3 - cp - s3://<s3_location>/hudi-write.sql - /flink-scripts image: amazon/aws-cli:latest imagePullPolicy: Always resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /flink-scripts name: flink-scripts - name: flink-sql-runner-download args: - s3 - cp - s3://<s3_location>/flink-sql-runner.jar - /flink-artifacts image: amazon/aws-cli:latest imagePullPolicy: Always resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /flink-artifacts name: flink-artifact containers: - name: flink-main-container volumeMounts: - mountPath: /opt/flink/scripts name: flink-scripts - mountPath: /opt/flink/usrlib name: flink-artifact volumes: - emptyDir: {} name: flink-scripts - emptyDir: {} name: flink-artifact
  7. Soumettez une tâche Flink Hudi à l'opérateur Flink Kubernetes.

    kubectl apply -f hudi-write.yaml