Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Gunakan cluster Iceberg dengan Flink
Dimulai dengan Amazon EMR versi 6.9.0, Anda dapat menggunakan Iceberg dengan cluster Flink tanpa langkah-langkah penyiapan yang diperlukan saat menggunakan Integrasi Iceberg Flink open source.
Membuat cluster Iceberg
Anda dapat membuat cluster dengan Iceberg diinstal menggunakan AWS Management Console, AWS CLI, atau Amazon EMR API. Dalam tutorial ini, Anda menggunakan AWS CLI untuk bekerja dengan Iceberg di cluster EMR Amazon. Untuk menggunakan konsol untuk membuat cluster dengan Iceberg diinstal, ikuti langkah-langkah dalam Membangun danau data Apache Iceberg menggunakan Amazon Athena, Amazon EMR,
Untuk menggunakan Iceberg di Amazon EMR dengan AWS CLI, pertama buat cluster dengan langkah-langkah berikut. Untuk informasi tentang menentukan klasifikasi Gunung Es menggunakan AWS CLI, lihat atau. Menyediakan konfigurasi menggunakan AWS CLI saat Anda membuat klaster Sediakan konfigurasi menggunakan Java SDK ketika Anda membuat sebuah klaster Buat file yang disebut configurations.json
dengan konten berikut:
[{
"Classification":"iceberg-defaults",
"Properties":{"iceberg.enabled":"true"}
}]
Selanjutnya, buat cluster dengan konfigurasi berikut, ganti contoh jalur bucket Amazon S3 dan ID subnet dengan nilai Anda sendiri:
aws emr create-cluster --release-label emr-6.9.0 \
--applications Name=Flink \
--configurations file://iceberg_configurations.json \
--region us-east-1 \
--name My_flink_Iceberg_Cluster \
--log-uri s3://amzn-s3-demo-bucket/ \
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole \
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
Atau, Anda dapat membuat cluster Amazon EMR 6.9.0 dengan aplikasi Flink di dalamnya dan menggunakan file /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar
sebagai dependensi JAR dalam pekerjaan Flink.
Menggunakan Flink SQL Client
Skrip SQL Client terletak di bawah/usr/lib/flink/bin
. Anda dapat menjalankan skrip dengan perintah berikut:
flink-yarn-session -d # starting the Flink YARN Session in detached mode
./sql-client.sh
Ini meluncurkan Flink SQL Shell.
Contoh Flink
Buat tabel Iceberg
Flink SQL
CREATE CATALOG glue_catalog WITH (
'type'='iceberg',
'warehouse'='<WAREHOUSE>',
'catalog-type'='glue'
);
USE CATALOG glue_catalog;
CREATE DATABASE IF NOT EXISTS <DB>;
USE <DB>;
CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);
Tabel API
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
String warehouse = "<WAREHOUSE>";
String db = "<DB>";
tEnv.executeSql(
"CREATE CATALOG glue_catalog WITH (\n"
+ " 'type'='iceberg',\n"
+ " 'warehouse'='"
+ warehouse
+ "',\n"
+ " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n"
+ " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n"
+ " );");
tEnv.executeSql("USE CATALOG glue_catalog;");
tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";");
tEnv.executeSql("USE " + db + ";");
tEnv.executeSql(
"CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");
Menulis ke meja Iceberg
Flink SQL
INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');
Tabel API
tEnv.executeSql(
"INSERT INTO `glue_catalog`.`"
+ db
+ "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");
Datastream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String db = "<DB Name>";
String warehouse = "<Warehouse Path>";
GenericRowData rowData1 = new GenericRowData(2);
rowData1.setField(0, 1L);
rowData1.setField(1, StringData.fromString("a"));
DataStream<RowData> input = env.fromElements(rowData1);
Map<String, String> props = new HashMap<();
props.put("type", "iceberg");
props.put("warehouse", warehouse);
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
CatalogLoader glueCatlogLoader =
CatalogLoader.custom(
"glue",
props,
new Configuration(),
"org.apache.iceberg.aws.glue.GlueCatalog");
TableLoader tableLoader =
TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample"));
DataStreamSink<Void> dataStreamSink =
FlinkSink.forRowData(input).tableLoader(tableLoader).append();
env.execute("Datastream Write");
Baca dari tabel Gunung Es
Flink SQL
SELECT * FROM `glue_catalog`.`<DB>`.`sample`;
Tabel API
Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");
Datastream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String db = "<DB Name>";
String warehouse = "<Warehouse Path>";
Map<String, String> props = new HashMap<>();
props.put("type", "iceberg");
props.put("warehouse", warehouse);
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
CatalogLoader glueCatlogLoader =
CatalogLoader.custom(
"glue",
props,
new Configuration(),
"org.apache.iceberg.aws.glue.GlueCatalog");
TableLoader tableLoader =
TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample"));
DataStream<RowData> batch =
FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
batch.print().name("print-sink");
Menggunakan katalog Hive
Pastikan dependensi Flink dan Hive diselesaikan seperti yang dijelaskan dalam. Konfigurasikan Flink dengan Hive Metastore dan Glue Catalog
Menjalankan Job Flink
Salah satu cara untuk mengirimkan pekerjaan ke Flink adalah dengan menggunakan sesi Flink YARN per pekerjaan. Ini dapat diluncurkan dengan perintah berikut:
sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME
Pertimbangan untuk menggunakan Iceberg dengan Flink
-
Saat menggunakan AWS Glue sebagai katalog untuk Iceberg, pastikan database tempat Anda membuat tabel ada di Glue AWS . Jika Anda menggunakan layanan seperti AWS Lake Formation dan Anda tidak dapat memuat katalog, pastikan Anda memiliki akses yang tepat ke layanan untuk menjalankan perintah.
Integrasi Iceberg Glue tidak berfungsi dengan katalog Redshift Managed Storage.