Flink에서 AWS Glue 사용
Apache Flink 릴리스 6.15.0 이상에서 Amazon EMR on EKS는 AWS Glue Data Catalog를 스트리밍 및 배치 SQL 워크플로를 위한 메타데이터 저장소로 사용하는 것을 지원합니다.
먼저 Flink SQL 카탈로그 역할을 default
라고 하는 AWS Glue 데이터베이스를 생성해야 합니다. 이 Flink 카탈로그는 데이터베이스, 테이블, 파티션, 보기, 함수 및 기타 외부 시스템의 데이터에 액세스하는 데 필요한 기타 정보와 같은 메타데이터를 저장합니다.
aws glue create-database \ --database-input "{\"Name\":\"default\"}"
AWS Glue 지원을 활성화하려면 FlinkDeployment
사양을 사용합니다. 이 예제 사양은 Python 스크립트를 사용하여 AWS Glue 카탈로그와 상호 작용하기 위해 일부 Flink SQL 문을 빠르게 실행합니다.
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: flinkVersion: v1_17 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" aws.glue.enabled: "true" executionRoleArn:
job-execution-role-arn
; emrReleaseLabel: "emr-6.15.0-flink-latest" jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: s3://<S3_bucket_with_your_script
/pyflink-glue-script.py
entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-py", "/opt/flink/usrlib/pyflink-glue-script.py
"] parallelism: 1 upgradeMode: stateless
다음은 PyFlink 스크립트에 대한 예제입니다.
import logging import sys from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment def glue_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(stream_execution_environment=env) t_env.execute_sql(""" CREATE CATALOG glue_catalog WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/glue/confs/hive/conf', 'hadoop-conf-dir' = '/glue/confs/hadoop/conf' ) """) t_env.execute_sql(""" USE CATALOG glue_catalog; """) t_env.execute_sql(""" DROP DATABASE IF EXISTS eks_flink_db CASCADE; """) t_env.execute_sql(""" CREATE DATABASE IF NOT EXISTS eks_flink_db WITH ('hive.database.location-uri'= 's3a://
S3-bucket-to-store-metadata
/flink/flink-glue-for-hive/warehouse/'); """) t_env.execute_sql(""" USE eks_flink_db; """) t_env.execute_sql(""" CREATE TABLE IF NOT EXISTS eksglueorders ( order_number BIGINT, price DECIMAL(32,2), buyer ROfirst_name STRING, last_name STRING
, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'datagen' ); """) t_env.execute_sql(""" CREATE TABLE IF NOT EXISTS eksdestglueorders ( order_number BIGINT, price DECIMAL(32,2), buyer ROWfirst_name STRING, last_name STRING
, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'filesystem', 'path' = 's3://S3-bucket-to-store-metadata
/flink/flink-glue-for-hive/warehouse/eksdestglueorders', 'format' = 'json' ); """) t_env.execute_sql(""" CREATE TABLE IF NOT EXISTS print_table ( order_number BIGINT, price DECIMAL(32,2), buyer ROWfirst_name STRING, last_name STRING
, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'print' ); """) t_env.execute_sql(""" EXECUTE STATEMENT SET BEGIN INSERT INTO eksdestglueorders SELECT * FROM eksglueorders LIMIT 10; INSERT INTO print_table SELECT * FROM eksdestglueorders; END; """) if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") glue_demo()