翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Flink での Iceberg クラスターの使用
Amazon EMR バージョン 6.9.0 以降では、オープンソースの Iceberg Flink 統合を使用する際に必要なセットアップ手順なしに Flink クラスターで Iceberg を使用できます。
Iceberg クラスターの作成
Iceberg がインストールされたクラスターは、 AWS Management Console、 AWS CLI、または Amazon EMR API を使用して作成できます。このチュートリアルでは、 AWS CLI を使用して Amazon EMR クラスターで Iceberg を操作します。コンソールを使用して Iceberg がインストールされたクラスターを作成するには、「Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue
で Amazon EMR で Iceberg を使用するには AWS CLI、まず次のステップでクラスターを作成します。を使用して Iceberg 分類を指定する方法については AWS CLI、クラスターの作成 AWS CLI 時に を使用して設定を指定する「」または「」を参照してくださいクラスター作成時に Java SDK を使用して設定を指定する。configurations.json
というファイルを次の内容で作成します。
[{
"Classification":"iceberg-defaults",
"Properties":{"iceberg.enabled":"true"}
}]
次に、以下の設定でクラスターを作成し、この例の Amazon S3 バケットパスとサブネット ID を独自の値に置き換えます。
aws emr create-cluster --release-label emr-6.9.0 \
--applications Name=Flink \
--configurations file://iceberg_configurations.json \
--region us-east-1 \
--name My_flink_Iceberg_Cluster \
--log-uri s3://amzn-s3-demo-bucket/ \
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole \
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
または、Flink アプリケーションを含む Amazon EMR 6.9.0 クラスターを作成し、/usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar
ファイルを Flink ジョブの JAR 依存関係として使用することもできます。
Flink SQL クライアントの使用
SQL クライアントスクリプトは /usr/lib/flink/bin
にあります。次のコマンドを使用してスクリプトを実行します。
flink-yarn-session -d # starting the Flink YARN Session in detached mode
./sql-client.sh
これにより Flink SQL シェルが起動します。
Flink の例
Iceberg テーブルの作成
Flink SQL
CREATE CATALOG glue_catalog WITH (
'type'='iceberg',
'warehouse'='<WAREHOUSE>',
'catalog-type'='glue'
);
USE CATALOG glue_catalog;
CREATE DATABASE IF NOT EXISTS <DB>;
USE <DB>;
CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);
Table API
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
String warehouse = "<WAREHOUSE>";
String db = "<DB>";
tEnv.executeSql(
"CREATE CATALOG glue_catalog WITH (\n"
+ " 'type'='iceberg',\n"
+ " 'warehouse'='"
+ warehouse
+ "',\n"
+ " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n"
+ " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n"
+ " );");
tEnv.executeSql("USE CATALOG glue_catalog;");
tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";");
tEnv.executeSql("USE " + db + ";");
tEnv.executeSql(
"CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");
Iceberg テーブルへの書き込み
Flink SQL
INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');
Table API
tEnv.executeSql(
"INSERT INTO `glue_catalog`.`"
+ db
+ "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");
Datastream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String db = "<DB Name>";
String warehouse = "<Warehouse Path>";
GenericRowData rowData1 = new GenericRowData(2);
rowData1.setField(0, 1L);
rowData1.setField(1, StringData.fromString("a"));
DataStream<RowData> input = env.fromElements(rowData1);
Map<String, String> props = new HashMap<();
props.put("type", "iceberg");
props.put("warehouse", warehouse);
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
CatalogLoader glueCatlogLoader =
CatalogLoader.custom(
"glue",
props,
new Configuration(),
"org.apache.iceberg.aws.glue.GlueCatalog");
TableLoader tableLoader =
TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample"));
DataStreamSink<Void> dataStreamSink =
FlinkSink.forRowData(input).tableLoader(tableLoader).append();
env.execute("Datastream Write");
Iceberg テーブルからの読み込み
Flink SQL
SELECT * FROM `glue_catalog`.`<DB>`.`sample`;
Table API
Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");
Datastream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String db = "<DB Name>";
String warehouse = "<Warehouse Path>";
Map<String, String> props = new HashMap<>();
props.put("type", "iceberg");
props.put("warehouse", warehouse);
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
CatalogLoader glueCatlogLoader =
CatalogLoader.custom(
"glue",
props,
new Configuration(),
"org.apache.iceberg.aws.glue.GlueCatalog");
TableLoader tableLoader =
TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample"));
DataStream<RowData> batch =
FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
batch.print().name("print-sink");
Hive カタログの使用
Flink と Hive の依存関係が「Hive Metastore と Glue Catalog を使用して Flink を設定する」の説明に従って解決されていることを確認します。
Flink ジョブの実行
Flink にジョブを送信する方法の 1 つは、ジョブ単位の Flink YARN セッションを使用することです。これを実行するには、次のコマンドを使用します。
sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME
Flink で Iceberg を使用するための考慮事項
-
Iceberg のカタログとして AWS Glue を使用する場合は、テーブルを作成するデータベースが Glue AWS に存在することを確認してください。などのサービスを使用して AWS Lake Formation いて、カタログをロードできない場合は、コマンドを実行するためのサービスへの適切なアクセス権があることを確認してください。
Iceberg Glue の統合は、Redshift マネージドストレージカタログでは機能しません。