Uso de Apache Hudi con Apache Flink - Amazon EMR

Uso de Apache Hudi con Apache Flink

Apache Hudi es un marco de administración de datos de código abierto con operaciones a nivel de registro, como inserción, actualización, upsert y eliminación, que puede utilizar para simplificar la administración de los datos y el desarrollo de canalizaciones de datos. Cuando se combina con una administración de datos eficiente en Amazon S3, Hudi le permite incorporar y actualizar datos en tiempo real. Hudi mantiene los metadatos de todas las operaciones que ejecuta en el conjunto de datos, de modo que todas las acciones siguen siendo atómicas y coherentes.

Apache Hudi está disponible en Amazon EMR en EKS con Apache Flink con las versiones 7.2.0 y posteriores de Amazon EMR. Consulte los pasos siguientes para aprender cómo empezar a enviar trabajos de Apache Hudi.

Consulte los pasos siguientes para aprender cómo enviar un trabajo de Apache Hudi.

  1. Cree una base de datos de Glue de AWS llamada default.

    aws glue create-database --database-input "{\"Name\":\"default\"}"
  2. Siga el Ejemplo de SQL del operador de Kubernetes de Flink para crear el archivo flink-sql-runner.jar.

  3. Cree un script SQL de Hudi como el siguiente.

    CREATE CATALOG hudi_glue_catalog WITH ( 'type' = 'hudi', 'mode' = 'hms', 'table.external' = 'true', 'default-database' = 'default', 'hive.conf.dir' = '/glue/confs/hive/conf/', 'catalog.path' = 's3://<hudi-example-bucket>/FLINK_HUDI/warehouse/' ); USE CATALOG hudi_glue_catalog; CREATE DATABASE IF NOT EXISTS hudi_db; use hudi_db; CREATE TABLE IF NOT EXISTS hudi-flink-example-table( uuid VARCHAR(20), name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20) ) PARTITIONED BY (`partition`) WITH ( 'connector' = 'hudi', 'path' = 's3://<hudi-example-bucket>/hudi-flink-example-table', 'hive_sync.enable' = 'true', 'hive_sync.mode' = 'glue', 'hive_sync.table' = 'hudi-flink-example-table', 'hive_sync.db' = 'hudi_db', 'compaction.delta_commits' = '1', 'hive_sync.partition_fields' = 'partition', 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'table.type' = 'COPY_ON_WRITE' ); EXECUTE STATEMENT SET BEGIN INSERT INTO hudi-flink-example-table VALUES ('id1','Alex',23,TIMESTAMP '1970-01-01 00:00:01','par1'), ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'), ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'), ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'), ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'), ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'), ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'), ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4'); END;
  4. Cargue su script SQL de Hudi y el archivo flink-sql-runner.jar en una ubicación de S3.

  5. En su archivo YAML FlinkDeployments, establezca hudi.enabled en true.

    spec: flinkConfiguration: hudi.enabled: "true"
  6. Cree un archivo YAML para ejecutar su configuración. Este archivo de ejemplo se denomina hudi-write.yaml.

    apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: hudi-write-example spec: flinkVersion: v1_18 flinkConfiguration: taskmanager.numberOfTaskSlots: "2" hudi.enabled: "true" executionRoleArn: "<JobExecutionRole>" emrReleaseLabel: "emr-7.3.0-flink-latest" jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: local:///opt/flink/usrlib/flink-sql-runner.jar args: ["/opt/flink/scripts/hudi-write.sql"] parallelism: 1 upgradeMode: stateless podTemplate: spec: initContainers: - name: flink-sql-script-download args: - s3 - cp - s3://<s3_location>/hudi-write.sql - /flink-scripts image: amazon/aws-cli:latest imagePullPolicy: Always resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /flink-scripts name: flink-scripts - name: flink-sql-runner-download args: - s3 - cp - s3://<s3_location>/flink-sql-runner.jar - /flink-artifacts image: amazon/aws-cli:latest imagePullPolicy: Always resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /flink-artifacts name: flink-artifact containers: - name: flink-main-container volumeMounts: - mountPath: /opt/flink/scripts name: flink-scripts - mountPath: /opt/flink/usrlib name: flink-artifact volumes: - emptyDir: {} name: flink-scripts - emptyDir: {} name: flink-artifact
  7. Envíe un trabajo de Flink Hudi al operador de Flink Kubernetes.

    kubectl apply -f hudi-write.yaml