

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Menghubungkan ke DynamoDB dengan Amazon EMR Tanpa Server
<a name="using-ddb-connector"></a>

Dalam tutorial ini, Anda mengunggah subset data dari [Dewan Amerika Serikat tentang Nama Geografis](https://www.usgs.gov/us-board-on-geographic-names) ke bucket Amazon S3 dan kemudian menggunakan Hive atau Spark di Amazon EMR Tanpa Server untuk menyalin data ke tabel Amazon DynamoDB untuk kueri. 

## Langkah 1: Unggah data ke bucket Amazon S3
<a name="using-ddb-connector-s3"></a>

Untuk membuat bucket Amazon S3, ikuti petunjuk dalam [Membuat](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/create-bucket.html) bucket di *Panduan Pengguna Amazon Simple Storage Service Console*. Ganti referensi `{{amzn-s3-demo-bucket}}` dengan nama bucket yang baru Anda buat. Sekarang aplikasi EMR Serverless Anda siap menjalankan pekerjaan.

1. Unduh arsip data sampel `features.zip` dengan perintah berikut.

   ```
   wget https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/features.zip
   ```

1. Ekstrak `features.txt` file dari arsip dan akses yang pertama beberapa baris dalam file:

   ```
   unzip features.zip
   head features.txt
   ```

   Hasilnya akan tampak mirip dengan yang berikut ini.

   ```
   1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794
   875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7
   1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10
   26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681
   1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605
   1181348|Minnow Run|Stream|PA|40.0820178|-79.3800349|1558
   1288759|Hunting Creek|Stream|TN|36.343969|-83.8029682|1024
   533060|Big Charles Bayou|Bay|LA|29.6046517|-91.9828654|0
   829689|Greenwood Creek|Stream|NE|41.596086|-103.0499296|3671
   541692|Button Willow Island|Island|LA|31.9579389|-93.0648847|98
   ```

   Bidang di setiap baris di sini menunjukkan pengidentifikasi unik, nama, jenis fitur alami, keadaan, garis lintang dalam derajat, bujur dalam derajat, dan tinggi dalam kaki.

1. Unggah data Anda ke Amazon S3

   ```
   aws s3 cp features.txt s3://{{amzn-s3-demo-bucket}}/features/
   ```

## Langkah 2: Buat tabel Hive
<a name="using-ddb-connector-create-table"></a>

Gunakan Apache Spark atau Hive untuk membuat tabel Hive baru yang berisi data yang diunggah di Amazon S3.

------
#### [ Spark ]

Untuk membuat tabel Hive dengan Spark, jalankan perintah berikut.

```
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()

sparkSession.sql("CREATE TABLE {{hive_features}} \
    (feature_id BIGINT, \
    feature_name STRING, \
    feature_class STRING, \
    state_alpha STRING, \
    prim_lat_dec DOUBLE, \
    prim_long_dec DOUBLE, \
    elev_in_ft BIGINT) \
    ROW FORMAT DELIMITED \
    FIELDS TERMINATED BY '|' \
    LINES TERMINATED BY '\n' \
    LOCATION 's3://{{amzn-s3-demo-bucket}}/features';")
```

Anda sekarang memiliki tabel Hive terisi dengan data dari file. `features.txt` Untuk memverifikasi bahwa data Anda ada dalam tabel, jalankan query Spark SQL seperti yang ditunjukkan pada contoh berikut.

```
sparkSession.sql(
    "SELECT state_alpha, COUNT(*) FROM {{hive_features}} GROUP BY state_alpha;")
```

------
#### [ Hive ]

Untuk membuat tabel Hive dengan Hive, jalankan perintah berikut.

```
CREATE TABLE {{hive_features}}
    (feature_id             BIGINT,
    feature_name            STRING ,
    feature_class           STRING ,
    state_alpha             STRING,
    prim_lat_dec            DOUBLE ,
    prim_long_dec           DOUBLE ,
    elev_in_ft              BIGINT)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '|'
    LINES TERMINATED BY '\n'
    LOCATION 's3://{{amzn-s3-demo-bucket}}/features';
```

Anda sekarang memiliki tabel Hive yang berisi data dari `features.txt` file. Untuk memverifikasi bahwa data Anda ada dalam tabel, jalankan kueri HiveQL, seperti yang ditunjukkan pada contoh berikut.

```
SELECT state_alpha, COUNT(*) FROM {{hive_features}} GROUP BY state_alpha;
```

------

## Langkah 3: Salin data ke DynamoDB
<a name="using-ddb-connector-copy"></a>

Gunakan Spark atau Hive untuk menyalin data ke tabel DynamoDB baru.

------
#### [ Spark ]

Untuk menyalin data dari tabel Hive yang Anda buat pada langkah sebelumnya ke DynamoDB, **ikuti Langkah** 1-3 di [Salin](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.CopyDataToDDB.html) data ke DynamoDB. Ini menciptakan tabel DynamoDB baru yang disebut. `Features` Anda kemudian dapat membaca data langsung dari file teks dan menyalinnya ke tabel DynamoDB Anda, seperti contoh berikut menunjukkan.

```
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext

import scala.collection.JavaConverters._

object EmrServerlessDynamoDbTest {

    def main(args: Array[String]): Unit = {
    
        jobConf.set("dynamodb.input.tableName", "Features")
        jobConf.set("dynamodb.output.tableName", "Features")
        jobConf.set("dynamodb.region", "{{region}}")

        jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
        jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
    
        val rdd = sc.textFile("s3://{{amzn-s3-demo-bucket}}/ddb-connector/")
            .map(row => {
                val line = row.split("\\|")
                val item = new DynamoDBItemWritable()
                
                val elevInFt = if (line.length > 6) {
                    new AttributeValue().withN(line(6))
                } else {
                    new AttributeValue().withNULL(true)
                }
                
                item.setItem(Map(
                    "feature_id" -> new AttributeValue().withN(line(0)), 
                    "feature_name" -> new AttributeValue(line(1)), 
                    "feature_class" -> new AttributeValue(line(2)), 
                    "state_alpha" -> new AttributeValue(line(3)), 
                    "prim_lat_dec" -> new AttributeValue().withN(line(4)), 
                    "prim_long_dec" -> new AttributeValue().withN(line(5)),
                    "elev_in_ft" -> elevInFt)
                    .asJava)
                (new Text(""), item)
        })
        rdd.saveAsHadoopDataset(jobConf)
    }
}
```

------
#### [ Hive ]

Untuk menyalin data dari tabel Hive yang Anda buat pada langkah sebelumnya ke DynamoDB, ikuti petunjuk di [Salin](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.CopyDataToDDB.html) data ke DynamoDB.

------

## Langkah 4: Kueri data dari DynamoDB
<a name="using-ddb-connector-query"></a>

Gunakan Spark atau Hive untuk menanyakan tabel DynamoDB Anda.

------
#### [ Spark ]

Untuk kueri data dari tabel DynamoDB yang Anda buat pada langkah sebelumnya, gunakan Spark SQL atau Spark API. MapReduce 

**Example — Kueri tabel DynamoDB Anda dengan Spark SQL**  
Query Spark SQL berikut mengembalikan daftar semua jenis fitur dalam urutan abjad.  

```
val dataFrame = sparkSession.sql("SELECT DISTINCT feature_class \
    FROM ddb_features \
    ORDER BY feature_class;")
```
*Query Spark SQL berikut mengembalikan daftar semua danau yang dimulai dengan huruf M.*  

```
val dataFrame = sparkSession.sql("SELECT feature_name, state_alpha \
    FROM ddb_features \
    WHERE feature_class = 'Lake' \
    AND feature_name LIKE 'M%' \
    ORDER BY feature_name;")
```
Kueri SQL Spark berikut mengembalikan daftar semua negara bagian dengan setidaknya tiga fitur yang lebih tinggi dari satu mil.  

```
val dataFrame = sparkSession.dql("SELECT state_alpha, feature_class, COUNT(*) \
    FROM ddb_features \
    WHERE elev_in_ft > 5280 \
    GROUP by state_alpha, feature_class \
    HAVING COUNT(*) >= 3 \
    ORDER BY state_alpha, feature_class;")
```

**Example — Kueri tabel DynamoDB Anda dengan Spark API MapReduce**  
 MapReduce Query berikut mengembalikan daftar semua jenis fitur dalam urutan abjad.  

```
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
    .map(pair => (pair._1, pair._2.getItem))
    .map(pair => pair._2.get("feature_class").getS)
    .distinct()
    .sortBy(value => value)
    .toDF("feature_class")
```
 MapReduce Query berikut mengembalikan daftar semua danau yang dimulai dengan huruf *M*.  

```
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
    .map(pair => (pair._1, pair._2.getItem))
    .filter(pair => "Lake".equals(pair._2.get("feature_class").getS))
    .filter(pair => pair._2.get("feature_name").getS.startsWith("M"))
    .map(pair => (pair._2.get("feature_name").getS, pair._2.get("state_alpha").getS))
    .sortBy(_._1)
    .toDF("feature_name", "state_alpha")
```
 MapReduce Kueri berikut mengembalikan daftar semua negara bagian dengan setidaknya tiga fitur yang lebih tinggi dari satu mil.  

```
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
    .map(pair => pair._2.getItem)
    .filter(pair => pair.get("elev_in_ft").getN != null)
    .filter(pair => Integer.parseInt(pair.get("elev_in_ft").getN) > 5280)
    .groupBy(pair => (pair.get("state_alpha").getS, pair.get("feature_class").getS))
    .filter(pair => pair._2.size >= 3)
    .map(pair => (pair._1._1, pair._1._2, pair._2.size))
    .sortBy(pair => (pair._1, pair._2))
    .toDF("state_alpha", "feature_class", "count")
```

------
#### [ Hive ]

Untuk kueri data dari tabel DynamoDB yang Anda buat pada langkah sebelumnya, ikuti petunjuk di [Kueri data dalam tabel DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.QueryDataInDynamoDB.html).

------