

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon EMR Serverless を使用した DynamoDB への接続
<a name="using-ddb-connector"></a>

このチュートリアルでは、[米国地名委員会](https://www.usgs.gov/us-board-on-geographic-names)で提供されるデータのサブセットを Amazon S3 バケットにアップロードし、Amazon EMR Serverless の Hive または Spark を使用してクエリできる Amazon DynamoDB テーブルにデータをコピーします。

## ステップ 1: データを Amazon S3 バケットにアップロードする
<a name="using-ddb-connector-s3"></a>

Amazon S3 バケットを作成するには、「*Amazon Simple Storage Service コンソールユーザーガイド*」の「[バケットの作成](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/create-bucket.html)」の手順に従います。`amzn-s3-demo-bucket` への参照を、新しく作成されたバケットの名前に置き換えます。これで、EMR Serverless アプリケーションはジョブを実行する準備が整いました。

1. 次のコマンドを使用して、サンプルデータアーカイブ `features.zip` をダウンロードします。

   ```
   wget https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/features.zip
   ```

1. アーカイブから `features.txt` ファイルを抽出し、ファイル内の最初の数行にアクセスします。

   ```
   unzip features.zip
   head features.txt
   ```

   結果は次のようになります。

   ```
   1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794
   875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7
   1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10
   26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681
   1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605
   1181348|Minnow Run|Stream|PA|40.0820178|-79.3800349|1558
   1288759|Hunting Creek|Stream|TN|36.343969|-83.8029682|1024
   533060|Big Charles Bayou|Bay|LA|29.6046517|-91.9828654|0
   829689|Greenwood Creek|Stream|NE|41.596086|-103.0499296|3671
   541692|Button Willow Island|Island|LA|31.9579389|-93.0648847|98
   ```

   ここにある各行のフィールドは、一意の識別子、名前、自然の特徴量のタイプ、状態、緯度 (度)、経度 (度)、高さ (フィート) で示しています。

1. データを Amazon S3 にアップロードする

   ```
   aws s3 cp features.txt s3://amzn-s3-demo-bucket/features/
   ```

## ステップ 2: Hive テーブルを作成する
<a name="using-ddb-connector-create-table"></a>

Apache Spark または Hive を使用して、Amazon S3 にアップロードされたデータを含む新しい Hive テーブルを作成します。

------
#### [ Spark ]

Spark で Hive テーブルを作成するには、次のコマンドを実行します。

```
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()

sparkSession.sql("CREATE TABLE hive_features \
    (feature_id BIGINT, \
    feature_name STRING, \
    feature_class STRING, \
    state_alpha STRING, \
    prim_lat_dec DOUBLE, \
    prim_long_dec DOUBLE, \
    elev_in_ft BIGINT) \
    ROW FORMAT DELIMITED \
    FIELDS TERMINATED BY '|' \
    LINES TERMINATED BY '\n' \
    LOCATION 's3://amzn-s3-demo-bucket/features';")
```

これで、`features.txt` ファイルからのデータが格納された Hive テーブルが用意できます。データがテーブルにあることを確認するには、次の例に示すように Spark SQL クエリを実行します。

```
sparkSession.sql(
    "SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;")
```

------
#### [ Hive ]

Hive で Hive テーブルを作成するには、次のコマンドを実行します。

```
CREATE TABLE hive_features
    (feature_id             BIGINT,
    feature_name            STRING ,
    feature_class           STRING ,
    state_alpha             STRING,
    prim_lat_dec            DOUBLE ,
    prim_long_dec           DOUBLE ,
    elev_in_ft              BIGINT)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '|'
    LINES TERMINATED BY '\n'
    LOCATION 's3://amzn-s3-demo-bucket/features';
```

これで、`features.txt` ファイルからのデータを含む Hive テーブルが作成されました。データがテーブルにあることを確認するには、次の例に示すように HiveQL クエリを実行します。

```
SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;
```

------

## ステップ 3: データを DynamoDB にコピーする
<a name="using-ddb-connector-copy"></a>

Spark または Hive を使用してデータを新しい DynamoDB テーブルにコピーします。

------
#### [ Spark ]

前のステップで作成した Hive テーブルから DynamoDB にデータをコピーするには、[データを DynamoDB にコピーする](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.CopyDataToDDB.html)の**ステップ 1～3** に従います。これにより、`Features` という名前の新しい DynamoDB テーブルが作成されます。その後、次の例に示すように、テキストファイルから直接データを読み取って DynamoDB テーブルにコピーできます。

```
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext

import scala.collection.JavaConverters._

object EmrServerlessDynamoDbTest {

    def main(args: Array[String]): Unit = {
    
        jobConf.set("dynamodb.input.tableName", "Features")
        jobConf.set("dynamodb.output.tableName", "Features")
        jobConf.set("dynamodb.region", "region")

        jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
        jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
    
        val rdd = sc.textFile("s3://amzn-s3-demo-bucket/ddb-connector/")
            .map(row => {
                val line = row.split("\\|")
                val item = new DynamoDBItemWritable()
                
                val elevInFt = if (line.length > 6) {
                    new AttributeValue().withN(line(6))
                } else {
                    new AttributeValue().withNULL(true)
                }
                
                item.setItem(Map(
                    "feature_id" -> new AttributeValue().withN(line(0)), 
                    "feature_name" -> new AttributeValue(line(1)), 
                    "feature_class" -> new AttributeValue(line(2)), 
                    "state_alpha" -> new AttributeValue(line(3)), 
                    "prim_lat_dec" -> new AttributeValue().withN(line(4)), 
                    "prim_long_dec" -> new AttributeValue().withN(line(5)),
                    "elev_in_ft" -> elevInFt)
                    .asJava)
                (new Text(""), item)
        })
        rdd.saveAsHadoopDataset(jobConf)
    }
}
```

------
#### [ Hive ]

前のステップで作成した Hive テーブルから DynamoDB にデータをコピーするには、[データを DynamoDB にコピーする](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.CopyDataToDDB.html)の手順に従います。

------

## ステップ 4: DynamoDB からデータをクエリする
<a name="using-ddb-connector-query"></a>

Spark または Hive を使用して DynamoDB テーブルをクエリします。

------
#### [ Spark ]

前のステップで作成した DynamoDB テーブルからデータをクエリするには、Spark SQL または Spark MapReduce API のいずれかを使用します。

**Example – Spark SQL を使用して DynamoDB テーブルをクエリする**  
次の Spark SQL クエリは、すべての特徴量タイプのリストをアルファベット順に返します。  

```
val dataFrame = sparkSession.sql("SELECT DISTINCT feature_class \
    FROM ddb_features \
    ORDER BY feature_class;")
```
次の Spark SQL クエリは、*M* で始まるすべてのレイクのリストを返します。  

```
val dataFrame = sparkSession.sql("SELECT feature_name, state_alpha \
    FROM ddb_features \
    WHERE feature_class = 'Lake' \
    AND feature_name LIKE 'M%' \
    ORDER BY feature_name;")
```
次の Spark SQL クエリは、1 マイルを超える特徴量を 3 つ以上持つすべての状態のリストを返します。  

```
val dataFrame = sparkSession.dql("SELECT state_alpha, feature_class, COUNT(*) \
    FROM ddb_features \
    WHERE elev_in_ft > 5280 \
    GROUP by state_alpha, feature_class \
    HAVING COUNT(*) >= 3 \
    ORDER BY state_alpha, feature_class;")
```

**Example – Spark MapReduce API を使用して DynamoDB テーブルをクエリする**  
次の MapReduce クエリは、すべての特徴量タイプのリストをアルファベット順に返します。  

```
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
    .map(pair => (pair._1, pair._2.getItem))
    .map(pair => pair._2.get("feature_class").getS)
    .distinct()
    .sortBy(value => value)
    .toDF("feature_class")
```
次の MapReduce クエリは、*M* で始まるすべてのレイクのリストを返します。  

```
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
    .map(pair => (pair._1, pair._2.getItem))
    .filter(pair => "Lake".equals(pair._2.get("feature_class").getS))
    .filter(pair => pair._2.get("feature_name").getS.startsWith("M"))
    .map(pair => (pair._2.get("feature_name").getS, pair._2.get("state_alpha").getS))
    .sortBy(_._1)
    .toDF("feature_name", "state_alpha")
```
次の MapReduce クエリは、1 マイルを超える特徴量を 3 つ以上持つすべての状態のリストを返します。  

```
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
    .map(pair => pair._2.getItem)
    .filter(pair => pair.get("elev_in_ft").getN != null)
    .filter(pair => Integer.parseInt(pair.get("elev_in_ft").getN) > 5280)
    .groupBy(pair => (pair.get("state_alpha").getS, pair.get("feature_class").getS))
    .filter(pair => pair._2.size >= 3)
    .map(pair => (pair._1._1, pair._1._2, pair._2.size))
    .sortBy(pair => (pair._1, pair._2))
    .toDF("state_alpha", "feature_class", "count")
```

------
#### [ Hive ]

前のステップで作成した DynamoDB テーブルからデータをクエリするには、[DynamoDB テーブル内のデータをクエリする](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.QueryDataInDynamoDB.html)の手順に従います。

------