Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Usar un clúster de Iceberg con Flink
A partir de la versión 6.9.0 de Amazon EMR, puede utilizar Iceberg con un clúster de Flink sin los pasos de configuración necesarios al utilizar la integración de código abierto de Flink de Iceberg.
Creación de un clúster de Iceberg
Puede crear un clúster con Iceberg instalado mediante la AWS Management Console, la AWS CLI o la API de Amazon EMR. En este tutorial, utilizará el AWS CLI para trabajar con Iceberg en un clúster de Amazon EMR. Para usar la consola para crear un clúster con Iceberg instalado, siga los pasos que se indican en Crear un lago de datos de Apache Iceberg con Amazon Athena, Amazon EMR y AWS Glue
Para usar Iceberg en Amazon EMR con AWS CLI el, cree primero un clúster con los siguientes pasos. Para obtener información sobre cómo especificar la clasificación de Iceberg mediante la AWS CLI, consulte o. Proporcione una configuración mediante la opción AWS CLI al crear un clúster Proporcione una configuración mediante el SDK de Java al crear un clúster Cree un archivo denominado configurations.json
con el siguiente contenido:
[{
"Classification":"iceberg-defaults",
"Properties":{"iceberg.enabled":"true"}
}]
A continuación, cree un clúster con la siguiente configuración y sustituya la ruta del bucket y el ID de subred de Amazon S3 de ejemplo por sus propios valores:
aws emr create-cluster --release-label emr-6.9.0 \
--applications Name=Flink \
--configurations file://iceberg_configurations.json \
--region us-east-1 \
--name My_flink_Iceberg_Cluster \
--log-uri s3://amzn-s3-demo-bucket/ \
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole \
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
Como alternativa, puede crear un clúster de Amazon EMR 6.9.0 con una aplicación de Flink y utilizar el archivo /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar
como una dependencia JAR en un trabajo de Flink.
Uso del cliente SQL de Flink
El script del cliente SQL se encuentra en /usr/lib/flink/bin
. Puede ejecutar el script con el siguiente comando:
flink-yarn-session -d # starting the Flink YARN Session in detached mode
./sql-client.sh
Esto lanza un intérprete de comandos SQL de Flink.
Ejemplos de Flink
Crear una tabla de Iceberg
SQL de Flink
CREATE CATALOG glue_catalog WITH (
'type'='iceberg',
'warehouse'='<WAREHOUSE>',
'catalog-type'='glue'
);
USE CATALOG glue_catalog;
CREATE DATABASE IF NOT EXISTS <DB>;
USE <DB>;
CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);
API de tabla
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
String warehouse = "<WAREHOUSE>";
String db = "<DB>";
tEnv.executeSql(
"CREATE CATALOG glue_catalog WITH (\n"
+ " 'type'='iceberg',\n"
+ " 'warehouse'='"
+ warehouse
+ "',\n"
+ " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n"
+ " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n"
+ " );");
tEnv.executeSql("USE CATALOG glue_catalog;");
tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";");
tEnv.executeSql("USE " + db + ";");
tEnv.executeSql(
"CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");
Escribir en una tabla de Iceberg
SQL de Flink
INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');
API de tabla
tEnv.executeSql(
"INSERT INTO `glue_catalog`.`"
+ db
+ "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");
API de Datastream
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String db = "<DB Name>";
String warehouse = "<Warehouse Path>";
GenericRowData rowData1 = new GenericRowData(2);
rowData1.setField(0, 1L);
rowData1.setField(1, StringData.fromString("a"));
DataStream<RowData> input = env.fromElements(rowData1);
Map<String, String> props = new HashMap<();
props.put("type", "iceberg");
props.put("warehouse", warehouse);
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
CatalogLoader glueCatlogLoader =
CatalogLoader.custom(
"glue",
props,
new Configuration(),
"org.apache.iceberg.aws.glue.GlueCatalog");
TableLoader tableLoader =
TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample"));
DataStreamSink<Void> dataStreamSink =
FlinkSink.forRowData(input).tableLoader(tableLoader).append();
env.execute("Datastream Write");
Leer desde una tabla de Iceberg
SQL de Flink
SELECT * FROM `glue_catalog`.`<DB>`.`sample`;
API de tabla
Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");
API de Datastream
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String db = "<DB Name>";
String warehouse = "<Warehouse Path>";
Map<String, String> props = new HashMap<>();
props.put("type", "iceberg");
props.put("warehouse", warehouse);
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
CatalogLoader glueCatlogLoader =
CatalogLoader.custom(
"glue",
props,
new Configuration(),
"org.apache.iceberg.aws.glue.GlueCatalog");
TableLoader tableLoader =
TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample"));
DataStream<RowData> batch =
FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
batch.print().name("print-sink");
Uso del catálogo de Hive
Asegúrese de que las dependencias entre Flink y Hive se resuelvan como se describe en Configuración de Flink con el metaalmacén de Hive y el Catálogo de Glue.
Ejecución de un trabajo de Flink
Una forma de enviar un trabajo a Flink es utilizar una sesión de Flink YARN por trabajo. Puede iniciarlo con el siguiente comando:
sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME
Consideraciones para el uso de Iceberg con Flink
-
Cuando utilice AWS Glue como catálogo de Iceberg, asegúrese de que la base de datos en la que va a crear la tabla esté en AWS Glue. Si utilizas servicios como estos AWS Lake Formation y no puedes cargar el catálogo, asegúrate de tener el acceso adecuado al servicio para ejecutar el comando.
La integración de Iceberg Glue no funciona con el catálogo de almacenamiento gestionado de Redshift.