Select your cookie preferences

We use essential cookies and similar tools that are necessary to provide our site and services. We use performance cookies to collect anonymous statistics, so we can understand how customers use our site and make improvements. Essential cookies cannot be deactivated, but you can choose “Customize” or “Decline” to decline performance cookies.

If you agree, AWS and approved third parties will also use cookies to provide useful site features, remember your preferences, and display relevant content, including relevant advertising. To accept or decline all non-essential cookies, choose “Accept” or “Decline.” To make more detailed choices, choose “Customize.”

Scala script example - streaming ETL

Focus mode
Scala script example - streaming ETL - AWS Glue

The following example script connects to Amazon Kinesis Data Streams, uses a schema from the Data Catalog to parse a data stream, joins the stream to a static dataset on Amazon S3, and outputs the joined results to Amazon S3 in parquet format.

// This script connects to an Amazon Kinesis stream, uses a schema from the data catalog to parse the stream, // joins the stream to a static dataset on Amazon S3, and outputs the joined results to Amazon S3 in parquet format. import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import java.util.Calendar import org.apache.spark.SparkContext import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row import org.apache.spark.sql.SaveMode import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.from_json import org.apache.spark.sql.streaming.Trigger import scala.collection.JavaConverters._ object streamJoiner { def main(sysArgs: Array[String]) { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val sparkSession: SparkSession = glueContext.getSparkSession import sparkSession.implicits._ // @params: [JOB_NAME] val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val staticData = sparkSession.read // read() returns type DataFrameReader .format("csv") .option("header", "true") .load("s3://awsexamplebucket-streaming-demo2/inputs/productsStatic.csv") // load() returns a DataFrame val datasource0 = sparkSession.readStream // readstream() returns type DataStreamReader .format("kinesis") .option("streamName", "stream-join-demo") .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com") .option("startingPosition", "TRIM_HORIZON") .load // load() returns a DataFrame val selectfields1 = datasource0.select(from_json($"data".cast("string"), glueContext.getCatalogSchemaAsSparkSchema("stream-demos", "stream-join-demo2")) as "data").select("data.*") val datasink2 = selectfields1.writeStream.foreachBatch { (dataFrame: Dataset[Row], batchId: Long) => { //foreachBatch() returns type DataStreamWriter val joined = dataFrame.join(staticData, "product_id") val year: Int = Calendar.getInstance().get(Calendar.YEAR) val month :Int = Calendar.getInstance().get(Calendar.MONTH) + 1 val day: Int = Calendar.getInstance().get(Calendar.DATE) val hour: Int = Calendar.getInstance().get(Calendar.HOUR_OF_DAY) if (dataFrame.count() > 0) { joined.write // joined.write returns type DataFrameWriter .mode(SaveMode.Append) .format("parquet") .option("quote", " ") .save("s3://awsexamplebucket-streaming-demo2/output/" + "/year=" + "%04d".format(year) + "/month=" + "%02d".format(month) + "/day=" + "%02d".format(day) + "/hour=" + "%02d".format(hour) + "/") } } } // end foreachBatch() .trigger(Trigger.ProcessingTime("100 seconds")) .option("checkpointLocation", "s3://awsexamplebucket-streaming-demo2/checkpoint/") .start().awaitTermination() // start() returns type StreamingQuery Job.commit() } }
PrivacySite termsCookie preferences
© 2025, Amazon Web Services, Inc. or its affiliates. All rights reserved.