本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
搭配 Flink 使用 AWS Glue
Amazon EMR on EKS 搭配 Apache Flink 6.15.0 版和更新版本支援使用 AWS Glue Data Catalog 做為串流和批次 SQL 工作流程的中繼資料存放區。
您必須先建立名為 AWS default
的 Glue 資料庫,做為 Flink SQL Catalog。此 Flink Catalog 會存放中繼資料,例如資料庫、資料表、分割區、檢視、函數,以及存取其他外部系統中資料所需的其他資訊。
aws glue create-database \ --database-input "{\"Name\":\"default\"}"
若要啟用 AWS Glue 支援,請使用FlinkDeployment
規格。此範例規格使用 Python 指令碼來快速發行一些 Flink SQL 陳述式,以與 Glue AWS 目錄互動。
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: flinkVersion: v1_17 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" aws.glue.enabled: "true" executionRoleArn:
job-execution-role-arn
; emrReleaseLabel: "emr-6.15.0-flink-latest" jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: s3://<S3_bucket_with_your_script
/pyflink-glue-script.py
entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-py", "/opt/flink/usrlib/pyflink-glue-script.py
"] parallelism: 1 upgradeMode: stateless
以下是 PyFlink 指令碼可能看起來的範例。
import logging import sys from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment def glue_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(stream_execution_environment=env) t_env.execute_sql(""" CREATE CATALOG glue_catalog WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/glue/confs/hive/conf', 'hadoop-conf-dir' = '/glue/confs/hadoop/conf' ) """) t_env.execute_sql(""" USE CATALOG glue_catalog; """) t_env.execute_sql(""" DROP DATABASE IF EXISTS eks_flink_db CASCADE; """) t_env.execute_sql(""" CREATE DATABASE IF NOT EXISTS eks_flink_db WITH ('hive.database.location-uri'= 's3a://
S3-bucket-to-store-metadata
/flink/flink-glue-for-hive/warehouse/'); """) t_env.execute_sql(""" USE eks_flink_db; """) t_env.execute_sql(""" CREATE TABLE IF NOT EXISTS eksglueorders ( order_number BIGINT, price DECIMAL(32,2), buyer ROfirst_name STRING, last_name STRING
, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'datagen' ); """) t_env.execute_sql(""" CREATE TABLE IF NOT EXISTS eksdestglueorders ( order_number BIGINT, price DECIMAL(32,2), buyer ROWfirst_name STRING, last_name STRING
, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'filesystem', 'path' = 's3://S3-bucket-to-store-metadata
/flink/flink-glue-for-hive/warehouse/eksdestglueorders', 'format' = 'json' ); """) t_env.execute_sql(""" CREATE TABLE IF NOT EXISTS print_table ( order_number BIGINT, price DECIMAL(32,2), buyer ROWfirst_name STRING, last_name STRING
, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'print' ); """) t_env.execute_sql(""" EXECUTE STATEMENT SET BEGIN INSERT INTO eksdestglueorders SELECT * FROM eksglueorders LIMIT 10; INSERT INTO print_table SELECT * FROM eksdestglueorders; END; """) if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") glue_demo()