使用 Amazon EMR Serverless 連線至 DynamoDB - Amazon EMR

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Amazon EMR Serverless 連線至 DynamoDB

在本教學課程中,您將資料子集從美國地理名稱委員會上傳至 Amazon S3 儲存貯體,然後使用 Amazon EMR Serverless 上的 Hive 或 Spark 將資料複製到您可以查詢的 Amazon DynamoDB 資料表。

步驟 1:將資料上傳至 Amazon S3 儲存貯體

若要建立 Amazon S3 儲存貯體,請遵循 Amazon Simple Storage Service 主控台使用者指南 中的建立儲存貯體中的指示。將 的參考取代amzn-s3-demo-bucket為您新建立的儲存貯體名稱。現在,您的 EMR Serverless 應用程式已準備好執行任務。

  1. features.zip 使用下列命令下載範例資料封存。

    wget https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/features.zip
  2. 從封存中擷取features.txt檔案,並檢視檔案中的前幾行:

    unzip features.zip head features.txt

    結果看起來應該類似於以下內容。

    1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794 875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7 1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10 26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681 1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605 1181348|Minnow Run|Stream|PA|40.0820178|-79.3800349|1558 1288759|Hunting Creek|Stream|TN|36.343969|-83.8029682|1024 533060|Big Charles Bayou|Bay|LA|29.6046517|-91.9828654|0 829689|Greenwood Creek|Stream|NE|41.596086|-103.0499296|3671 541692|Button Willow Island|Island|LA|31.9579389|-93.0648847|98

    此處各行中的欄位表示唯一識別符、名稱、自然特徵類型、狀態、緯度、經度和英尺高度。

  3. 將資料上傳至 Amazon S3

    aws s3 cp features.txt s3://amzn-s3-demo-bucket/features/

步驟 2:建立 Hive 資料表

使用 Apache Spark 或 Hive 建立新的 Hive 資料表,其中包含 Amazon S3 中上傳的資料。

Spark

若要使用 Spark 建立 Hive 資料表,請執行下列命令。

import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate() sparkSession.sql("CREATE TABLE hive_features \ (feature_id BIGINT, \ feature_name STRING, \ feature_class STRING, \ state_alpha STRING, \ prim_lat_dec DOUBLE, \ prim_long_dec DOUBLE, \ elev_in_ft BIGINT) \ ROW FORMAT DELIMITED \ FIELDS TERMINATED BY '|' \ LINES TERMINATED BY '\n' \ LOCATION 's3://amzn-s3-demo-bucket/features';")

您現在有一個已填入的 Hive 資料表,其中包含來自 features.txt 檔案的資料。若要驗證您的資料是否在資料表中,請執行 Spark SQL查詢,如下列範例所示。

sparkSession.sql( "SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;")
Hive

若要使用 Hive 建立 Hive 資料表,請執行下列命令。

