Connexion à DynamoDB avec Amazon Serverless EMR - Amazon EMR

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Connexion à DynamoDB avec Amazon Serverless EMR

Dans ce didacticiel, vous chargez un sous-ensemble de données du United States Board on Geographic Names dans un compartiment Amazon S3, puis vous utilisez Hive ou Spark sur Amazon EMR Serverless pour copier les données dans une table Amazon DynamoDB que vous pouvez interroger.

Étape 1 : télécharger des données dans un compartiment Amazon S3

Pour créer un compartiment Amazon S3, suivez les instructions de la section Création d'un compartiment dans le guide de l'utilisateur de la console Amazon Simple Storage Service. Remplacez les références amzn-s3-demo-bucket à par le nom du bucket que vous venez de créer. Votre application EMR Serverless est maintenant prête à exécuter des tâches.

  1. Téléchargez l'exemple d'archive de données features.zip à l'aide de la commande suivante.

    wget https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/features.zip
  2. Extrayez le features.txt fichier de l'archive et visualisez les premières lignes du fichier :

    unzip features.zip head features.txt

    Le résultat devrait ressembler à ce qui suit.

    1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794 875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7 1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10 26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681 1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605 1181348|Minnow Run|Stream|PA|40.0820178|-79.3800349|1558 1288759|Hunting Creek|Stream|TN|36.343969|-83.8029682|1024 533060|Big Charles Bayou|Bay|LA|29.6046517|-91.9828654|0 829689|Greenwood Creek|Stream|NE|41.596086|-103.0499296|3671 541692|Button Willow Island|Island|LA|31.9579389|-93.0648847|98

    Les champs de chaque ligne indiquent un identifiant unique, un nom, un type d'élément naturel, un état, une latitude en degrés, une longitude en degrés et une hauteur en pieds.

  3. Chargez vos données sur Amazon S3

    aws s3 cp features.txt s3://amzn-s3-demo-bucket/features/

Étape 2 : Création d'une table Hive

Utilisez Apache Spark ou Hive pour créer une nouvelle table Hive contenant les données téléchargées dans Amazon S3.

Spark

Pour créer une table Hive avec Spark, exécutez la commande suivante.

import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate() sparkSession.sql("CREATE TABLE hive_features \ (feature_id BIGINT, \ feature_name STRING, \ feature_class STRING, \ state_alpha STRING, \ prim_lat_dec DOUBLE, \ prim_long_dec DOUBLE, \ elev_in_ft BIGINT) \ ROW FORMAT DELIMITED \ FIELDS TERMINATED BY '|' \ LINES TERMINATED BY '\n' \ LOCATION 's3://amzn-s3-demo-bucket/features';")

Vous disposez désormais d'une table Hive remplie avec les données du features.txt fichier. Pour vérifier que vos données figurent dans la table, exécutez une SQL requête Spark comme indiqué dans l'exemple suivant.

sparkSession.sql( "SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;")
Hive

Pour créer une table Hive avec Hive, exécutez la commande suivante.

