Como se conectar ao DynamoDB com o Amazon EMR Sem Servidor - Amazon EMR

Como se conectar ao DynamoDB com o Amazon EMR Sem Servidor

Neste tutorial, você faz upload de um subconjunto de dados do United States Board on Geographic Names para um bucket do Amazon S3 e, em seguida, usa o Hive ou o Spark no Amazon EMR Sem Servidor para copiar os dados em uma tabela do Amazon DynamoDB que possa consultar.

Etapa 1: upload dos dados em um bucket do Amazon S3

Para criar um bucket do Amazon S3, siga as instruções em Criação de um bucket no Guia do usuário do console do Amazon Simple Storage Service. Substitua as referências a amzn-s3-demo-bucket pelo nome do bucket recém-criado. Agora, a aplicação do EMR Sem Servidor está pronta para executar trabalhos.

  1. Faça download do arquivo de exemplo de dados features.zip com o comando a seguir.

    wget https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/features.zip
  2. Extraia o arquivo features.txt do arquivamento e exiba as primeiras linhas do arquivo:

    unzip features.zip head features.txt

    O resultado deve ser semelhante ao mostrado a seguir.

    1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794 875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7 1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10 26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681 1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605 1181348|Minnow Run|Stream|PA|40.0820178|-79.3800349|1558 1288759|Hunting Creek|Stream|TN|36.343969|-83.8029682|1024 533060|Big Charles Bayou|Bay|LA|29.6046517|-91.9828654|0 829689|Greenwood Creek|Stream|NE|41.596086|-103.0499296|3671 541692|Button Willow Island|Island|LA|31.9579389|-93.0648847|98

    Os campos em cada linha aqui indicam um identificador exclusivo, nome, tipo de característica natural, estado, latitude em graus, longitude em graus e altura em pés.

  3. Upload de dados no Amazon S3

    aws s3 cp features.txt s3://amzn-s3-demo-bucket/features/

Etapa 2: criar uma tabela do Hive

Use o Apache Spark ou o Hive para criar uma tabela do Hive que contenha os dados carregados no Amazon S3.

Spark

Para criar uma tabela do Hive com o Spark, execute o comando a seguir.

import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate() sparkSession.sql("CREATE TABLE hive_features \ (feature_id BIGINT, \ feature_name STRING, \ feature_class STRING, \ state_alpha STRING, \ prim_lat_dec DOUBLE, \ prim_long_dec DOUBLE, \ elev_in_ft BIGINT) \ ROW FORMAT DELIMITED \ FIELDS TERMINATED BY '|' \ LINES TERMINATED BY '\n' \ LOCATION 's3://amzn-s3-demo-bucket/features';")

Agora você tem uma tabela do Hive preenchida com os dados do arquivo features.txt. Para verificar se seus dados estão na tabela, execute uma consulta do Spark SQL conforme mostrado no exemplo a seguir.

sparkSession.sql( "SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;")
Hive

Para criar uma tabela do Hive com o Hive, execute o comando a seguir.

