Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Uso de un clúster de Delta Lake con Flink
Con la versión 6.11 y posteriores de Amazon EMR, puede usar Delta Lake con su clúster de Flink. En los siguientes ejemplos, se utiliza AWS CLI para trabajar con Delta Lake en un clúster Flink de Amazon EMR.
nota
Amazon EMR admite la DataStream API de Flink cuando utiliza Delta Lake con un clúster de Flink.
Creación de un clúster de Delta Lake
-
Cree un archivo,
delta_configurations.json
, con el siguiente contenido:[{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"}}]
-
Cree un clúster con la siguiente configuración. Reemplace
example Amazon S3 bucket path
ysubnet ID
por sus valores.aws emr create-cluster --release-label emr-6.11.0 --applications Name=Flink --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://amzn-s3-demo-bucket/ --instance-type m5.xlarge --instance-count 3 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
Inicialización de una sesión de yarn de Flink
Para iniciar una sesión de Flink Yarn, ejecute el siguiente comando:
flink-yarn-session -d
Creación de un trabajo de Flink con Delta Lake
En los siguientes ejemplos, se muestra cómo usar sbt o Maven para crear su trabajo de Flink con Delta Lake.
sbt
libraryDependencies ++= Seq(
"io.delta" %% "delta-flink" % deltaConnectorsVersion
% "provided",
"io.delta" %% "delta-standalone" % deltaConnectorsVersion
% "provided",
"org.apache.flink" %% "flink-clients" % flinkVersion
% "provided",
"org.apache.flink" %% "flink-parquet" % flinkVersion
% "provided",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion
% "provided",
"org.apache.flink" % "flink-table-common" % flinkVersion
% "provided",
"org.apache.flink" %% "flink-table-runtime" % flinkVersion
% "provided")
Escritura en una tabla Delta con la API DataStream de Flink
Utilice el siguiente ejemplo para crear un objeto DeltaSink para escribir en la tabla con un deltaTablePath:
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
Configuration configuration = new Configuration();
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new org.apache.flink.core.fs.Path(deltaTablePath),
configuration,
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
Lectura de una tabla Delta con la API Flink Datastream
Utilice el siguiente ejemplo para crear un acotado DeltaSource que se lea en la tabla con un deltaTablePath:
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
Configuration configuration = new Configuration();
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new org.apache.flink.core.fs.Path(deltaTablePath
),
configuration)
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
Creación de receptores con soporte para múltiples clústeres para la versión independiente de Delta Lake
Utilice el siguiente ejemplo para crear una tabla en la que DeltaSink escribir deltaTablePath
y que sea compatible con varios clústeres
public DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath) {
Configuration configuration = new Configuration();
configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore");
configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log
");
configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1
");
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
configuration,
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
Ejecución del trabajo de Flink
Use el comando siguiente para ejecutar el trabajo:
flink run FlinkJob.jar