CREATE TABLE hive_features (feature_id BIGINT, feature_name STRING , feature_class STRING , state_alpha STRING, prim_lat_dec DOUBLE , prim_long_dec DOUBLE , elev_in_ft BIGINT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' LOCATION 's3://amzn-s3-demo-bucket/features';

Vous disposez désormais d'une table Hive qui contient les données du features.txt fichier. Pour vérifier que vos données figurent dans la table, exécutez une requête HiveQL, comme indiqué dans l'exemple suivant.

SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;

Étape 3 : Copier les données dans DynamoDB

Utilisez Spark ou Hive pour copier des données dans une nouvelle table DynamoDB.

Spark

Pour copier les données de la table Hive que vous avez créée à l'étape précédente dans DynamoDB, suivez les étapes 1 à 3 de la section Copier les données dans DynamoDB. Cela crée une nouvelle table DynamoDB appelée. Features Vous pouvez ensuite lire les données directement depuis le fichier texte et les copier dans votre table DynamoDB, comme le montre l'exemple suivant.

import com.amazonaws.services.dynamodbv2.model.AttributeValue import org.apache.hadoop.dynamodb.DynamoDBItemWritable import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object EmrServerlessDynamoDbTest { def main(args: Array[String]): Unit = { jobConf.set("dynamodb.input.tableName", "Features") jobConf.set("dynamodb.output.tableName", "Features") jobConf.set("dynamodb.region", "region") jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") val rdd = sc.textFile("s3://amzn-s3-demo-bucket/ddb-connector/") .map(row => { val line = row.split("\\|") val item = new DynamoDBItemWritable() val elevInFt = if (line.length > 6) { new AttributeValue().withN(line(6)) } else { new AttributeValue().withNULL(true) } item.setItem(Map( "feature_id" -> new AttributeValue().withN(line(0)), "feature_name" -> new AttributeValue(line(1)), "feature_class" -> new AttributeValue(line(2)), "state_alpha" -> new AttributeValue(line(3)), "prim_lat_dec" -> new AttributeValue().withN(line(4)), "prim_long_dec" -> new AttributeValue().withN(line(5)), "elev_in_ft" -> elevInFt) .asJava) (new Text(""), item) }) rdd.saveAsHadoopDataset(jobConf) } }
Hive

Pour copier les données de la table Hive que vous avez créée à l'étape précédente dans DynamoDB, suivez les instructions de la section Copier les données dans DynamoDB.

Étape 4 : demander des données à partir de DynamoDB

Utilisez Spark ou Hive pour interroger votre table DynamoDB.

Spark

Pour interroger les données de la table DynamoDB que vous avez créée à l'étape précédente, vous pouvez utiliser SQL Spark ou Spark. MapReduce API

Exemple — Interrogez votre table DynamoDB avec Spark SQL

La SQL requête Spark suivante renvoie une liste de tous les types de fonctionnalités par ordre alphabétique.

val dataFrame = sparkSession.sql("SELECT DISTINCT feature_class \ FROM ddb_features \ ORDER BY feature_class;")

La SQL requête Spark suivante renvoie une liste de tous les lacs commençant par la lettre M.

val dataFrame = sparkSession.sql("SELECT feature_name, state_alpha \ FROM ddb_features \ WHERE feature_class = 'Lake' \ AND feature_name LIKE 'M%' \ ORDER BY feature_name;")

La SQL requête Spark suivante renvoie une liste de tous les états comportant au moins trois entités situées à plus d'un kilomètre.

val dataFrame = sparkSession.dql("SELECT state_alpha, feature_class, COUNT(*) \ FROM ddb_features \ WHERE elev_in_ft > 5280 \ GROUP by state_alpha, feature_class \ HAVING COUNT(*) >= 3 \ ORDER BY state_alpha, feature_class;")
Exemple — Interrogez votre table DynamoDB avec le Spark MapReduce API

La MapReduce requête suivante renvoie une liste de tous les types de fonctionnalités par ordre alphabétique.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .map(pair => pair._2.get("feature_class").getS) .distinct() .sortBy(value => value) .toDF("feature_class")

La MapReduce requête suivante renvoie une liste de tous les lacs commençant par la lettre M.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .filter(pair => "Lake".equals(pair._2.get("feature_class").getS)) .filter(pair => pair._2.get("feature_name").getS.startsWith("M")) .map(pair => (pair._2.get("feature_name").getS, pair._2.get("state_alpha").getS)) .sortBy(_._1) .toDF("feature_name", "state_alpha")

La MapReduce requête suivante renvoie une liste de tous les états comportant au moins trois entités situées à plus d'un kilomètre.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => pair._2.getItem) .filter(pair => pair.get("elev_in_ft").getN != null) .filter(pair => Integer.parseInt(pair.get("elev_in_ft").getN) > 5280) .groupBy(pair => (pair.get("state_alpha").getS, pair.get("feature_class").getS)) .filter(pair => pair._2.size >= 3) .map(pair => (pair._1._1, pair._1._2, pair._2.size)) .sortBy(pair => (pair._1, pair._2)) .toDF("state_alpha", "feature_class", "count")
Hive

Pour interroger les données de la table DynamoDB que vous avez créée à l'étape précédente, suivez les instructions de la section Interroger les données de la table DynamoDB.