CREATE TABLE hive_features (feature_id BIGINT, feature_name STRING , feature_class STRING , state_alpha STRING, prim_lat_dec DOUBLE , prim_long_dec DOUBLE , elev_in_ft BIGINT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' LOCATION 's3://amzn-s3-demo-bucket/features';

Agora você tem uma tabela do Hive que contém dados do arquivo features.txt. Para verificar se seus dados estão na tabela, execute uma consulta do HiveQL, conforme mostrado no exemplo a seguir.

SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;

Etapa 3: copiar dados para o DynamoDB

Use o Spark ou o Hive para copiar dados para uma nova tabela do DynamoDB.

Spark

Para copiar dados da tabela do Hive criada na etapa anterior para o DynamoDB, siga as etapas de 1 a 3 em Copiar dados para o DynamoDB. Isso cria uma tabela do DynamoDB chamada Features. Em seguida, você pode ler os dados diretamente do arquivo de texto e copiá-los para a tabela do DynamoDB, conforme mostra o exemplo a seguir.

import com.amazonaws.services.dynamodbv2.model.AttributeValue import org.apache.hadoop.dynamodb.DynamoDBItemWritable import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object EmrServerlessDynamoDbTest { def main(args: Array[String]): Unit = { jobConf.set("dynamodb.input.tableName", "Features") jobConf.set("dynamodb.output.tableName", "Features") jobConf.set("dynamodb.region", "region") jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") val rdd = sc.textFile("s3://amzn-s3-demo-bucket/ddb-connector/") .map(row => { val line = row.split("\\|") val item = new DynamoDBItemWritable() val elevInFt = if (line.length > 6) { new AttributeValue().withN(line(6)) } else { new AttributeValue().withNULL(true) } item.setItem(Map( "feature_id" -> new AttributeValue().withN(line(0)), "feature_name" -> new AttributeValue(line(1)), "feature_class" -> new AttributeValue(line(2)), "state_alpha" -> new AttributeValue(line(3)), "prim_lat_dec" -> new AttributeValue().withN(line(4)), "prim_long_dec" -> new AttributeValue().withN(line(5)), "elev_in_ft" -> elevInFt) .asJava) (new Text(""), item) }) rdd.saveAsHadoopDataset(jobConf) } }
Hive

Para copiar dados da tabela do Hive criados na etapa anterior para o DynamoDB, siga as instruções em Copiar dados para o DynamoDB.

Etapa 4: consultar dados do DynamoDB

Use o Spark ou o Hive para consultar sua tabela do DynamoDB.

Spark

Para consultar dados da tabela do DynamoDB criada na etapa anterior, você pode usar o Spark SQL ou a API MapReduce do Spark.

exemplo — Consulte sua tabela do DynamoDB com o Spark SQL

A consulta do Spark SQL a seguir retorna uma lista de todos os tipos de recursos em ordem alfabética.

val dataFrame = sparkSession.sql("SELECT DISTINCT feature_class \ FROM ddb_features \ ORDER BY feature_class;")

A consulta do Spark SQL a seguir retorna uma lista de todos os lakes que começam com a letra M.

val dataFrame = sparkSession.sql("SELECT feature_name, state_alpha \ FROM ddb_features \ WHERE feature_class = 'Lake' \ AND feature_name LIKE 'M%' \ ORDER BY feature_name;")

A consulta do Spark SQL a seguir retorna uma lista de todos os estados com pelo menos três recursos que ultrapassam uma milha.

val dataFrame = sparkSession.dql("SELECT state_alpha, feature_class, COUNT(*) \ FROM ddb_features \ WHERE elev_in_ft > 5280 \ GROUP by state_alpha, feature_class \ HAVING COUNT(*) >= 3 \ ORDER BY state_alpha, feature_class;")
exemplo — Consulte sua tabela do DynamoDB com a API MapReduce do Spark

A consulta do MapReduce a seguir retorna uma lista de todos os tipos de recursos em ordem alfabética.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .map(pair => pair._2.get("feature_class").getS) .distinct() .sortBy(value => value) .toDF("feature_class")

A consulta do MapReduce a seguir retorna uma lista de todos os lakes que começam com a letra M.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .filter(pair => "Lake".equals(pair._2.get("feature_class").getS)) .filter(pair => pair._2.get("feature_name").getS.startsWith("M")) .map(pair => (pair._2.get("feature_name").getS, pair._2.get("state_alpha").getS)) .sortBy(_._1) .toDF("feature_name", "state_alpha")

A consulta do MapReduce a seguir retorna uma lista de todos os estados com pelo menos três recursos com mais de uma milha.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => pair._2.getItem) .filter(pair => pair.get("elev_in_ft").getN != null) .filter(pair => Integer.parseInt(pair.get("elev_in_ft").getN) > 5280) .groupBy(pair => (pair.get("state_alpha").getS, pair.get("feature_class").getS)) .filter(pair => pair._2.size >= 3) .map(pair => (pair._1._1, pair._1._2, pair._2.size)) .sortBy(pair => (pair._1, pair._2)) .toDF("state_alpha", "feature_class", "count")
Hive

Para consultar dados da tabela do DynamoDB criada na etapa anterior, siga as instruções em Query the data in the DynamoDB table.