Conexión a DynamoDB con Amazon EMR sin servidor - Amazon EMR

Conexión a DynamoDB con Amazon EMR sin servidor

En este tutorial, cargará un subconjunto de datos del United States Board on Geographic Names a un bucket de Amazon S3 y, a continuación, utilizará Hive o Spark en Amazon EMR sin servidor para copiar los datos en una tabla de Amazon DynamoDB que pueda consultar.

Paso 1: cargue los datos en un bucket de Amazon S3

Para crear un bucket de Amazon S3, siga las instrucciones en Crear un bucket en la Guía del usuario de la consola de Amazon Simple Storage Service. Sustituya las referencias a amzn-s3-demo-bucket por el nombre del bucket recién creado. Ahora su aplicación EMR sin servidor está lista para ejecutar trabajos.

  1. Descargue el archivo de datos de ejemplo features.zip con el comando siguiente.

    wget https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/features.zip
  2. Extraiga el archivo features.txt del archivo y visualice las primeras líneas del archivo:

    unzip features.zip head features.txt

    El resultado debería ser similar al siguiente.

    1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794 875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7 1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10 26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681 1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605 1181348|Minnow Run|Stream|PA|40.0820178|-79.3800349|1558 1288759|Hunting Creek|Stream|TN|36.343969|-83.8029682|1024 533060|Big Charles Bayou|Bay|LA|29.6046517|-91.9828654|0 829689|Greenwood Creek|Stream|NE|41.596086|-103.0499296|3671 541692|Button Willow Island|Island|LA|31.9579389|-93.0648847|98

    Los campos de cada línea indican un identificador único, nombre, tipo de característica natural, estado, latitud en grados, longitud en grados y altura en pies.

  3. Descarga de datos en Amazon S3

    aws s3 cp features.txt s3://amzn-s3-demo-bucket/features/

Paso 2: cree una tabla

Utilice Apache Spark o Hive para crear una nueva tabla de Hive que contenga los datos cargados en Amazon S3.

Spark

Para crear una tabla de Hive con Spark, ejecute el comando siguiente.

import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate() sparkSession.sql("CREATE TABLE hive_features \ (feature_id BIGINT, \ feature_name STRING, \ feature_class STRING, \ state_alpha STRING, \ prim_lat_dec DOUBLE, \ prim_long_dec DOUBLE, \ elev_in_ft BIGINT) \ ROW FORMAT DELIMITED \ FIELDS TERMINATED BY '|' \ LINES TERMINATED BY '\n' \ LOCATION 's3://amzn-s3-demo-bucket/features';")

Ahora, tenemos una tabla de Hive con datos que contiene los datos del archivo features.txt. Para comprobar si sus datos están en la tabla, ejecute una consulta de Spark SQL como se muestra en el siguiente ejemplo.

sparkSession.sql( "SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;")
Hive

Para crear una tabla de Hive con Hive, ejecute el comando siguiente.

