Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Gunakan cluster Delta Lake dengan Flink
Dengan Amazon EMR rilis 6.11 dan yang lebih tinggi, Anda dapat menggunakan Delta Lake dengan cluster Flink Anda. Contoh berikut menggunakan AWS CLI untuk bekerja dengan Delta Lake di cluster Amazon EMR Flink.
catatan
Amazon EMR mendukung Flink DataStream API saat Anda menggunakan Delta Lake dengan cluster Flink.
Buat cluster Delta Lake
-
Buat file,
delta_configurations.json
, dengan konten berikut:[{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"}}]
-
Buat cluster dengan konfigurasi berikut. Ganti
example Amazon S3 bucket path
dansubnet ID
dengan milik Anda sendiri.aws emr create-cluster --release-label emr-6.11.0 --applications Name=Flink --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://amzn-s3-demo-bucket/ --instance-type m5.xlarge --instance-count 3 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
Inisialisasi sesi benang Flink
Untuk menginisialisasi sesi benang Flink, jalankan perintah berikut:
flink-yarn-session -d
Bangun pekerjaan Flink dengan Delta Lake
Contoh berikut menunjukkan cara menggunakan sbt atau Maven untuk membangun pekerjaan Flink Anda dengan Delta Lake.
sbt
libraryDependencies ++= Seq(
"io.delta" %% "delta-flink" % deltaConnectorsVersion
% "provided",
"io.delta" %% "delta-standalone" % deltaConnectorsVersion
% "provided",
"org.apache.flink" %% "flink-clients" % flinkVersion
% "provided",
"org.apache.flink" %% "flink-parquet" % flinkVersion
% "provided",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion
% "provided",
"org.apache.flink" % "flink-table-common" % flinkVersion
% "provided",
"org.apache.flink" %% "flink-table-runtime" % flinkVersion
% "provided")
Menulis ke tabel Delta dengan Flink Datastream API
Gunakan contoh berikut untuk membuat DeltaSink untuk menulis ke tabel dengan deltaTablePath:
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
Configuration configuration = new Configuration();
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new org.apache.flink.core.fs.Path(deltaTablePath),
configuration,
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
Baca dari tabel Delta dengan Flink Datastream API
Gunakan contoh berikut untuk membuat dibatasi DeltaSource untuk membaca dari tabel dengan deltaTablePath:
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
Configuration configuration = new Configuration();
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new org.apache.flink.core.fs.Path(deltaTablePath
),
configuration)
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
Pembuatan wastafel dengan dukungan multi-cluster untuk Delta Lake mandiri
Gunakan contoh berikut untuk membuat tabel DeltaSink to write to dengan dukungan deltaTablePath
dan multi cluster
public DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath) {
Configuration configuration = new Configuration();
configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore");
configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log
");
configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1
");
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
configuration,
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
Jalankan pekerjaan Flink
Gunakan perintah berikut untuk menjalankan pekerjaan Anda:
flink run FlinkJob.jar