Conexión a DynamoDB con Amazon Serverless EMR - Amazon EMR

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Conexión a DynamoDB con Amazon Serverless EMR

En este tutorial, cargará un subconjunto de datos de la Junta de Nombres Geográficos de los Estados Unidos a un bucket de Amazon S3 y, a continuación, utilizará Hive o Spark en Amazon EMR Serverless para copiar los datos en una tabla de Amazon DynamoDB que pueda consultar.

Paso 1: Cargar datos a un bucket de Amazon S3

Para crear un depósito de Amazon S3, siga las instrucciones de la Guía del usuario de la consola de Amazon Simple Storage Service Console. Sustituya las referencias a amzn-s3-demo-bucket por el nombre del depósito recién creado. Ahora su aplicación EMR sin servidor está lista para ejecutar tareas.

  1. Descargue el archivo de datos de muestra features.zip con el siguiente comando.

    wget https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/features.zip
  2. Extraiga el features.txt archivo del archivo y visualice las primeras líneas del archivo:

    unzip features.zip head features.txt

    El resultado debería tener un aspecto similar al siguiente.

    1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794 875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7 1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10 26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681 1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605 1181348|Minnow Run|Stream|PA|40.0820178|-79.3800349|1558 1288759|Hunting Creek|Stream|TN|36.343969|-83.8029682|1024 533060|Big Charles Bayou|Bay|LA|29.6046517|-91.9828654|0 829689|Greenwood Creek|Stream|NE|41.596086|-103.0499296|3671 541692|Button Willow Island|Island|LA|31.9579389|-93.0648847|98

    Los campos de cada línea indican un identificador único, nombre, tipo de elemento natural, estado, latitud en grados, longitud en grados y altura en pies.

  3. Cargue sus datos a Amazon S3

    aws s3 cp features.txt s3://amzn-s3-demo-bucket/features/

Paso 2: Crear una tabla de Hive

Utilice Apache Spark o Hive para crear una nueva tabla de Hive que contenga los datos cargados en Amazon S3.

Spark

Para crear una tabla de Hive con Spark, ejecute el siguiente comando.

import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate() sparkSession.sql("CREATE TABLE hive_features \ (feature_id BIGINT, \ feature_name STRING, \ feature_class STRING, \ state_alpha STRING, \ prim_lat_dec DOUBLE, \ prim_long_dec DOUBLE, \ elev_in_ft BIGINT) \ ROW FORMAT DELIMITED \ FIELDS TERMINATED BY '|' \ LINES TERMINATED BY '\n' \ LOCATION 's3://amzn-s3-demo-bucket/features';")

Ahora tienes una tabla Hive rellena con los datos del features.txt archivo. Para comprobar que tus datos están en la tabla, ejecuta una SQL consulta de Spark como se muestra en el siguiente ejemplo.

sparkSession.sql( "SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;")
Hive

Para crear una tabla de Hive con Hive, ejecuta el siguiente comando.

