

# Amazon DynamoDB용 Kinesis Data Streams 시작하기
<a name="kds_gettingstarted"></a>

이 섹션에서는 Amazon DynamoDB용 Kinesis Data Streams 테이블을 Amazon DynamoDB 콘솔, AWS Command Line Interface(AWS CLI) 및 API와 함께 사용하는 방법을 설명합니다.

## 활성 Amazon Kinesis Data Streams 생성
<a name="kds_gettingstarted.making-changes"></a>

다음의 모든 예에서는 [DynamoDB 시작하기](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GettingStartedDynamoDB.html) 자습서의 일부로 생성된 `Music` DynamoDB 테이블을 사용합니다.

소비자를 빌드하고 Kinesis 데이터 스트림을 다른 AWS 서비스에 연결하는 방법에 대한 자세한 내용은 *Amazon Kinesis Data Streams 개발자 안내서*의 [Amazon Kinesis Data Streams에서 데이터 읽기](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html)를 참조하세요.

**참고**  
 KDS 샤드를 처음 사용할 때는 사용 패턴에 따라 샤드를 스케일 업 또는 스케일 다운하도록 설정하는 것이 좋습니다. 사용 패턴에 대한 데이터를 더 많이 축적한 후에는 스트림의 샤드를 이에 맞게 조정할 수 있습니다.

------
#### [ Console ]

1. AWS Management Console에 로그인하여 [https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/)에서 Kinesis 콘솔을 엽니다.

1. **Create data stream(데이터 스트림 생성)**을 선택하고 지침에 따라 `samplestream`이라는 스트림을 생성합니다.

1. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)에서 DynamoDB 콘솔을 엽니다.

1. 콘솔 왼쪽의 탐색 창에서 **테이블**을 선택합니다.

1. **Music** 테이블을 선택합니다.

1. **내보내기 및 스트림(Exports and streams)** 탭을 선택합니다.

1. (선택 사항) **Amazon Kinesis 데이터 스트림 세부 정보**에서 레코드 타임스탬프 정밀도를 마이크로초(기본값)에서 밀리초로 변경할 수 있습니다.

1. 드롭다운 목록에서 **samplestream**을 선택합니다.

1. **켜기** 버튼을 선택합니다.

------
#### [ AWS CLI ]

