Connecting to DynamoDB with
Amazon EMR Serverless
In this tutorial, you upload a subset of data from the United States Board on
Geographic Names
Step 1: Upload data to an Amazon S3
bucket
To create an Amazon S3 bucket, follow the instructions in Creating a bucket in
the Amazon Simple Storage Service Console User Guide. Replace references to
with the name of
your newly created bucket. Now your EMR Serverless application is ready to run
jobs.amzn-s3-demo-bucket
-
Download the sample data archive
features.zip
with the following command.wget https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/features.zip
-
Extract the
features.txt
file from the archive and view the first the few lines in the file:unzip features.zip head features.txt
The result should look similar to the following.
1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794 875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7 1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10 26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681 1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605 1181348|Minnow Run|Stream|PA|40.0820178|-79.3800349|1558 1288759|Hunting Creek|Stream|TN|36.343969|-83.8029682|1024 533060|Big Charles Bayou|Bay|LA|29.6046517|-91.9828654|0 829689|Greenwood Creek|Stream|NE|41.596086|-103.0499296|3671 541692|Button Willow Island|Island|LA|31.9579389|-93.0648847|98
The fields in each line here indicate a unique identifier, name, type of natural feature, state, latitude in degrees, longitude in degrees, and height in feet.
-
Upload your data to Amazon S3
aws s3 cp features.txt s3://
amzn-s3-demo-bucket
/features/
Step 2: Create a Hive
table
Use Apache Spark or Hive to create a new Hive table that contains the uploaded data in Amazon S3.
To create a Hive table with Spark, run the following command.
import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate() sparkSession.sql("CREATE TABLE
hive_features
\ (feature_id BIGINT, \ feature_name STRING, \ feature_class STRING, \ state_alpha STRING, \ prim_lat_dec DOUBLE, \ prim_long_dec DOUBLE, \ elev_in_ft BIGINT) \ ROW FORMAT DELIMITED \ FIELDS TERMINATED BY '|' \ LINES TERMINATED BY '\n' \ LOCATION 's3://amzn-s3-demo-bucket
/features';")
You now have a populated Hive table with data from the
features.txt
file. To verify that your data is in the
table, run a Spark SQL query as shown in the following example.
sparkSession.sql( "SELECT state_alpha, COUNT(*) FROM
hive_features
GROUP BY state_alpha;")
Step 3: Copy data to DynamoDB
Use Spark or Hive to copy data to a new DynamoDB table.
To copy data from the Hive table that you created in the previous step
to DynamoDB, follow Steps 1-3 in Copy data to DynamoDB. This creates a new DynamoDB table called
Features
. You can then read data directly from the text
file and copy it to your DynamoDB table, as the following example
shows.
import com.amazonaws.services.dynamodbv2.model.AttributeValue import org.apache.hadoop.dynamodb.DynamoDBItemWritable import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object EmrServerlessDynamoDbTest { def main(args: Array[String]): Unit = { jobConf.set("dynamodb.input.tableName", "Features") jobConf.set("dynamodb.output.tableName", "Features") jobConf.set("dynamodb.region", "
region
") jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") val rdd = sc.textFile("s3://amzn-s3-demo-bucket
/ddb-connector/") .map(row => { val line = row.split("\\|") val item = new DynamoDBItemWritable() val elevInFt = if (line.length > 6) { new AttributeValue().withN(line(6)) } else { new AttributeValue().withNULL(true) } item.setItem(Map( "feature_id" -> new AttributeValue().withN(line(0)), "feature_name" -> new AttributeValue(line(1)), "feature_class" -> new AttributeValue(line(2)), "state_alpha" -> new AttributeValue(line(3)), "prim_lat_dec" -> new AttributeValue().withN(line(4)), "prim_long_dec" -> new AttributeValue().withN(line(5)), "elev_in_ft" -> elevInFt) .asJava) (new Text(""), item) }) rdd.saveAsHadoopDataset(jobConf) } }
Step 4: Query data from DynamoDB
Use Spark or Hive to query your DynamoDB table.
To query data from the DynamoDB table that you created in the previous step, you can use either Spark SQL or the Spark MapReduce API.
Example – Query your DynamoDB table with Spark SQL
The following Spark SQL query returns a list of all the feature types in alphabetical order.
val dataFrame = sparkSession.sql("SELECT DISTINCT feature_class \
FROM ddb_features \
ORDER BY feature_class;")
The following Spark SQL query returns a list of all lakes that begin with the letter M.
val dataFrame = sparkSession.sql("SELECT feature_name, state_alpha \
FROM ddb_features \
WHERE feature_class = 'Lake' \
AND feature_name LIKE 'M%' \
ORDER BY feature_name;")
The following Spark SQL query returns a list of all states with at least three features that are higher than one mile.
val dataFrame = sparkSession.dql("SELECT state_alpha, feature_class, COUNT(*) \
FROM ddb_features \
WHERE elev_in_ft > 5280 \
GROUP by state_alpha, feature_class \
HAVING COUNT(*) >= 3 \
ORDER BY state_alpha, feature_class;")
Example – Query your DynamoDB table with the Spark MapReduce API
The following MapReduce query returns a list of all the feature types in alphabetical order.
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
.map(pair => (pair._1, pair._2.getItem))
.map(pair => pair._2.get("feature_class").getS)
.distinct()
.sortBy(value => value)
.toDF("feature_class")
The following MapReduce query returns a list of all lakes that begin with the letter M.
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
.map(pair => (pair._1, pair._2.getItem))
.filter(pair => "Lake".equals(pair._2.get("feature_class").getS))
.filter(pair => pair._2.get("feature_name").getS.startsWith("M"))
.map(pair => (pair._2.get("feature_name").getS, pair._2.get("state_alpha").getS))
.sortBy(_._1)
.toDF("feature_name", "state_alpha")
The following MapReduce query returns a list of all states with at least three features that are higher than one mile.
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
.map(pair => pair._2.getItem)
.filter(pair => pair.get("elev_in_ft").getN != null)
.filter(pair => Integer.parseInt(pair.get("elev_in_ft").getN) > 5280)
.groupBy(pair => (pair.get("state_alpha").getS, pair.get("feature_class").getS))
.filter(pair => pair._2.size >= 3)
.map(pair => (pair._1._1, pair._1._2, pair._2.size))
.sortBy(pair => (pair._1, pair._2))
.toDF("state_alpha", "feature_class", "count")