Connessione a DynamoDB con Amazon Serverless EMR - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Connessione a DynamoDB con Amazon Serverless EMR

In questo tutorial, carichi un sottoinsieme di dati dallo United States Board on Geographic Names in un bucket Amazon S3 e poi usi Hive o Spark su Amazon Serverless per copiare i dati in una tabella EMR Amazon DynamoDB su cui eseguire query.

Fase 1: caricare i dati in un bucket Amazon S3

Per creare un bucket Amazon S3, segui le istruzioni in Creazione di un bucket nella Guida per l'utente della console di Amazon Simple Storage Service. Sostituisci i riferimenti a amzn-s3-demo-bucket con il nome del bucket appena creato. Ora la tua applicazione EMR Serverless è pronta per eseguire i lavori.

  1. Scarica l'archivio di dati di esempio features.zip con il seguente comando.

    wget https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/features.zip
  2. Estrai il features.txt file dall'archivio e visualizza le prime poche righe del file:

    unzip features.zip head features.txt

    Il risultato dovrebbe essere simile al seguente.

    1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794 875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7 1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10 26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681 1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605 1181348|Minnow Run|Stream|PA|40.0820178|-79.3800349|1558 1288759|Hunting Creek|Stream|TN|36.343969|-83.8029682|1024 533060|Big Charles Bayou|Bay|LA|29.6046517|-91.9828654|0 829689|Greenwood Creek|Stream|NE|41.596086|-103.0499296|3671 541692|Button Willow Island|Island|LA|31.9579389|-93.0648847|98

    I campi in ogni riga qui indicano un identificatore univoco, un nome, un tipo di elemento naturale, lo stato, la latitudine in gradi, la longitudine in gradi e l'altezza in piedi.

  3. Carica i tuoi dati su Amazon S3

    aws s3 cp features.txt s3://amzn-s3-demo-bucket/features/

Fase 2: Creare una tabella Hive

Usa Apache Spark o Hive per creare una nuova tabella Hive che contenga i dati caricati in Amazon S3.

Spark

Per creare una tabella Hive con Spark, esegui il seguente comando.

import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate() sparkSession.sql("CREATE TABLE hive_features \ (feature_id BIGINT, \ feature_name STRING, \ feature_class STRING, \ state_alpha STRING, \ prim_lat_dec DOUBLE, \ prim_long_dec DOUBLE, \ elev_in_ft BIGINT) \ ROW FORMAT DELIMITED \ FIELDS TERMINATED BY '|' \ LINES TERMINATED BY '\n' \ LOCATION 's3://amzn-s3-demo-bucket/features';")

Ora hai una tabella Hive popolata con i dati del file. features.txt Per verificare che i dati siano presenti nella tabella, esegui una SQL query Spark come mostrato nell'esempio seguente.

sparkSession.sql( "SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;")
Hive

Per creare una tabella Hive con Hive, esegui il comando seguente.

