Seleccione sus preferencias de cookies

Usamos cookies esenciales y herramientas similares que son necesarias para proporcionar nuestro sitio y nuestros servicios. Usamos cookies de rendimiento para recopilar estadísticas anónimas para que podamos entender cómo los clientes usan nuestro sitio y hacer mejoras. Las cookies esenciales no se pueden desactivar, pero puede hacer clic en “Personalizar” o “Rechazar” para rechazar las cookies de rendimiento.

Si está de acuerdo, AWS y los terceros aprobados también utilizarán cookies para proporcionar características útiles del sitio, recordar sus preferencias y mostrar contenido relevante, incluida publicidad relevante. Para aceptar o rechazar todas las cookies no esenciales, haga clic en “Aceptar” o “Rechazar”. Para elegir opciones más detalladas, haga clic en “Personalizar”.

Uso de un clúster de Delta Lake con Flink - Amazon EMR

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Uso de un clúster de Delta Lake con Flink

Con la versión 6.11 y posteriores de Amazon EMR, puede usar Delta Lake con su clúster de Flink. En los siguientes ejemplos, se utiliza AWS CLI para trabajar con Delta Lake en un clúster Flink de Amazon EMR.

nota

Amazon EMR admite la DataStream API de Flink cuando utiliza Delta Lake con un clúster de Flink.

Creación de un clúster de Delta Lake

  1. Cree un archivo, delta_configurations.json, con el siguiente contenido:

    [{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"}}]
  2. Cree un clúster con la siguiente configuración. Reemplace example Amazon S3 bucket path y subnet ID por sus valores.

    aws emr create-cluster --release-label emr-6.11.0 --applications Name=Flink --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://amzn-s3-demo-bucket/ --instance-type m5.xlarge --instance-count 3 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0

Para iniciar una sesión de Flink Yarn, ejecute el siguiente comando:

flink-yarn-session -d

En los siguientes ejemplos, se muestra cómo usar sbt o Maven para crear su trabajo de Flink con Delta Lake.

sbt

sbt es una herramienta de compilación para Scala que puede usar con poca o ninguna configuración cuando tiene proyectos pequeños.

libraryDependencies ++= Seq( "io.delta" %% "delta-flink" % deltaConnectorsVersion % "provided", "io.delta" %% "delta-standalone" % deltaConnectorsVersion % "provided", "org.apache.flink" %% "flink-clients" % flinkVersion % "provided", "org.apache.flink" %% "flink-parquet" % flinkVersion % "provided", "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", "org.apache.flink" % "flink-table-common" % flinkVersion % "provided", "org.apache.flink" %% "flink-table-runtime" % flinkVersion % "provided")
Maven

Maven es una herramienta de automatización de compilaciones de código abierto de Apache Software Foundation. Con Maven, puede crear, publicar e implementar un trabajo de Flink con Delta Lake en Amazon EMR.

<project> <properties> <scala.main.version>2.12</scala.main.version> <delta-connectors-version>0.6.0</delta-connectors-version> <flink-version>1.16.1</flink-version> <hadoop-version>3.1.0</hadoop-version> </properties> <dependencies> <dependency> <groupId>io.delta</groupId> <artifactId>delta-flink</artifactId> <version>$delta-connectors-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-standalone_$scala-main-version</artifactId> <version>$delta-connectors-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>$flink-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-parquet</artifactId> <version>$flink-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>$hadoop-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>$flink-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>$flink-version</version> <scope>provided</scope> </dependency> </dependencies>

sbt es una herramienta de compilación para Scala que puede usar con poca o ninguna configuración cuando tiene proyectos pequeños.

libraryDependencies ++= Seq( "io.delta" %% "delta-flink" % deltaConnectorsVersion % "provided", "io.delta" %% "delta-standalone" % deltaConnectorsVersion % "provided", "org.apache.flink" %% "flink-clients" % flinkVersion % "provided", "org.apache.flink" %% "flink-parquet" % flinkVersion % "provided", "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", "org.apache.flink" % "flink-table-common" % flinkVersion % "provided", "org.apache.flink" %% "flink-table-runtime" % flinkVersion % "provided")

Utilice el siguiente ejemplo para crear un objeto DeltaSink para escribir en la tabla con un deltaTablePath:

public static DataStream<RowData> createDeltaSink( DataStream<RowData> stream, String deltaTablePath, RowType rowType) { Configuration configuration = new Configuration(); DeltaSink<RowData> deltaSink = DeltaSink .forRowData( new org.apache.flink.core.fs.Path(deltaTablePath), configuration, rowType) .build(); stream.sinkTo(deltaSink); return stream; }

Utilice el siguiente ejemplo para crear un acotado DeltaSource que se lea en la tabla con un deltaTablePath:

public static DataStream<RowData> createBoundedDeltaSourceAllColumns( StreamExecutionEnvironment env, String deltaTablePath) { Configuration configuration = new Configuration(); DeltaSource<RowData> deltaSource = DeltaSource .forBoundedRowData( new org.apache.flink.core.fs.Path(deltaTablePath), configuration) .build(); return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source"); }

Creación de receptores con soporte para múltiples clústeres para la versión independiente de Delta Lake

Utilice el siguiente ejemplo para crear una tabla en la que DeltaSink escribir deltaTablePath y que sea compatible con varios clústeres:

public DataStream<RowData> createDeltaSink( DataStream<RowData> stream, String deltaTablePath) { Configuration configuration = new Configuration(); configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore"); configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log"); configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1"); DeltaSink<RowData> deltaSink = DeltaSink .forRowData( new Path(deltaTablePath), configuration, rowType) .build(); stream.sinkTo(deltaSink); return stream; }

Use el comando siguiente para ejecutar el trabajo:

flink run FlinkJob.jar
PrivacidadTérminos del sitioPreferencias de cookies
© 2025, Amazon Web Services, Inc o sus afiliados. Todos los derechos reservados.