Menghubungkan ke DynamoDB dengan Amazon Serverless EMR - Amazon EMR

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menghubungkan ke DynamoDB dengan Amazon Serverless EMR

Dalam tutorial ini, Anda mengunggah subset data dari United States Board on Geographic Names ke bucket Amazon S3 dan kemudian menggunakan Hive atau Spark di Amazon Serverless untuk menyalin data ke tabel EMR Amazon DynamoDB yang dapat Anda kueri.

Langkah 1: Unggah data ke bucket Amazon S3

Untuk membuat bucket Amazon S3, ikuti petunjuk dalam Membuat bucket di Panduan Pengguna Amazon Simple Storage Service Console. Ganti referensi amzn-s3-demo-bucket dengan nama bucket yang baru Anda buat. Sekarang aplikasi EMR Tanpa Server Anda siap menjalankan pekerjaan.

  1. Unduh arsip data sampel features.zip dengan perintah berikut.

    wget https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/features.zip
  2. Ekstrak features.txt file dari arsip dan lihat yang pertama beberapa baris dalam file:

    unzip features.zip head features.txt

    Hasilnya akan terlihat mirip dengan yang berikut ini.

    1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794 875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7 1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10 26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681 1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605 1181348|Minnow Run|Stream|PA|40.0820178|-79.3800349|1558 1288759|Hunting Creek|Stream|TN|36.343969|-83.8029682|1024 533060|Big Charles Bayou|Bay|LA|29.6046517|-91.9828654|0 829689|Greenwood Creek|Stream|NE|41.596086|-103.0499296|3671 541692|Button Willow Island|Island|LA|31.9579389|-93.0648847|98

    Bidang di setiap baris di sini menunjukkan pengidentifikasi unik, nama, jenis fitur alami, keadaan, garis lintang dalam derajat, bujur dalam derajat, dan tinggi dalam kaki.

  3. Unggah data Anda ke Amazon S3

    aws s3 cp features.txt s3://amzn-s3-demo-bucket/features/

Langkah 2: Buat tabel Hive

Gunakan Apache Spark atau Hive untuk membuat tabel Hive baru yang berisi data yang diunggah di Amazon S3.

Spark

Untuk membuat tabel Hive dengan Spark, jalankan perintah berikut.

import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate() sparkSession.sql("CREATE TABLE hive_features \ (feature_id BIGINT, \ feature_name STRING, \ feature_class STRING, \ state_alpha STRING, \ prim_lat_dec DOUBLE, \ prim_long_dec DOUBLE, \ elev_in_ft BIGINT) \ ROW FORMAT DELIMITED \ FIELDS TERMINATED BY '|' \ LINES TERMINATED BY '\n' \ LOCATION 's3://amzn-s3-demo-bucket/features';")

Anda sekarang memiliki tabel Hive terisi dengan data dari file. features.txt Untuk memverifikasi bahwa data Anda ada dalam tabel, jalankan SQL kueri Spark seperti yang ditunjukkan pada contoh berikut.

sparkSession.sql( "SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;")
Hive

Untuk membuat tabel Hive dengan Hive, jalankan perintah berikut.