CREATE TABLE hive_features (feature_id BIGINT, feature_name STRING , feature_class STRING , state_alpha STRING, prim_lat_dec DOUBLE , prim_long_dec DOUBLE , elev_in_ft BIGINT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' LOCATION 's3://amzn-s3-demo-bucket/features';

您現在有一個 Hive 資料表,其中包含來自 features.txt 檔案的資料。若要驗證您的資料是否在資料表中,請執行 HiveQL 查詢,如下列範例所示。

SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;

步驟 3:將資料複製到 DynamoDB

使用 Spark 或 Hive 將資料複製到新的 DynamoDB 資料表。

Spark

若要從您在上一個步驟中建立的 Hive 資料表將資料複製到 DynamoDB ,請遵循將資料複製到 DynamoDB 中的步驟 1-3。這會建立新的 DynamoDB 資料表,稱為 Features。然後,您可以直接從文字檔案讀取資料,並將其複製到 DynamoDB 資料表,如下列範例所示。

import com.amazonaws.services.dynamodbv2.model.AttributeValue import org.apache.hadoop.dynamodb.DynamoDBItemWritable import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object EmrServerlessDynamoDbTest { def main(args: Array[String]): Unit = { jobConf.set("dynamodb.input.tableName", "Features") jobConf.set("dynamodb.output.tableName", "Features") jobConf.set("dynamodb.region", "region") jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") val rdd = sc.textFile("s3://amzn-s3-demo-bucket/ddb-connector/") .map(row => { val line = row.split("\\|") val item = new DynamoDBItemWritable() val elevInFt = if (line.length > 6) { new AttributeValue().withN(line(6)) } else { new AttributeValue().withNULL(true) } item.setItem(Map( "feature_id" -> new AttributeValue().withN(line(0)), "feature_name" -> new AttributeValue(line(1)), "feature_class" -> new AttributeValue(line(2)), "state_alpha" -> new AttributeValue(line(3)), "prim_lat_dec" -> new AttributeValue().withN(line(4)), "prim_long_dec" -> new AttributeValue().withN(line(5)), "elev_in_ft" -> elevInFt) .asJava) (new Text(""), item) }) rdd.saveAsHadoopDataset(jobConf) } }
Hive

若要將您在上一個步驟中建立的 Hive 資料表中的資料複製到 DynamoDB ,請遵循將資料複製到 DynamoDB 中的指示。

步驟 4:從 DynamoDB 查詢資料

使用 Spark 或 Hive 查詢 DynamoDB 資料表。

Spark

若要查詢您在上一個步驟中建立的 DynamoDB 資料表中的資料,您可以使用 Spark SQL或 Spark MapReduce API。

範例 – 使用 Spark 查詢 DynamoDB 資料表 SQL

下列 Spark SQL查詢會依字母順序傳回所有功能類型的清單。

val dataFrame = sparkSession.sql("SELECT DISTINCT feature_class \ FROM ddb_features \ ORDER BY feature_class;")

下列 Spark SQL查詢會傳回以字母 M 開頭的所有湖的清單。

val dataFrame = sparkSession.sql("SELECT feature_name, state_alpha \ FROM ddb_features \ WHERE feature_class = 'Lake' \ AND feature_name LIKE 'M%' \ ORDER BY feature_name;")

下列 Spark SQL查詢會傳回所有狀態的清單,其中包含至少三個高於一英里的功能。

val dataFrame = sparkSession.dql("SELECT state_alpha, feature_class, COUNT(*) \ FROM ddb_features \ WHERE elev_in_ft > 5280 \ GROUP by state_alpha, feature_class \ HAVING COUNT(*) >= 3 \ ORDER BY state_alpha, feature_class;")
範例 – 使用 Spark 查詢 DynamoDB 資料表 MapReduce API

下列 MapReduce 查詢會依字母順序傳回所有功能類型的清單。

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .map(pair => pair._2.get("feature_class").getS) .distinct() .sortBy(value => value) .toDF("feature_class")

下列 MapReduce 查詢會傳回以字母 M 開頭的所有湖的清單。

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .filter(pair => "Lake".equals(pair._2.get("feature_class").getS)) .filter(pair => pair._2.get("feature_name").getS.startsWith("M")) .map(pair => (pair._2.get("feature_name").getS, pair._2.get("state_alpha").getS)) .sortBy(_._1) .toDF("feature_name", "state_alpha")

下列 MapReduce 查詢會傳回所有狀態的清單,其中包含至少三個高於一英里的功能。

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => pair._2.getItem) .filter(pair => pair.get("elev_in_ft").getN != null) .filter(pair => Integer.parseInt(pair.get("elev_in_ft").getN) > 5280) .groupBy(pair => (pair.get("state_alpha").getS, pair.get("feature_class").getS)) .filter(pair => pair._2.size >= 3) .map(pair => (pair._1._1, pair._1._2, pair._2.size)) .sortBy(pair => (pair._1, pair._2)) .toDF("state_alpha", "feature_class", "count")
Hive

若要查詢您在上一個步驟中建立的 DynamoDB 資料表中的資料,請遵循查詢 DynamoDB 資料表 中的資料中的指示。