CREATE TABLE hive_features (feature_id BIGINT, feature_name STRING , feature_class STRING , state_alpha STRING, prim_lat_dec DOUBLE , prim_long_dec DOUBLE , elev_in_ft BIGINT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' LOCATION 's3://amzn-s3-demo-bucket/features';

Ahora tiene una tabla de Hive que contiene los datos del archivo features.txt. Para comprobar si sus datos están en la tabla, ejecute una consulta de HiveQL como se muestra en el siguiente ejemplo.

SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;

Paso 3: copie los datos a DynamoDB

Utilice Spark o Hive para copiar los datos a una nueva tabla de DynamoDB.

Spark

Para copiar datos de la tabla Hive que creó en el paso anterior a DynamoDB,siga los pasos 1 a 3 de Copiar datos a DynamoDB. Esto crea una nueva tabla de DynamoDB llamada Features. A continuación, puede leer los datos directamente del archivo de texto y copiarlos en la tabla de DynamoDB, como se muestra en el siguiente ejemplo.

import com.amazonaws.services.dynamodbv2.model.AttributeValue import org.apache.hadoop.dynamodb.DynamoDBItemWritable import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object EmrServerlessDynamoDbTest { def main(args: Array[String]): Unit = { jobConf.set("dynamodb.input.tableName", "Features") jobConf.set("dynamodb.output.tableName", "Features") jobConf.set("dynamodb.region", "region") jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") val rdd = sc.textFile("s3://amzn-s3-demo-bucket/ddb-connector/") .map(row => { val line = row.split("\\|") val item = new DynamoDBItemWritable() val elevInFt = if (line.length > 6) { new AttributeValue().withN(line(6)) } else { new AttributeValue().withNULL(true) } item.setItem(Map( "feature_id" -> new AttributeValue().withN(line(0)), "feature_name" -> new AttributeValue(line(1)), "feature_class" -> new AttributeValue(line(2)), "state_alpha" -> new AttributeValue(line(3)), "prim_lat_dec" -> new AttributeValue().withN(line(4)), "prim_long_dec" -> new AttributeValue().withN(line(5)), "elev_in_ft" -> elevInFt) .asJava) (new Text(""), item) }) rdd.saveAsHadoopDataset(jobConf) } }
Hive

Para copiar datos de la tabla Hive que creó en el paso anterior a DynamoDB, siga las instrucciones de Copiar datos a DynamoDB.

Paso 4: haga una consulta de datos de DynamoDB

Utilice Spark o Hive para consultar la tabla de DynamoDB.

Spark

Para consultar datos de la tabla de DynamoDB que creó en el paso anterior, puede usar Spark SQL o la API MapReduce de Spark.

ejemplo — Consulta de su tabla de DynamoDB con Spark SQL

La siguiente consulta de Spark SQL devuelve una lista de todos los tipos de características en orden alfabético.

val dataFrame = sparkSession.sql("SELECT DISTINCT feature_class \ FROM ddb_features \ ORDER BY feature_class;")

La siguiente consulta de Spark SQL devuelve una lista de todos los lagos que comienzan por la letra M.

val dataFrame = sparkSession.sql("SELECT feature_name, state_alpha \ FROM ddb_features \ WHERE feature_class = 'Lake' \ AND feature_name LIKE 'M%' \ ORDER BY feature_name;")

La siguiente consulta de Spark SQL devuelve una lista de todos los estados con al menos tres características que se encuentran a más de una milla.

val dataFrame = sparkSession.dql("SELECT state_alpha, feature_class, COUNT(*) \ FROM ddb_features \ WHERE elev_in_ft > 5280 \ GROUP by state_alpha, feature_class \ HAVING COUNT(*) >= 3 \ ORDER BY state_alpha, feature_class;")
ejemplo — Consulta de su tabla de DynamoDB con la API MapReduce de Spark

La siguiente consulta de MapReduce devuelve una lista de todos los tipos de características en orden alfabético.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .map(pair => pair._2.get("feature_class").getS) .distinct() .sortBy(value => value) .toDF("feature_class")

La siguiente consulta de MapReduce devuelve una lista de todos los lagos que comienzan por la letra M.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .filter(pair => "Lake".equals(pair._2.get("feature_class").getS)) .filter(pair => pair._2.get("feature_name").getS.startsWith("M")) .map(pair => (pair._2.get("feature_name").getS, pair._2.get("state_alpha").getS)) .sortBy(_._1) .toDF("feature_name", "state_alpha")

La siguiente consulta de MapReduce devuelve una lista de todos los estados con al menos tres características que se encuentran a más de una milla.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => pair._2.getItem) .filter(pair => pair.get("elev_in_ft").getN != null) .filter(pair => Integer.parseInt(pair.get("elev_in_ft").getN) > 5280) .groupBy(pair => (pair.get("state_alpha").getS, pair.get("feature_class").getS)) .filter(pair => pair._2.size >= 3) .map(pair => (pair._1._1, pair._1._2, pair._2.size)) .sortBy(pair => (pair._1, pair._2)) .toDF("state_alpha", "feature_class", "count")
Hive

Para consultar datos de la tabla DynamoDB que creó en el paso anterior, siga las instrucciones de Consultar los datos en la tabla de DynamoDB.