CREATE TABLE hive_features (feature_id BIGINT, feature_name STRING , feature_class STRING , state_alpha STRING, prim_lat_dec DOUBLE , prim_long_dec DOUBLE , elev_in_ft BIGINT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' LOCATION 's3://amzn-s3-demo-bucket/features';

Ahora tiene una tabla Hive que contiene los datos del archivo. features.txt Para comprobar que los datos están en la tabla, ejecute una consulta HiveQL, como se muestra en el siguiente ejemplo.

SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;

Paso 3: Copiar datos a DynamoDB

Use Spark o Hive para copiar los datos a una nueva tabla de DynamoDB.

Spark

Para copiar datos de la tabla Hive que creó en el paso anterior a DynamoDB, siga los pasos 1 a 3 de Copiar datos a DynamoDB. Esto crea una nueva tabla de DynamoDB llamada. Features A continuación, puede leer los datos directamente del archivo de texto y copiarlos en la tabla de DynamoDB, como se muestra en el siguiente ejemplo.

import com.amazonaws.services.dynamodbv2.model.AttributeValue import org.apache.hadoop.dynamodb.DynamoDBItemWritable import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object EmrServerlessDynamoDbTest { def main(args: Array[String]): Unit = { jobConf.set("dynamodb.input.tableName", "Features") jobConf.set("dynamodb.output.tableName", "Features") jobConf.set("dynamodb.region", "region") jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") val rdd = sc.textFile("s3://amzn-s3-demo-bucket/ddb-connector/") .map(row => { val line = row.split("\\|") val item = new DynamoDBItemWritable() val elevInFt = if (line.length > 6) { new AttributeValue().withN(line(6)) } else { new AttributeValue().withNULL(true) } item.setItem(Map( "feature_id" -> new AttributeValue().withN(line(0)), "feature_name" -> new AttributeValue(line(1)), "feature_class" -> new AttributeValue(line(2)), "state_alpha" -> new AttributeValue(line(3)), "prim_lat_dec" -> new AttributeValue().withN(line(4)), "prim_long_dec" -> new AttributeValue().withN(line(5)), "elev_in_ft" -> elevInFt) .asJava) (new Text(""), item) }) rdd.saveAsHadoopDataset(jobConf) } }
Hive

Para copiar datos de la tabla Hive que creó en el paso anterior a DynamoDB, siga las instrucciones de Copiar datos a DynamoDB.

Paso 4: Consulta de datos de DynamoDB

Utilice Spark o Hive para consultar la tabla de DynamoDB.

Spark

Para consultar datos de la tabla de DynamoDB que creó en el paso anterior, puede usar SQL Spark o Spark. MapReduce API

ejemplo — Consulta tu tabla de DynamoDB con Spark SQL

La siguiente SQL consulta de Spark devuelve una lista de todos los tipos de funciones en orden alfabético.

val dataFrame = sparkSession.sql("SELECT DISTINCT feature_class \ FROM ddb_features \ ORDER BY feature_class;")

La siguiente SQL consulta de Spark devuelve una lista de todos los lagos que comienzan por la letra M.

val dataFrame = sparkSession.sql("SELECT feature_name, state_alpha \ FROM ddb_features \ WHERE feature_class = 'Lake' \ AND feature_name LIKE 'M%' \ ORDER BY feature_name;")

La siguiente SQL consulta de Spark devuelve una lista de todos los estados con al menos tres entidades que se encuentran a más de una milla.

val dataFrame = sparkSession.dql("SELECT state_alpha, feature_class, COUNT(*) \ FROM ddb_features \ WHERE elev_in_ft > 5280 \ GROUP by state_alpha, feature_class \ HAVING COUNT(*) >= 3 \ ORDER BY state_alpha, feature_class;")
ejemplo — Consulta tu tabla de DynamoDB con el Spark MapReduce API

La siguiente MapReduce consulta devuelve una lista de todos los tipos de funciones en orden alfabético.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .map(pair => pair._2.get("feature_class").getS) .distinct() .sortBy(value => value) .toDF("feature_class")

La siguiente MapReduce consulta devuelve una lista de todos los lagos que comienzan por la letra M.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .filter(pair => "Lake".equals(pair._2.get("feature_class").getS)) .filter(pair => pair._2.get("feature_name").getS.startsWith("M")) .map(pair => (pair._2.get("feature_name").getS, pair._2.get("state_alpha").getS)) .sortBy(_._1) .toDF("feature_name", "state_alpha")

La siguiente MapReduce consulta devuelve una lista de todos los estados con al menos tres entidades situadas a más de una milla.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => pair._2.getItem) .filter(pair => pair.get("elev_in_ft").getN != null) .filter(pair => Integer.parseInt(pair.get("elev_in_ft").getN) > 5280) .groupBy(pair => (pair.get("state_alpha").getS, pair.get("feature_class").getS)) .filter(pair => pair._2.size >= 3) .map(pair => (pair._1._1, pair._1._2, pair._2.size)) .sortBy(pair => (pair._1, pair._2)) .toDF("state_alpha", "feature_class", "count")
Hive

Para consultar datos de la tabla de DynamoDB que creó en el paso anterior, siga las instrucciones de Consulta los datos de la tabla de DynamoDB.