

# Amazon DynamoDB 用 Kinesis Data Streams の開始方法
<a name="kds_gettingstarted"></a>

このセクションでは、Amazon DynamoDB コンソール、AWS Command Line Interface (AWS CLI)、および API を活用して Amazon DynamoDB 用の Amazon Kinesis Data Streams テーブルを使用する方法について説明します。

## アクティブな Amazon Kinesis Data Streams の作成
<a name="kds_gettingstarted.making-changes"></a>

これらの例はすべて、[DynamoDB の使用開始](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GettingStartedDynamoDB.html)チュートリアルの一部として作成された `Music` DynamoDB テーブルを使用しています。

コンシューマーを構築し、Kinesis データストリームを他の AWS のサービスに接続する詳細方法については、「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Kinesis Data Streams からのデータの読み込み](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html)」を参照してください。

**注記**  
 KDS シャードを初めて使用するときは、使用パターンに合わせてシャードをスケールアップまたはスケールダウンするように設定することをお勧めします。使用パターンに関するデータをさらに蓄積したら、それに合わせてストリーム内のシャードを調整できます。

------
#### [ Console ]

1. AWS マネジメントコンソール にサインインし、Kinesis コンソール ([https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/)) を開きます。

1. [**Create data stream (データストリーミングの作成)**] を選択し、指示に従って `samplestream` というストリーミングを作成します。

1. DynamoDB コンソール ([https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)) を開きます。

1. コンソールの左側のナビゲーションペインで、[**テーブル**] を選択します。

1. **[Music]** テーブルを選択します。

1. [**エクスポートとストリーム**] タブを選択します。

1. (オプション) **[Amazon Kinesis データストリームの詳細]** で、レコードのタイムスタンプの精度をマイクロ秒 (デフォルト) からミリ秒に変更できます。

1. ドロップダウンリストから **samplestream** を選択します。

1. **[オンにする]** ボタンを選択します。

------
#### [ AWS CLI ]

