Amazon DynamoDB용 Kinesis Data Streams 시작하기
이 섹션에서는 Amazon DynamoDB용 Kinesis Data Streams 테이블을 Amazon DynamoDB 콘솔, AWS Command Line Interface(AWS CLI) 및 API와 함께 사용하는 방법을 설명합니다.
활성 Amazon Kinesis Data Streams 생성
다음의 모든 예에서는 DynamoDB 시작하기 자습서의 일부로 생성된 Music
DynamoDB 테이블을 사용합니다.
소비자를 빌드하고 Kinesis 데이터 스트림을 다른 AWS 서비스에 연결하는 방법에 대한 자세한 내용은 Amazon Kinesis Data Streams 개발자 안내서의 Amazon Kinesis Data Streams에서 데이터 읽기를 참조하세요.
KDS 샤드를 처음 사용할 때는 사용 패턴에 따라 샤드를 스케일 업 또는 스케일 다운하도록 설정하는 것이 좋습니다. 사용 패턴에 대한 데이터를 더 많이 축적한 후에는 스트림의 샤드를 이에 맞게 조정할 수 있습니다.
- Console
-
- AWS CLI
-
-
create-stream command를 사용하여 samplestream
이라는 Kinesis Data Streams를 생성합니다.
aws kinesis create-stream --stream-name samplestream --shard-count 3
Kinesis 데이터 스트림의 샤드 수를 설정하기 전에 Kinesis Data Streams에 대한 샤드 관리 고려 사항 섹션을 참조하세요.
-
describe-stream command를 사용하여 Kinesis 스트림이 활성 상태이고 사용할 준비가 되어 있는지 확인합니다.
aws kinesis describe-stream --stream-name samplestream
-
DynamoDB enable-kinesis-streaming-destination
명령을 사용하여 DynamoDB 테이블에서 Kinesis 스트리밍을 활성화합니다. stream-arn
값은 이전 단계에 describe-stream
에서 반환한 값으로 바꿉니다. 선택적으로 각 레코드에서 반환되는 타임스탬프 값을 마이크로초 단위로 더 정밀하게 설정하여 스트리밍을 활성화할 수 있습니다.
마이크로초의 타임스탬프 정밀도로 스트리밍 활성화:
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
--enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
또는 기본 타임스탬프 정밀도(밀리초)로 스트리밍 활성화:
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
-
DynamoDB describe-kinesis-streaming-destination
명령을 사용하여 테이블에서 Kinesis 스트리밍이 활성 상태인지 확인합니다.
aws dynamodb describe-kinesis-streaming-destination --table-name Music
-
DynamoDB 개발자 안내서에 설명된 대로 put-item
명령을 사용하여 DynamoDB 테이블에 데이터를 씁니다.
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
-
Kinesis get-records CLI 명령을 사용하여 Kinesis 스트림 콘텐츠를 검색합니다. 그 후 다음 코드 조각을 사용하여 스트림 콘텐츠를 역직렬화합니다.
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we want to fetch the value
* of this attribute from the new item image. The following code fetches this value.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
- Java
-
-
Kinesis Data Streams 개발자 안내서의 지침에 따라 Java를 사용하여 samplestream
이라는 Kinesis 데이터 스트림을 생성합니다.
Kinesis 데이터 스트림의 샤드 수를 설정하기 전에 Kinesis Data Streams에 대한 샤드 관리 고려 사항 단원을 참조하세요.
-
다음 코드 조각을 사용하여 DynamoDB 테이블에서 Kinesis 스트리밍을 활성화합니다. 선택적으로 각 레코드에서 반환되는 타임스탬프 값을 마이크로초 단위로 더 정밀하게 설정하여 스트리밍을 활성화할 수 있습니다.
마이크로초의 타임스탬프 정밀도로 스트리밍 활성화:
EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder()
.approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND)
.build();
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.enableKinesisStreamingConfiguration(enableKdsConfig)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
또는 기본 타임스탬프 정밀도(밀리초)로 스트리밍 활성화:
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
-
Kinesis Data Streams 개발자 안내서의 지침에 따라 생성한 데이터 스트림에서 읽어옵니다.
-
다음 코드 조각을 사용하여 스트림 콘텐츠를 역직렬화합니다.
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value
* of this attribute from the new item image, the below code would fetch this.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
활성 Amazon Kinesis 데이터 스트림에 변경 사항 적용
이 섹션에서는 콘솔, AWS CLI 및 API를 사용하여 DynamoDB에 대한 활성 Kinesis Data Streams 설정을 변경하는 방법을 설명합니다.
AWS Management Console
AWS CLI
-
describe-kinesis-streaming-destination
을 직접 호출하여 스트림이 ACTIVE
임을 확인합니다.
-
다음 예와 같이 UpdateKinesisStreamingDestination
을 직접 호출합니다.
aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
-
describe-kinesis-streaming-destination
을 직접 호출하여 스트림이 UPDATING
임을 확인합니다.
-
스트리밍 상태가 다시 ACTIVE
가 될 때까지 주기적으로 describe-kinesis-streaming-destination
을 직접 호출합니다. 타임스탬프 정밀도 업데이트가 적용되려면 일반적으로 최대 5분이 걸립니다. 이 상태가 업데이트되면 업데이트가 완료되었으며 새 정밀도 값이 향후 레코드에 적용될 것임을 나타냅니다.
-
putItem
을 사용하여 테이블에 기록합니다.
-
Kinesis get-records
명령을 사용하여 스트림 콘텐츠를 가져옵니다.
-
쓰기의 ApproximateCreationDateTime
정밀도가 원하는 수준인지 확인합니다.
Java API
-
UpdateKinesisStreamingDestination
요청 및 UpdateKinesisStreamingDestination
응답을 구성하는 코드 스니펫을 제공합니다.
-
DescribeKinesisStreamingDestination
요청 및 DescribeKinesisStreamingDestination response
를 구성하는 코드 스니펫을 제공합니다.
-
스트리밍 상태가 다시 ACTIVE
가 될 때까지 주기적으로 describe-kinesis-streaming-destination
을 직접 호출합니다. 이 상태는 업데이트가 완료되고 향후 레코드에 새 정밀도 값이 적용될 것임을 나타냅니다.
-
테이블에 대한 쓰기를 수행합니다.
-
스트림에서 읽고 스트림 콘텐츠를 역직렬화합니다.
-
쓰기의 ApproximateCreationDateTime
정밀도가 원하는 수준인지 확인합니다.