AWSUsare Glue con Flink - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

AWSUsare Glue con Flink

Amazon EMR su EKS con Apache Flink nelle versioni 6.15.0 e successive supporta l'utilizzo di AWS Glue Data Catalog come archivio di metadati per flussi di lavoro SQL in streaming e batch.

È innanzitutto necessario creare un database AWS Glue denominato default che funga da Flink SQL Catalog. Questo catalogo Flink memorizza metadati come database, tabelle, partizioni, viste, funzioni e altre informazioni necessarie per accedere ai dati in altri sistemi esterni.

aws glue create-database \ --database-input "{\"Name\":\"default\"}"

Per abilitare il supporto di AWS Glue, usa una FlinkDeployment specifica. Questa specifica di esempio utilizza uno script Python per emettere rapidamente alcune istruzioni SQL Flink per interagire con il AWS catalogo Glue.

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: flinkVersion: v1_17 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" aws.glue.enabled: "true" executionRoleArn: job-execution-role-arn; emrReleaseLabel: "emr-6.15.0-flink-latest" jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: s3://<S3_bucket_with_your_script/pyflink-glue-script.py entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-py", "/opt/flink/usrlib/pyflink-glue-script.py"] parallelism: 1 upgradeMode: stateless

Di seguito è riportato un esempio di come potrebbe apparire PyFlink lo script.

import logging import sys from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment def glue_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(stream_execution_environment=env) t_env.execute_sql(""" CREATE CATALOG glue_catalog WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/glue/confs/hive/conf', 'hadoop-conf-dir' = '/glue/confs/hadoop/conf' ) """) t_env.execute_sql(""" USE CATALOG glue_catalog; """) t_env.execute_sql(""" DROP DATABASE IF EXISTS eks_flink_db CASCADE; """) t_env.execute_sql(""" CREATE DATABASE IF NOT EXISTS eks_flink_db WITH ('hive.database.location-uri'= 's3a://S3-bucket-to-store-metadata/flink/flink-glue-for-hive/warehouse/'); """) t_env.execute_sql(""" USE eks_flink_db; """) t_env.execute_sql(""" CREATE TABLE IF NOT EXISTS eksglueorders ( order_number BIGINT, price DECIMAL(32,2), buyer RO first_name STRING, last_name STRING, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'datagen' ); """) t_env.execute_sql(""" CREATE TABLE IF NOT EXISTS eksdestglueorders ( order_number BIGINT, price DECIMAL(32,2), buyer ROW first_name STRING, last_name STRING, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'filesystem', 'path' = 's3://S3-bucket-to-store-metadata/flink/flink-glue-for-hive/warehouse/eksdestglueorders', 'format' = 'json' ); """) t_env.execute_sql(""" CREATE TABLE IF NOT EXISTS print_table ( order_number BIGINT, price DECIMAL(32,2), buyer ROW first_name STRING, last_name STRING, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'print' ); """) t_env.execute_sql(""" EXECUTE STATEMENT SET BEGIN INSERT INTO eksdestglueorders SELECT * FROM eksglueorders LIMIT 10; INSERT INTO print_table SELECT * FROM eksdestglueorders; END; """) if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") glue_demo()