本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
將 Delta Lake 叢集與 Flink 搭配使用
使用 Amazon EMR 6.11 版及更高版本,您可以將 Delta Lake 與 Flink 叢集搭配使用。下列範例使用 AWS CLI 在 Amazon EMR Flink 叢集上使用 Delta Lake。
注意
當您將 Delta Lake 與 Flink 叢集搭配使用時,Amazon EMR 支援 Flink DataStream API。
建立 Delta Lake 叢集
-
使用下列內容建立檔案
delta_configurations.json
:[{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"}}]
-
使用下列組態建立叢集。將
example Amazon S3 bucket path
和subnet ID
取代為您自己的值。aws emr create-cluster --release-label emr-6.11.0 --applications Name=Flink --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://amzn-s3-demo-bucket/ --instance-type m5.xlarge --instance-count 3 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
初始化 Flink yarn 工作階段
若要初始化 Flink yarn 工作階段,請執行下列命令:
flink-yarn-session -d
使用 Delta Lake 建置 Flink 作業
下列範例示範如何使用 sbt 或 Maven 透過 Delta Lake 建置 Flink 作業。
sbt
libraryDependencies ++= Seq(
"io.delta" %% "delta-flink" % deltaConnectorsVersion
% "provided",
"io.delta" %% "delta-standalone" % deltaConnectorsVersion
% "provided",
"org.apache.flink" %% "flink-clients" % flinkVersion
% "provided",
"org.apache.flink" %% "flink-parquet" % flinkVersion
% "provided",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion
% "provided",
"org.apache.flink" % "flink-table-common" % flinkVersion
% "provided",
"org.apache.flink" %% "flink-table-runtime" % flinkVersion
% "provided")
使用 Flink Datastream API 寫入至 Delta 資料表
使用下列範例建立 DeltaSink 以寫入至具有 deltaTablePath:
的資料表
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
Configuration configuration = new Configuration();
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new org.apache.flink.core.fs.Path(deltaTablePath),
configuration,
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
使用 Flink Datastream API 從 Delta 資料表中讀取
使用下列範例建立一個限制的 DeltaSource 以從具有 deltaTablePath:
的資料表中讀取
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
Configuration configuration = new Configuration();
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new org.apache.flink.core.fs.Path(deltaTablePath
),
configuration)
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
為 Delta Lake 獨立版建立具有多叢集支援的接收器
使用下列範例建立 DeltaSink 以寫入至具有 deltaTablePath
和多叢集支援
public DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath) {
Configuration configuration = new Configuration();
configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore");
configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log
");
configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1
");
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
configuration,
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
執行 Flink 作業
使用下列命令來執行您的作業:
flink run FlinkJob.jar