CREATE TABLE hive_features (feature_id BIGINT, feature_name STRING , feature_class STRING , state_alpha STRING, prim_lat_dec DOUBLE , prim_long_dec DOUBLE , elev_in_ft BIGINT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' LOCATION 's3://amzn-s3-demo-bucket/features';

Ora hai una tabella Hive che contiene i dati del file. features.txt Per verificare che i dati siano presenti nella tabella, esegui una query HiveQL, come mostrato nell'esempio seguente.

SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;

Fase 3: Copiare i dati su DynamoDB

Usa Spark o Hive per copiare i dati in una nuova tabella DynamoDB.

Spark

Per copiare i dati dalla tabella Hive creata nel passaggio precedente in DynamoDB, segui i passaggi 1-3 in Copiare i dati su DynamoDB. Questo crea una nuova tabella DynamoDB chiamata. Features È quindi possibile leggere i dati direttamente dal file di testo e copiarli nella tabella DynamoDB, come illustrato nell'esempio seguente.

import com.amazonaws.services.dynamodbv2.model.AttributeValue import org.apache.hadoop.dynamodb.DynamoDBItemWritable import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object EmrServerlessDynamoDbTest { def main(args: Array[String]): Unit = { jobConf.set("dynamodb.input.tableName", "Features") jobConf.set("dynamodb.output.tableName", "Features") jobConf.set("dynamodb.region", "region") jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") val rdd = sc.textFile("s3://amzn-s3-demo-bucket/ddb-connector/") .map(row => { val line = row.split("\\|") val item = new DynamoDBItemWritable() val elevInFt = if (line.length > 6) { new AttributeValue().withN(line(6)) } else { new AttributeValue().withNULL(true) } item.setItem(Map( "feature_id" -> new AttributeValue().withN(line(0)), "feature_name" -> new AttributeValue(line(1)), "feature_class" -> new AttributeValue(line(2)), "state_alpha" -> new AttributeValue(line(3)), "prim_lat_dec" -> new AttributeValue().withN(line(4)), "prim_long_dec" -> new AttributeValue().withN(line(5)), "elev_in_ft" -> elevInFt) .asJava) (new Text(""), item) }) rdd.saveAsHadoopDataset(jobConf) } }
Hive

Per copiare i dati dalla tabella Hive creata nel passaggio precedente in DynamoDB, segui le istruzioni in Copiare i dati su DynamoDB.

Fase 4: Interrogare i dati da DynamoDB

Usa Spark o Hive per interrogare la tua tabella DynamoDB.

Spark

Per interrogare i dati dalla tabella DynamoDB creata nel passaggio precedente, puoi utilizzare Spark o SQL Spark. MapReduce API

Esempio — Interroga la tua tabella DynamoDB con Spark SQL

La seguente SQL query Spark restituisce un elenco di tutti i tipi di feature in ordine alfabetico.

val dataFrame = sparkSession.sql("SELECT DISTINCT feature_class \ FROM ddb_features \ ORDER BY feature_class;")

La seguente SQL query Spark restituisce un elenco di tutti i laghi che iniziano con la lettera M.

val dataFrame = sparkSession.sql("SELECT feature_name, state_alpha \ FROM ddb_features \ WHERE feature_class = 'Lake' \ AND feature_name LIKE 'M%' \ ORDER BY feature_name;")

La seguente SQL query Spark restituisce un elenco di tutti gli stati con almeno tre caratteristiche superiori a un miglio.

val dataFrame = sparkSession.dql("SELECT state_alpha, feature_class, COUNT(*) \ FROM ddb_features \ WHERE elev_in_ft > 5280 \ GROUP by state_alpha, feature_class \ HAVING COUNT(*) >= 3 \ ORDER BY state_alpha, feature_class;")
Esempio — Interroga la tua tabella DynamoDB con Spark MapReduce API

La seguente MapReduce query restituisce un elenco di tutti i tipi di feature in ordine alfabetico.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .map(pair => pair._2.get("feature_class").getS) .distinct() .sortBy(value => value) .toDF("feature_class")

La seguente MapReduce query restituisce un elenco di tutti i laghi che iniziano con la lettera M.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .filter(pair => "Lake".equals(pair._2.get("feature_class").getS)) .filter(pair => pair._2.get("feature_name").getS.startsWith("M")) .map(pair => (pair._2.get("feature_name").getS, pair._2.get("state_alpha").getS)) .sortBy(_._1) .toDF("feature_name", "state_alpha")

La seguente MapReduce query restituisce un elenco di tutti gli stati con almeno tre caratteristiche superiori a un miglio.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => pair._2.getItem) .filter(pair => pair.get("elev_in_ft").getN != null) .filter(pair => Integer.parseInt(pair.get("elev_in_ft").getN) > 5280) .groupBy(pair => (pair.get("state_alpha").getS, pair.get("feature_class").getS)) .filter(pair => pair._2.size >= 3) .map(pair => (pair._1._1, pair._1._2, pair._2.size)) .sortBy(pair => (pair._1, pair._2)) .toDF("state_alpha", "feature_class", "count")
Hive

Per interrogare i dati dalla tabella DynamoDB creata nel passaggio precedente, segui le istruzioni in Interrogare i dati nella tabella DynamoDB.