Pilih preferensi cookie Anda

Kami menggunakan cookie penting serta alat serupa yang diperlukan untuk menyediakan situs dan layanan. Kami menggunakan cookie performa untuk mengumpulkan statistik anonim sehingga kami dapat memahami cara pelanggan menggunakan situs dan melakukan perbaikan. Cookie penting tidak dapat dinonaktifkan, tetapi Anda dapat mengklik “Kustom” atau “Tolak” untuk menolak cookie performa.

Jika Anda setuju, AWS dan pihak ketiga yang disetujui juga akan menggunakan cookie untuk menyediakan fitur situs yang berguna, mengingat preferensi Anda, dan menampilkan konten yang relevan, termasuk iklan yang relevan. Untuk menerima atau menolak semua cookie yang tidak penting, klik “Terima” atau “Tolak”. Untuk membuat pilihan yang lebih detail, klik “Kustomisasi”.

Gunakan cluster Iceberg dengan Flink - Amazon EMR

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Gunakan cluster Iceberg dengan Flink

Dimulai dengan Amazon EMR versi 6.9.0, Anda dapat menggunakan Iceberg dengan cluster Flink tanpa langkah-langkah penyiapan yang diperlukan saat menggunakan Integrasi Iceberg Flink open source.

Membuat cluster Iceberg

Anda dapat membuat cluster dengan Iceberg diinstal menggunakan AWS Management Console, AWS CLI, atau Amazon EMR API. Dalam tutorial ini, Anda menggunakan AWS CLI untuk bekerja dengan Iceberg di cluster EMR Amazon. Untuk menggunakan konsol untuk membuat cluster dengan Iceberg diinstal, ikuti langkah-langkah dalam Membangun danau data Apache Iceberg menggunakan Amazon Athena, Amazon EMR, dan Glue. AWS

Untuk menggunakan Iceberg di Amazon EMR dengan AWS CLI, pertama buat cluster dengan langkah-langkah berikut. Untuk informasi tentang menentukan klasifikasi Gunung Es menggunakan AWS CLI, lihat atau. Menyediakan konfigurasi menggunakan AWS CLI saat Anda membuat klaster Sediakan konfigurasi menggunakan Java SDK ketika Anda membuat sebuah klaster Buat file yang disebut configurations.json dengan konten berikut:

[{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]

Selanjutnya, buat cluster dengan konfigurasi berikut, ganti contoh jalur bucket Amazon S3 dan ID subnet dengan nilai Anda sendiri:

aws emr create-cluster --release-label emr-6.9.0 \ --applications Name=Flink \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_flink_Iceberg_Cluster \ --log-uri s3://amzn-s3-demo-bucket/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef

Atau, Anda dapat membuat cluster Amazon EMR 6.9.0 dengan aplikasi Flink di dalamnya dan menggunakan file /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar sebagai dependensi JAR dalam pekerjaan Flink.

Skrip SQL Client terletak di bawah/usr/lib/flink/bin. Anda dapat menjalankan skrip dengan perintah berikut:

flink-yarn-session -d # starting the Flink YARN Session in detached mode ./sql-client.sh

Ini meluncurkan Flink SQL Shell.

Buat tabel Iceberg

Flink SQL

CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='<WAREHOUSE>', 'catalog-type'='glue' ); USE CATALOG glue_catalog; CREATE DATABASE IF NOT EXISTS <DB>; USE <DB>; CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);

Tabel API

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); String warehouse = "<WAREHOUSE>"; String db = "<DB>"; tEnv.executeSql( "CREATE CATALOG glue_catalog WITH (\n" + " 'type'='iceberg',\n" + " 'warehouse'='" + warehouse + "',\n" + " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n" + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n" + " );"); tEnv.executeSql("USE CATALOG glue_catalog;"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";"); tEnv.executeSql("USE " + db + ";"); tEnv.executeSql( "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");

Menulis ke meja Iceberg

Flink SQL

INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');

Tabel API

tEnv.executeSql( "INSERT INTO `glue_catalog`.`" + db + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");

Datastream API

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; GenericRowData rowData1 = new GenericRowData(2); rowData1.setField(0, 1L); rowData1.setField(1, StringData.fromString("a")); DataStream<RowData> input = env.fromElements(rowData1); Map<String, String> props = new HashMap<(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStreamSink<Void> dataStreamSink = FlinkSink.forRowData(input).tableLoader(tableLoader).append(); env.execute("Datastream Write");

Baca dari tabel Gunung Es

Flink SQL

SELECT * FROM `glue_catalog`.`<DB>`.`sample`;

Tabel API

Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");

Datastream API

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; Map<String, String> props = new HashMap<>(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); batch.print().name("print-sink");

Menggunakan katalog Hive

Pastikan dependensi Flink dan Hive diselesaikan seperti yang dijelaskan dalam. Konfigurasikan Flink dengan Hive Metastore dan Glue Catalog

Salah satu cara untuk mengirimkan pekerjaan ke Flink adalah dengan menggunakan sesi Flink YARN per pekerjaan. Ini dapat diluncurkan dengan perintah berikut:

sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME
  • Saat menggunakan AWS Glue sebagai katalog untuk Iceberg, pastikan database tempat Anda membuat tabel ada di Glue AWS . Jika Anda menggunakan layanan seperti AWS Lake Formation dan Anda tidak dapat memuat katalog, pastikan Anda memiliki akses yang tepat ke layanan untuk menjalankan perintah.

  • Integrasi Iceberg Glue tidak berfungsi dengan katalog Redshift Managed Storage.

PrivasiSyarat situsPreferensi cookie
© 2025, Amazon Web Services, Inc. atau afiliasinya. Semua hak dilindungi undang-undang.