Seleccione sus preferencias de cookies

Usamos cookies esenciales y herramientas similares que son necesarias para proporcionar nuestro sitio y nuestros servicios. Usamos cookies de rendimiento para recopilar estadísticas anónimas para que podamos entender cómo los clientes usan nuestro sitio y hacer mejoras. Las cookies esenciales no se pueden desactivar, pero puede hacer clic en “Personalizar” o “Rechazar” para rechazar las cookies de rendimiento.

Si está de acuerdo, AWS y los terceros aprobados también utilizarán cookies para proporcionar características útiles del sitio, recordar sus preferencias y mostrar contenido relevante, incluida publicidad relevante. Para aceptar o rechazar todas las cookies no esenciales, haga clic en “Aceptar” o “Rechazar”. Para elegir opciones más detalladas, haga clic en “Personalizar”.

Ejemplo de script de Scala - ETL de streaming

Modo de enfoque
Ejemplo de script de Scala - ETL de streaming - AWS Glue

El siguiente script de ejemplo se conecta a Amazon Kinesis Data Streams, utiliza un esquema del Data Catalog para analizar un flujo de datos, une el flujo a un conjunto de datos estático en Amazon S3 y envía los resultados combinados a Amazon S3 en formato parquet.

// This script connects to an Amazon Kinesis stream, uses a schema from the data catalog to parse the stream, // joins the stream to a static dataset on Amazon S3, and outputs the joined results to Amazon S3 in parquet format. import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import java.util.Calendar import org.apache.spark.SparkContext import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row import org.apache.spark.sql.SaveMode import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.from_json import org.apache.spark.sql.streaming.Trigger import scala.collection.JavaConverters._ object streamJoiner { def main(sysArgs: Array[String]) { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val sparkSession: SparkSession = glueContext.getSparkSession import sparkSession.implicits._ // @params: [JOB_NAME] val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val staticData = sparkSession.read // read() returns type DataFrameReader .format("csv") .option("header", "true") .load("s3://awsexamplebucket-streaming-demo2/inputs/productsStatic.csv") // load() returns a DataFrame val datasource0 = sparkSession.readStream // readstream() returns type DataStreamReader .format("kinesis") .option("streamName", "stream-join-demo") .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com") .option("startingPosition", "TRIM_HORIZON") .load // load() returns a DataFrame val selectfields1 = datasource0.select(from_json($"data".cast("string"), glueContext.getCatalogSchemaAsSparkSchema("stream-demos", "stream-join-demo2")) as "data").select("data.*") val datasink2 = selectfields1.writeStream.foreachBatch { (dataFrame: Dataset[Row], batchId: Long) => { //foreachBatch() returns type DataStreamWriter val joined = dataFrame.join(staticData, "product_id") val year: Int = Calendar.getInstance().get(Calendar.YEAR) val month :Int = Calendar.getInstance().get(Calendar.MONTH) + 1 val day: Int = Calendar.getInstance().get(Calendar.DATE) val hour: Int = Calendar.getInstance().get(Calendar.HOUR_OF_DAY) if (dataFrame.count() > 0) { joined.write // joined.write returns type DataFrameWriter .mode(SaveMode.Append) .format("parquet") .option("quote", " ") .save("s3://awsexamplebucket-streaming-demo2/output/" + "/year=" + "%04d".format(year) + "/month=" + "%02d".format(month) + "/day=" + "%02d".format(day) + "/hour=" + "%02d".format(hour) + "/") } } } // end foreachBatch() .trigger(Trigger.ProcessingTime("100 seconds")) .option("checkpointLocation", "s3://awsexamplebucket-streaming-demo2/checkpoint/") .start().awaitTermination() // start() returns type StreamingQuery Job.commit() } }
PrivacidadTérminos del sitioPreferencias de cookies
© 2025, Amazon Web Services, Inc o sus afiliados. Todos los derechos reservados.