1. [create-stream command](https://docs.aws.amazon.com/cli/latest/reference/kinesis/create-stream.html)를 사용하여 `samplestream`이라는 Kinesis Data Streams를 생성합니다.

   ```
   aws kinesis create-stream --stream-name samplestream --shard-count 3 
   ```

   Kinesis 데이터 스트림의 샤드 수를 설정하기 전에 [Kinesis Data Streams에 대한 샤드 관리 고려 사항](kds_using-shards-and-metrics.md#kds_using-shards-and-metrics.shardmanagment) 섹션을 참조하세요.

1. [describe-stream command](https://docs.aws.amazon.com/cli/latest/reference/kinesis/describe-stream.html)를 사용하여 Kinesis 스트림이 활성 상태이고 사용할 준비가 되어 있는지 확인합니다.

   ```
   aws kinesis describe-stream --stream-name samplestream
   ```

1. DynamoDB `enable-kinesis-streaming-destination` 명령을 사용하여 DynamoDB 테이블에서 Kinesis 스트리밍을 활성화합니다. `stream-arn` 값을 이전 단계에 `describe-stream`에서 반환한 값으로 바꿉니다. 선택적으로 각 레코드에서 반환되는 타임스탬프 값을 마이크로초 단위로 더 정밀하게 설정하여 스트리밍을 활성화할 수 있습니다.

   마이크로초의 타임스탬프 정밀도로 스트리밍 활성화:

   ```
   aws dynamodb enable-kinesis-streaming-destination \
     --table-name Music \
     --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
     --enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
   ```

   또는 기본 타임스탬프 정밀도(밀리초)로 스트리밍 활성화:

   ```
   aws dynamodb enable-kinesis-streaming-destination \
     --table-name Music \
     --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
   ```

1. DynamoDB `describe-kinesis-streaming-destination` 명령을 사용하여 테이블에서 Kinesis 스트리밍이 활성 상태인지 확인합니다.

   ```
   aws dynamodb describe-kinesis-streaming-destination --table-name Music
   ```

1. [DynamoDB 개발자 안내서](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-2.html)에 설명된 대로 `put-item` 명령을 사용하여 DynamoDB 테이블에 데이터를 씁니다.

   ```
   aws dynamodb put-item \
       --table-name Music  \
       --item \
           '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
   
   aws dynamodb put-item \
       --table-name Music \
       --item \
           '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
   ```

1. Kinesis [ get-records](https://docs.aws.amazon.com/cli/latest/reference/kinesis/get-records.html) CLI 명령을 사용하여 Kinesis 스트림 콘텐츠를 검색합니다. 그 후 다음 코드 조각을 사용하여 스트림 콘텐츠를 역직렬화합니다.

   ```
   /**
    * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
    */
   public void processRecord(Record kinesisRecord) throws IOException {
       ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
       JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
       JsonNode dynamoDBRecord = rootNode.get("dynamodb");
       JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
       JsonNode newItemImage = dynamoDBRecord.get("NewImage");
       Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
   
       /**
        * Say for example our record contains a String attribute named "stringName" and we want to fetch the value
        * of this attribute from the new item image. The following code fetches this value.
        */
       JsonNode attributeNode = newItemImage.get("stringName");
       JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
       String attributeValue = attributeValueNode.textValue();
       System.out.println(attributeValue);
   }
   
   private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
       JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
       JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
       if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
           return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
       }
       return Instant.ofEpochMilli(timestampJson.longValue());
   }
   ```

------
#### [ Java ]

1. Kinesis Data Streams 개발자 안내서의 지침에 따라 Java를 사용하여 `samplestream`이라는 Kinesis 데이터 스트림을 [생성](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-create-stream.html)합니다.

   Kinesis 데이터 스트림의 샤드 수를 설정하기 전에 [Kinesis Data Streams에 대한 샤드 관리 고려 사항](kds_using-shards-and-metrics.md#kds_using-shards-and-metrics.shardmanagment) 섹션을 참조하세요.

1. 다음 코드 조각을 사용하여 DynamoDB 테이블에서 Kinesis 스트리밍을 활성화합니다. 선택적으로 각 레코드에서 반환되는 타임스탬프 값을 마이크로초 단위로 더 정밀하게 설정하여 스트리밍을 활성화할 수 있습니다.

   마이크로초의 타임스탬프 정밀도로 스트리밍 활성화:

   ```
   EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder()
     .approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND)
     .build();
   
   EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
     .tableName(tableName)
     .streamArn(kdsArn)
     .enableKinesisStreamingConfiguration(enableKdsConfig)
     .build();
   
   EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
   ```

   또는 기본 타임스탬프 정밀도(밀리초)로 스트리밍 활성화:

   ```
   EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
     .tableName(tableName)
     .streamArn(kdsArn)
     .build();
   
   EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
   ```

1. *Kinesis Data Streams 개발자 안내서*의 지침에 따라 생성한 데이터 스트림에서 [읽어](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html)옵니다.

1. 다음 코드 조각을 사용하여 스트림 콘텐츠를 역직렬화합니다.

   ```
   /**
    * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
    */
   public void processRecord(Record kinesisRecord) throws IOException {
       ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
       JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
       JsonNode dynamoDBRecord = rootNode.get("dynamodb");
       JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
       JsonNode newItemImage = dynamoDBRecord.get("NewImage");
       Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
   
       /**
        * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value
        * of this attribute from the new item image, the below code would fetch this.
        */
       JsonNode attributeNode = newItemImage.get("stringName");
       JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
       String attributeValue = attributeValueNode.textValue();
       System.out.println(attributeValue);
   }
   
   private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
       JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
       JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
       if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
           return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
       }
       return Instant.ofEpochMilli(timestampJson.longValue());
   }
   ```

------

## 활성 Amazon Kinesis 데이터 스트림에 변경 사항 적용
<a name="kds_gettingstarted.making-changes"></a>

이 섹션에서는 콘솔, AWS CLI 및 API를 사용하여 DynamoDB에 대한 활성 Kinesis Data Streams 설정을 변경하는 방법을 설명합니다.

**AWS Management Console**

1. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)에서 DynamoDB 콘솔을 엽니다.

1. 테이블로 이동합니다.

1. **내보내기 및 스트림**을 선택합니다.

**AWS CLI**

1. `describe-kinesis-streaming-destination`을 직접 호출하여 스트림이 `ACTIVE`임을 확인합니다.

1. 다음 예와 같이 `UpdateKinesisStreamingDestination`을 직접 호출합니다.

   ```
   aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
   ```

1. `describe-kinesis-streaming-destination`을 직접 호출하여 스트림이 `UPDATING`임을 확인합니다.

1. 스트리밍 상태가 다시 `ACTIVE`가 될 때까지 주기적으로 `describe-kinesis-streaming-destination`을 직접 호출합니다. 타임스탬프 정밀도 업데이트가 적용되려면 일반적으로 최대 5분이 걸립니다. 이 상태가 업데이트되면 업데이트가 완료되었으며 새 정밀도 값이 향후 레코드에 적용될 것임을 나타냅니다.

1. `putItem`을 사용하여 테이블에 기록합니다.

1. Kinesis `get-records` 명령을 사용하여 스트림 콘텐츠를 가져옵니다.

1. 쓰기의 `ApproximateCreationDateTime` 정밀도가 원하는 수준인지 확인합니다.

**Java API**

1. `UpdateKinesisStreamingDestination` 요청 및 `UpdateKinesisStreamingDestination` 응답을 구성하는 코드 스니펫을 제공합니다.

1. `DescribeKinesisStreamingDestination` 요청 및 `DescribeKinesisStreamingDestination response`를 구성하는 코드 스니펫을 제공합니다.

1. 스트리밍 상태가 다시 `ACTIVE`가 될 때까지 주기적으로 `describe-kinesis-streaming-destination`을 직접 호출합니다. 이 상태는 업데이트가 완료되고 향후 레코드에 새 정밀도 값이 적용될 것임을 나타냅니다.

1. 테이블에 대한 쓰기를 수행합니다.

1.  스트림에서 읽고 스트림 콘텐츠를 역직렬화합니다.

1. 쓰기의 `ApproximateCreationDateTime` 정밀도가 원하는 수준인지 확인합니다.