本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将 Delta Lake 集群与 Flink 结合使用
从 Amazon EMR 6.11 版本开始,您可以将 Delta Lake 与您的 Flink 集群结合使用。以下示例使用在 Amazon EMR Flink 集群上使用 Delta Lake。 AWS CLI
注意
当你将 Delta Lake 与 Flink 集群配合使用时,亚马逊 EMR 支持 Flink DataStream API。
创建 Delta Lake 集群
-
创建文件
delta_configurations.json
并输入以下内容:[{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"}}]
-
使用以下配置创建集群。在该 URL 中,将
example Amazon S3 bucket path
和subnet ID
替换为您自己的值。aws emr create-cluster --release-label emr-6.11.0 --applications Name=Flink --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://amzn-s3-demo-bucket/ --instance-type m5.xlarge --instance-count 3 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
初始化 Flink yarn 会话
要初始化 Flink yarn 会话,请运行以下命令:
flink-yarn-session -d
使用 Delta Lake 创建 Flink 作业
以下示例展示如何使用 sbt 或 Maven 在 Delta Lake 中构建 Flink 作业。
sbt
libraryDependencies ++= Seq(
"io.delta" %% "delta-flink" % deltaConnectorsVersion
% "provided",
"io.delta" %% "delta-standalone" % deltaConnectorsVersion
% "provided",
"org.apache.flink" %% "flink-clients" % flinkVersion
% "provided",
"org.apache.flink" %% "flink-parquet" % flinkVersion
% "provided",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion
% "provided",
"org.apache.flink" % "flink-table-common" % flinkVersion
% "provided",
"org.apache.flink" %% "flink-table-runtime" % flinkVersion
% "provided")
通过 Flink Datastream API 写入 Delta 表
使用以下示例创建要写入 DeltaSink 到带有 a 的表中 deltaTablePath:
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
Configuration configuration = new Configuration();
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new org.apache.flink.core.fs.Path(deltaTablePath),
configuration,
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
通过 Flink Datastream API 从 Delta 表中读取
使用以下示例创建要从表中读 DeltaSource 取的有界值,其中带有 deltaTablePath:
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
Configuration configuration = new Configuration();
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new org.apache.flink.core.fs.Path(deltaTablePath
),
configuration)
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
使用对 Delta Lake 独立版的多集群支持创建接收器
使用以下示例创建 DeltaSink 支持deltaTablePath
和多集群
public DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath) {
Configuration configuration = new Configuration();
configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore");
configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log
");
configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1
");
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
configuration,
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
运行 Flink 作业
使用下列命令以运行您的作业:
flink run FlinkJob.jar