Sélectionner vos préférences de cookies

Nous utilisons des cookies essentiels et des outils similaires qui sont nécessaires au fonctionnement de notre site et à la fourniture de nos services. Nous utilisons des cookies de performance pour collecter des statistiques anonymes afin de comprendre comment les clients utilisent notre site et d’apporter des améliorations. Les cookies essentiels ne peuvent pas être désactivés, mais vous pouvez cliquer sur « Personnaliser » ou « Refuser » pour refuser les cookies de performance.

Si vous êtes d’accord, AWS et les tiers approuvés utiliseront également des cookies pour fournir des fonctionnalités utiles au site, mémoriser vos préférences et afficher du contenu pertinent, y compris des publicités pertinentes. Pour accepter ou refuser tous les cookies non essentiels, cliquez sur « Accepter » ou « Refuser ». Pour effectuer des choix plus détaillés, cliquez sur « Personnaliser ».

Utiliser un cluster Iceberg avec Flink - Amazon EMR

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utiliser un cluster Iceberg avec Flink

À partir de la version 6.9.0 d'Amazon EMR, vous pouvez utiliser Iceberg avec un cluster Flink sans avoir à effectuer les étapes de configuration requises lors de l'utilisation de l'intégration open source Iceberg Flink.

Création d'un cluster Iceberg

Vous pouvez créer un cluster avec Iceberg installé en utilisant la AWS Management Console, l' AWS CLI ou l'API Amazon EMR. Dans ce didacticiel, vous allez utiliser le AWS CLI pour travailler avec Iceberg sur un cluster Amazon EMR. Pour utiliser la console afin de créer un cluster avec Iceberg installé, suivez les étapes de la section Création d'un lac de données Apache Iceberg à l'aide d'Amazon Athena, d'Amazon EMR et d' AWS Glue.

Pour utiliser Iceberg sur Amazon EMR avec AWS CLI le, créez d'abord un cluster en suivant les étapes suivantes. Pour plus d'informations sur la spécification de la classification des icebergs à l'aide du AWS CLI, voir Fournissez une configuration à l'aide du AWS CLI lorsque vous créez un cluster ouFournir une configuration à l'aide du kit SDK Java lors de la création d'un cluster. Créez un fichier nommé configurations.json avec le contenu suivant :

[{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]

Créez ensuite un cluster avec la configuration suivante, en remplaçant les exemples de chemin de compartiment et d'ID de sous-réseau Amazon S3 par vos propres valeurs :

aws emr create-cluster --release-label emr-6.9.0 \ --applications Name=Flink \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_flink_Iceberg_Cluster \ --log-uri s3://amzn-s3-demo-bucket/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef

Vous pouvez également créer un cluster Amazon EMR 6.9.0 contenant une application Flink et utiliser le fichier /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar comme dépendance JAR dans une tâche Flink.

Le script du client SQL se trouve sous /usr/lib/flink/bin. Vous pouvez exécuter le script à l'aide de la commande suivante :

flink-yarn-session -d # starting the Flink YARN Session in detached mode ./sql-client.sh

Cela lance un Flink SQL Shell.

Création d'une table Iceberg

Flink SQL

CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='<WAREHOUSE>', 'catalog-type'='glue' ); USE CATALOG glue_catalog; CREATE DATABASE IF NOT EXISTS <DB>; USE <DB>; CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);

API de table

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); String warehouse = "<WAREHOUSE>"; String db = "<DB>"; tEnv.executeSql( "CREATE CATALOG glue_catalog WITH (\n" + " 'type'='iceberg',\n" + " 'warehouse'='" + warehouse + "',\n" + " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n" + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n" + " );"); tEnv.executeSql("USE CATALOG glue_catalog;"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";"); tEnv.executeSql("USE " + db + ";"); tEnv.executeSql( "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");

Écriture dans une table Iceberg

Flink SQL

INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');

API de table

tEnv.executeSql( "INSERT INTO `glue_catalog`.`" + db + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");

API Datastream

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; GenericRowData rowData1 = new GenericRowData(2); rowData1.setField(0, 1L); rowData1.setField(1, StringData.fromString("a")); DataStream<RowData> input = env.fromElements(rowData1); Map<String, String> props = new HashMap<(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStreamSink<Void> dataStreamSink = FlinkSink.forRowData(input).tableLoader(tableLoader).append(); env.execute("Datastream Write");

Lecture à partir d'une table Iceberg

Flink SQL

SELECT * FROM `glue_catalog`.`<DB>`.`sample`;

API de table

Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");

API Datastream

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; Map<String, String> props = new HashMap<>(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); batch.print().name("print-sink");

Utilisation du catalogue Hive

Assurez-vous que les dépendances de Flink et Hive sont résolues comme décrit dans Configuration de Flink avec Hive Metastore et Glue Catalog.

Une façon de soumettre une tâche à Flink consiste à utiliser une session Flink YARN par tâche. Vous pouvez le lancer à l'aide de la commande suivante :

sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME
  • Lorsque vous utilisez AWS Glue comme catalogue pour Iceberg, assurez-vous que la base de données dans laquelle vous créez une table existe dans AWS Glue. Si vous utilisez des services tels que AWS Lake Formation et que vous ne parvenez pas à charger le catalogue, assurez-vous de disposer d'un accès approprié au service pour exécuter la commande.

  • L'intégration d'Iceberg Glue ne fonctionne pas avec le catalogue Redshift Managed Storage.

ConfidentialitéConditions d'utilisation du sitePréférences de cookies
© 2025, Amazon Web Services, Inc. ou ses affiliés. Tous droits réservés.