CREATE TABLE hive_features (feature_id BIGINT, feature_name STRING , feature_class STRING , state_alpha STRING, prim_lat_dec DOUBLE , prim_long_dec DOUBLE , elev_in_ft BIGINT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' LOCATION 's3://amzn-s3-demo-bucket/features';

Anda sekarang memiliki tabel Hive yang berisi data dari features.txt file. Untuk memverifikasi bahwa data Anda ada dalam tabel, jalankan kueri HiveQL, seperti yang ditunjukkan pada contoh berikut.

SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;

Langkah 3: Salin data ke DynamoDB

Gunakan Spark atau Hive untuk menyalin data ke tabel DynamoDB baru.

Spark

Untuk menyalin data dari tabel Hive yang Anda buat pada langkah sebelumnya ke DynamoDB, ikuti Langkah 1-3 di Salin data ke DynamoDB. Ini menciptakan tabel DynamoDB baru yang disebut. Features Anda kemudian dapat membaca data langsung dari file teks dan menyalinnya ke tabel DynamoDB Anda, seperti contoh berikut menunjukkan.

import com.amazonaws.services.dynamodbv2.model.AttributeValue import org.apache.hadoop.dynamodb.DynamoDBItemWritable import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object EmrServerlessDynamoDbTest { def main(args: Array[String]): Unit = { jobConf.set("dynamodb.input.tableName", "Features") jobConf.set("dynamodb.output.tableName", "Features") jobConf.set("dynamodb.region", "region") jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") val rdd = sc.textFile("s3://amzn-s3-demo-bucket/ddb-connector/") .map(row => { val line = row.split("\\|") val item = new DynamoDBItemWritable() val elevInFt = if (line.length > 6) { new AttributeValue().withN(line(6)) } else { new AttributeValue().withNULL(true) } item.setItem(Map( "feature_id" -> new AttributeValue().withN(line(0)), "feature_name" -> new AttributeValue(line(1)), "feature_class" -> new AttributeValue(line(2)), "state_alpha" -> new AttributeValue(line(3)), "prim_lat_dec" -> new AttributeValue().withN(line(4)), "prim_long_dec" -> new AttributeValue().withN(line(5)), "elev_in_ft" -> elevInFt) .asJava) (new Text(""), item) }) rdd.saveAsHadoopDataset(jobConf) } }
Hive

Untuk menyalin data dari tabel Hive yang Anda buat pada langkah sebelumnya ke DynamoDB, ikuti petunjuk di Salin data ke DynamoDB.

Langkah 4: Kueri data dari DynamoDB

Gunakan Spark atau Hive untuk menanyakan tabel DynamoDB Anda.

Spark

Untuk kueri data dari tabel DynamoDB yang Anda buat pada langkah sebelumnya, Anda dapat menggunakan SQL Spark atau Spark. MapReduce API

contoh — Kueri tabel DynamoDB Anda dengan Spark SQL

SQLKueri Spark berikut mengembalikan daftar semua jenis fitur dalam urutan abjad.

val dataFrame = sparkSession.sql("SELECT DISTINCT feature_class \ FROM ddb_features \ ORDER BY feature_class;")

SQLKueri Spark berikut mengembalikan daftar semua danau yang dimulai dengan huruf M.

val dataFrame = sparkSession.sql("SELECT feature_name, state_alpha \ FROM ddb_features \ WHERE feature_class = 'Lake' \ AND feature_name LIKE 'M%' \ ORDER BY feature_name;")

SQLKueri Spark berikut mengembalikan daftar semua status dengan setidaknya tiga fitur yang lebih tinggi dari satu mil.

val dataFrame = sparkSession.dql("SELECT state_alpha, feature_class, COUNT(*) \ FROM ddb_features \ WHERE elev_in_ft > 5280 \ GROUP by state_alpha, feature_class \ HAVING COUNT(*) >= 3 \ ORDER BY state_alpha, feature_class;")
contoh — Kueri tabel DynamoDB Anda dengan Spark MapReduce API

MapReduce Query berikut mengembalikan daftar semua jenis fitur dalam urutan abjad.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .map(pair => pair._2.get("feature_class").getS) .distinct() .sortBy(value => value) .toDF("feature_class")

MapReduce Query berikut mengembalikan daftar semua danau yang dimulai dengan huruf M.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .filter(pair => "Lake".equals(pair._2.get("feature_class").getS)) .filter(pair => pair._2.get("feature_name").getS.startsWith("M")) .map(pair => (pair._2.get("feature_name").getS, pair._2.get("state_alpha").getS)) .sortBy(_._1) .toDF("feature_name", "state_alpha")

MapReduce Kueri berikut mengembalikan daftar semua negara bagian dengan setidaknya tiga fitur yang lebih tinggi dari satu mil.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => pair._2.getItem) .filter(pair => pair.get("elev_in_ft").getN != null) .filter(pair => Integer.parseInt(pair.get("elev_in_ft").getN) > 5280) .groupBy(pair => (pair.get("state_alpha").getS, pair.get("feature_class").getS)) .filter(pair => pair._2.size >= 3) .map(pair => (pair._1._1, pair._1._2, pair._2.size)) .sortBy(pair => (pair._1, pair._2)) .toDF("state_alpha", "feature_class", "count")
Hive

Untuk kueri data dari tabel DynamoDB yang Anda buat pada langkah sebelumnya, ikuti petunjuk di Kueri data dalam tabel DynamoDB.