1. [create-stream コマンド](https://docs.aws.amazon.com/cli/latest/reference/kinesis/create-stream.html)を使用して、`samplestream` という名前の Kinesis データストリームを作成します。

   ```
   aws kinesis create-stream --stream-name samplestream --shard-count 3 
   ```

   Kinesis データストリームのシャード数を設定する前に「[Kinesis Data Streams のシャード管理に関する考慮事項](kds_using-shards-and-metrics.md#kds_using-shards-and-metrics.shardmanagment)」を参照してください。

1. Kinesis ストリームがアクティブで、使用できる状態になっていることを確認するには、[describe-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/describe-stream.html) コマンドを使用します。

   ```
   aws kinesis describe-stream --stream-name samplestream
   ```

1. DynamoDB `enable-kinesis-streaming-destination` コマンドを使用して、DynamoDB テーブルで Kinesis ストリーミングを有効にします。`stream-arn` の値を、前のステップの `describe-stream` によって返された値で置き換えます。オプションで、各レコードに返されるタイムスタンプ値の精度をより細かく (マイクロ秒) したストリーミングを有効にします。

   マイクロ秒のタイムスタンプ精度でのストリーミングを有効にします。

   ```
   aws dynamodb enable-kinesis-streaming-destination \
     --table-name Music \
     --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
     --enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
   ```

   または、デフォルトのタイムスタンプ精度 (ミリ秒) でのストリーミングを有効にします。

   ```
   aws dynamodb enable-kinesis-streaming-destination \
     --table-name Music \
     --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
   ```

1. DynamoDB `describe-kinesis-streaming-destination` コマンドを使用して、テーブルで Kinesis ストリーミングがアクティブかどうかを確認します。

   ```
   aws dynamodb describe-kinesis-streaming-destination --table-name Music
   ```

1. 「[DynamoDB デベロッパーガイド](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-2.html)」の説明通りに、`put-item` コマンドを使用して DynamoDB テーブルにデータを書き込みます。

   ```
   aws dynamodb put-item \
       --table-name Music  \
       --item \
           '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
   
   aws dynamodb put-item \
       --table-name Music \
       --item \
           '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
   ```

1. Kinesis [get-records](https://docs.aws.amazon.com/cli/latest/reference/kinesis/get-records.html) CLI コマンドを使用して、Kinesis ストリームコンテンツを取得します。次に、以下のコードスニペットを使用して、ストリーミングコンテンツを逆シリアル化します。

   ```
   /**
    * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
    */
   public void processRecord(Record kinesisRecord) throws IOException {
       ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
       JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
       JsonNode dynamoDBRecord = rootNode.get("dynamodb");
       JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
       JsonNode newItemImage = dynamoDBRecord.get("NewImage");
       Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
   
       /**
        * Say for example our record contains a String attribute named "stringName" and we want to fetch the value
        * of this attribute from the new item image. The following code fetches this value.
        */
       JsonNode attributeNode = newItemImage.get("stringName");
       JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
       String attributeValue = attributeValueNode.textValue();
       System.out.println(attributeValue);
   }
   
   private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
       JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
       JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
       if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
           return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
       }
       return Instant.ofEpochMilli(timestampJson.longValue());
   }
   ```

------
#### [ Java ]

1. 「Kinesis Data Streams デベロッパーガイド」の指示に従って、Java を使用し `samplestream` という名前の Kinesis データストリームを[作成](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-create-stream.html)します。

   Kinesis Data Streams のシャード数を設定する前に [Kinesis Data Streams のシャード管理に関する考慮事項](kds_using-shards-and-metrics.md#kds_using-shards-and-metrics.shardmanagment) を参照してください。

1. 次のコードスニペットを使用して、DynamoDB テーブルで Kinesis ストリーミングを有効にします。オプションで、各レコードに返されるタイムスタンプ値の精度をより細かく (マイクロ秒) したストリーミングを有効にします。

   マイクロ秒のタイムスタンプ精度でのストリーミングを有効にします。

   ```
   EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder()
     .approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND)
     .build();
   
   EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
     .tableName(tableName)
     .streamArn(kdsArn)
     .enableKinesisStreamingConfiguration(enableKdsConfig)
     .build();
   
   EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
   ```

   または、デフォルトのタイムスタンプ精度 (ミリ秒) でのストリーミングを有効にします。

   ```
   EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
     .tableName(tableName)
     .streamArn(kdsArn)
     .build();
   
   EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
   ```

1. 「*Kinesis Data Streams デベロッパーガイド*」の指示に従って、作成したデータストリームから[読み込み](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html)ます。

1. 次のコードスニペットを使用して、ストリームコンテンツを逆シリアル化します。

   ```
   /**
    * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
    */
   public void processRecord(Record kinesisRecord) throws IOException {
       ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
       JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
       JsonNode dynamoDBRecord = rootNode.get("dynamodb");
       JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
       JsonNode newItemImage = dynamoDBRecord.get("NewImage");
       Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
   
       /**
        * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value
        * of this attribute from the new item image, the below code would fetch this.
        */
       JsonNode attributeNode = newItemImage.get("stringName");
       JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
       String attributeValue = attributeValueNode.textValue();
       System.out.println(attributeValue);
   }
   
   private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
       JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
       JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
       if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
           return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
       }
       return Instant.ofEpochMilli(timestampJson.longValue());
   }
   ```

------

## アクティブな Amazon Kinesis データストリームに変更を加える
<a name="kds_gettingstarted.making-changes"></a>

このセクションでは、コンソール、AWS CLI、API を使用して DynamoDB 用 Kinesis Data Streams のセットアップを変更する方法について説明します。

**AWS マネジメントコンソール**

1. DynamoDB コンソール ([https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)) を開きます。

1. テーブルに移動します。

1. **[エクスポートおよびストリーム]** タブを選択します。

**AWS CLI**

1. `describe-kinesis-streaming-destination` を呼び出して、ストリームが `ACTIVE` であることを確認します。

1. 次の例のように、`UpdateKinesisStreamingDestination` を呼び出します。

   ```
   aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
   ```

1. `describe-kinesis-streaming-destination` を呼び出して、ストリームが `UPDATING` であることを確認します。

1. ストリーミングステータスが再び `ACTIVE` になるまで `describe-kinesis-streaming-destination` を定期的に呼び出します。タイムスタンプの精度の更新が有効になるまで、最大 5 分かかることがあります。このステータスが更新されると、更新が完了したことを表し、新しい精度値が今後のレコードに適用されます。

1. `putItem` を使用してテーブルに書き込みます。

1. Kinesis `get-records` コマンドを使用して、Kinesis ストリームコンテンツを取得します。

1. 書き込みの `ApproximateCreationDateTime` の精度が希望どおりであることを確認します。

**Java API**

1. `UpdateKinesisStreamingDestination` リクエストと `UpdateKinesisStreamingDestination` レスポンスを構成するコードスニペットを提供します。

1. `DescribeKinesisStreamingDestination` リクエストと `DescribeKinesisStreamingDestination response` を構成するコードスニペットを提供します。

1. ストリーミングのステータスが、更新が完了し、将来のレコードに新しい精度値が適用されることを示す `ACTIVE` に戻るまで、`describe-kinesis-streaming-destination` を定期的に呼び出します。

1. テーブルへの書き込みを実行します。

1.  ストリームから読み取り、ストリームコンテンツを逆シリアル化します。

1. 書き込みの `ApproximateCreationDateTime` の精度が希望どおりであることを確認します。