Apache Flink에서 Apache Hudi 사용 - Amazon EMR

Apache Flink에서 Apache Hudi 사용

Apache Hudi는 삽입, 업데이트, 업서트 및 삭제와 같은 레코드 수준 작업을 통해 데이터 관리 및 데이터 파이프라인 개발을 간소화하는 데 사용할 수 있는 오픈 소스 데이터 관리 프레임워크입니다. Amazon S3에서 효율적인 데이터 관리와 결합하여 Hudi는 데이터를 실시간으로 수집 및 업데이트할 수 있도록 합니다. Hudi는 데이터세트에서 실행하는 모든 작업의 메타데이터를 유지 관리하므로 모든 작업은 원자적이고 일관되게 유지됩니다.

Apache Hudi는 Amazon EMR 릴리스 7.2.0 이상의 Apache Flink와 함께 Amazon EMR on EKS에서 사용할 수 있습니다. Apache Hudi 작업을 시작하고 제출하는 방법을 알아보려면 다음 단계를 참조하세요.

Apache Hudi 작업을 제출하는 방법을 알아보려면 다음 단계를 참조하세요.

  1. default라는 AWS Glue 데이터베이스를 생성합니다.

    aws glue create-database --database-input "{\"Name\":\"default\"}"
  2. Flink Kubernetes Operator SQL Example에 따라 flink-sql-runner.jar 파일을 빌드합니다.

  3. 다음과 같은 Hudi SQL 스크립트를 생성합니다.

    CREATE CATALOG hudi_glue_catalog WITH ( 'type' = 'hudi', 'mode' = 'hms', 'table.external' = 'true', 'default-database' = 'default', 'hive.conf.dir' = '/glue/confs/hive/conf/', 'catalog.path' = 's3://<hudi-example-bucket>/FLINK_HUDI/warehouse/' ); USE CATALOG hudi_glue_catalog; CREATE DATABASE IF NOT EXISTS hudi_db; use hudi_db; CREATE TABLE IF NOT EXISTS hudi-flink-example-table( uuid VARCHAR(20), name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20) ) PARTITIONED BY (`partition`) WITH ( 'connector' = 'hudi', 'path' = 's3://<hudi-example-bucket>/hudi-flink-example-table', 'hive_sync.enable' = 'true', 'hive_sync.mode' = 'glue', 'hive_sync.table' = 'hudi-flink-example-table', 'hive_sync.db' = 'hudi_db', 'compaction.delta_commits' = '1', 'hive_sync.partition_fields' = 'partition', 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'table.type' = 'COPY_ON_WRITE' ); EXECUTE STATEMENT SET BEGIN INSERT INTO hudi-flink-example-table VALUES ('id1','Alex',23,TIMESTAMP '1970-01-01 00:00:01','par1'), ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'), ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'), ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'), ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'), ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'), ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'), ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4'); END;
  4. Hudi SQL 스크립트 및 flink-sql-runner.jar 파일을 S3 위치에 업로드합니다.

  5. FlinkDeployments YAML 파일에서 hudi.enabledtrue로 설정합니다.

    spec: flinkConfiguration: hudi.enabled: "true"
  6. YAML 파일을 생성하여 구성을 실행합니다. 이 예제 파일 이름은 hudi-write.yaml입니다.

    apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: hudi-write-example spec: flinkVersion: v1_18 flinkConfiguration: taskmanager.numberOfTaskSlots: "2" hudi.enabled: "true" executionRoleArn: "<JobExecutionRole>" emrReleaseLabel: "emr-7.3.0-flink-latest" jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: local:///opt/flink/usrlib/flink-sql-runner.jar args: ["/opt/flink/scripts/hudi-write.sql"] parallelism: 1 upgradeMode: stateless podTemplate: spec: initContainers: - name: flink-sql-script-download args: - s3 - cp - s3://<s3_location>/hudi-write.sql - /flink-scripts image: amazon/aws-cli:latest imagePullPolicy: Always resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /flink-scripts name: flink-scripts - name: flink-sql-runner-download args: - s3 - cp - s3://<s3_location>/flink-sql-runner.jar - /flink-artifacts image: amazon/aws-cli:latest imagePullPolicy: Always resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /flink-artifacts name: flink-artifact containers: - name: flink-main-container volumeMounts: - mountPath: /opt/flink/scripts name: flink-scripts - mountPath: /opt/flink/usrlib name: flink-artifact volumes: - emptyDir: {} name: flink-scripts - emptyDir: {} name: flink-artifact
  7. Flink Kubernetes 연산자에 Flink Hudi 작업을 제출합니다.

    kubectl apply -f hudi-write.yaml