기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
자습서: AWS CLI를 사용하여 기본 Kinesis Data Streams 작업 수행
이 섹션에서는 AWS CLI를 사용하여 명령줄에서 Kinesis 데이터 스트림을 사용하는 기본 방법을 설명합니다. Amazon Kinesis Data Streams 용어 및 개념에 설명된 개념을 숙지하십시오.
참고
Kinesis Data Streams는 AWS 프리 티어에서 제공되지 않으므로 스트림을 생성한 후에는 Kinesis Data Streams 사용에 대한 일반 요금이 계정에 부과됩니다. 이 자습서를 완료하면 AWS 리소스를 삭제하여 요금 발생을 중지하세요. 자세한 내용은 4단계: 정리 단원을 참조하십시오.
1단계: VPC 생성
첫 번째 단계는 스트림을 생성하고 성공적으로 생성되었는지 확인하는 것입니다. 다음 명령을 사용하여 이름이 "Foo"인 스트림을 생성합니다.
aws kinesis create-stream --stream-name Foo
그런 다음, 다음 명령을 사용하여 스트림의 생성 진행 상황을 확인합니다.
aws kinesis describe-stream-summary --stream-name Foo
다음 예와 비슷한 출력 결과를 얻어야 합니다.
{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }
이 예에서 스트림은 아직 사용할 준비가 되지 않았다는 CREATING 상태입니다. 잠시 후 다시 확인하면 다음 예와 비슷한 출력 결과가 표시되어야 합니다.
{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }
이 출력에는 이 자습서에서 필요하지 않은 정보가 있습니다. 지금 중요한 정보는 스트림이 사용할 준비가 되었음을 알리는 "StreamStatus": "ACTIVE"
와 요청한 단일 샤드에 대한 정보입니다. 다음과 같이 list-streams
명령을 사용하여 새 스트림의 존재 여부를 확인할 수도 있습니다.
aws kinesis list-streams
출력:
{
"StreamNames": [
"Foo"
]
}
2단계: 레코드 넣기
이제 스트림이 활성 상태이므로 일부 데이터를 입력할 준비가 되었습니다. 이 자습서의 경우 가능한 가장 간단한 명령인 put-record
를 사용합니다. 이 명령은 "testdata" 텍스트를 포함하는 단일 데이터 레코드를 스트림에 넣습니다.
aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
이 명령이 제대로 실행되면 다음 예와 비슷한 출력 결과가 발생합니다.
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49546986683135544286507457936321625675700192471156785154"
}
축하합니다. 스트림에 데이터를 추가했습니다. 그런 다음 스트림에서 데이터를 가져오는 방법이 표시됩니다.
3단계: 레코드 가져오기
GetShardIterator
스트림에서 데이터를 가져오기 전에 관심이 있는 샤드에 대한 샤드 반복자를 가져와야 합니다. 샤드 반복자는 소비자(이 경우 get-record
명령)가 읽을 샤드와 스트림의 위치를 나타냅니다. 다음과 같이 get-shard-iterator
명령을 사용합니다.
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo
aws kinesis
명령의 배경에는 Kinesis Data Streams API가 있으므로 표시된 파라미터에 대해 궁금한 경우 GetShardIterator
API 참조 주제에서 해당 파라미터에 대해 읽을 수 있습니다. 이 명령이 제대로 실행되면 출력에 다음 예와 비슷한 결과가 표시됩니다.
{
"ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg="
}
외견상으로 무작위처럼 보이는 문자의 긴 문자열이 샤드 반복자입니다(사용자의 반복자는 다름). 다음에 표시된 대로 샤드 반복자를 복사하여 가져오기 명령에 붙여 넣어야 합니다. 샤드 반복자에는 300초의 유효한 수명 주기가 있습니다. 이 수명 주기는 샤드 반복자를 복사하여 다음 명령에 붙여 넣는데 충분한 시간이어야 합니다. 다음 명령에 붙여 넣기 전에 샤드 반복자에서 모든 줄 바꿈을 제거해야 합니다. 샤드 반복자가 더 이상 유효하지 않다는 오류 메시지가 발생하는 경우 get-shard-iterator
명령을 다시 실행하세요.
GetRecords
get-records
명령은 스트림에서 데이터를 가져오고 Kinesis Data Streams API의 GetRecords
호출로 확인됩니다. 샤드 반복기는 샤드에서 순차적으로 데이터 레코드 읽기를 시작할 위치를 지정합니다. 반복기가 가리키는 샤드 부분에 사용 가능한 레코드가 없는 경우 GetRecords
는 빈 목록을 반환합니다. 레코드를 포함하는 샤드 부분을 가져오기 위해 여러 번의 호출이 수행될 수 있습니다.
다음 get-records
명령의 예에서는 다음과 같이 합니다.
aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=
bash 등 Unix 유형 명령 프로세서에서 이 자습서를 실행하는 경우 다음과 같이 중첩 명령을 사용하여 샤드 반복자의 획득을 자동화할 수 있습니다.
SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR
PowerShell을 지원하는 시스템에서 이 자습서를 실행하는 경우 다음과 같이 명령을 사용하여 샤드 반복자의 획득을 자동화할 수 있습니다.
aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])
get-records
명령을 성공적으로 실행하면 다음 예제와 같이 스트림에서 샤드 반복자를 가져올 때 지정한 샤드에 대한 레코드를 요청합니다.
{
"Records":[ {
"Data":"dGVzdGRhdGE=",
"PartitionKey":"123”,
"ApproximateArrivalTimestamp": 1.441215410867E9,
"SequenceNumber":"49544985256907370027570885864065577703022652638596431874"
} ],
"MillisBehindLatest":24000,
"NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
}
get-records
는 위에 요청으로 설명되어 있습니다. 즉, 스트림에 레코드가 있는 경우에도 0개 이상의 레코드를 받을 수 있습니다. 반환된 레코드가 스트림에 현재 있는 모든 레코드를 나타내는 것은 아닐 수 있습니다. 이는 정상이며 프로덕션 코드에서는 적절한 간격으로 레코드에 대해 스트림을 폴링합니다. 이 폴링 속도는 특정 애플리케이션 설계 요구 사항에 따라 달라집니다.
자습서 이 부분의 레코드에서는 데이터가 가비지로 보인다는 것을 알 수 있습니다. 이 데이터는 당사가 전송한 일반 텍스트 testdata
가 아닙니다. 이는 바이너리 데이터를 전송할 수 있도록 put-record
가 Base64 방식의 인코딩을 사용하기 때문입니다. 그러나 AWS CLI의 Kinesis Data Streams 지원은 Base64 방식의 디코딩을 제공하지 않습니다. 왜냐하면 stdout에 인쇄된 원시 바이너리 콘텐츠에 대한 Base64 방식의 디코딩은 특정 플랫폼 및 터미널에 대해 불필요한 동작과 잠재적인 보안 문제가 발생할 수 있기 때문입니다. Base64 방식의 디코더(예: https://www.base64decode.org/dGVzdGRhdGE=
를 디코딩하면 실제로 이 데이터가 testdata
임을 알 수 있습니다. 실제로는 데이터를 사용하기 위해 AWS CLI가 거의 사용되지 않기 때문에 이 자습서를 위해서는 충분합니다. 앞에서 살펴본 대로 주로 스트림 상태를 모니터링하고 정보를 얻는 데 사용됩니다(describe-stream
및 list-streams
). KCL에 대한 자세한 내용은 KCL을 사용하여 공유 처리량으로 사용자 지정 소비자 개발을 참조하세요.
경우에 따라 get-records
가 지정된 스트림/샤드에서 모든 레코드를 반환하지 않습니다. 이러한 경우, 마지막 결과에서 NextShardIterator
를 사용하여 다음 레코드 세트를 가져옵니다. 더 많은 데이터를 스트림에 넣은 경우(프로덕션 애플리케이션의 일반적인 상황) 매번 get-records
를 사용하여 데이터에 대한 폴링을 유지할 수 있습니다. 그러나 300초 샤드 반복자 수명 주기 이내에 다음 샤드 반복자를 사용하여 get-records
를 호출하지 않으면 오류 메시지가 발생하며, get-shard-iterator
명령을 사용하여 새로운 샤드 반복자를 가져와야 합니다.
이 출력에는 MillisBehindLatest
도 제공됩니다. 이 값은 스트림의 끝에서 GetRecords 작업의 응답이 나오는 시간(밀리초)이며, 소비자가 있는 현재 시간에서 경과된 시간을 나타냅니다. 값이 0이면 레코드 처리를 따라잡았으며 이 시점에서 처리할 새 레코드가 없음을 나타냅니다. 이 자습서의 경우 시간을 들여 꾸준히 읽을 경우 상당히 큰 숫자가 표시될 수 있습니다. 기본적으로 데이터 레코드는 검색을 위해 대기하도록 24시간 동안 스트림에 유지됩니다. 이 시간을 보존 기간이라고 하며, 최대 365일로 구성할 수 있습니다.
get-records
를 성공적으로 실행하면 현재 스트림에 더 이상 레코드가 없는 경우에도 결과에 항상 NextShardIterator
가 있습니다. 이는 생산자가 특정 시점에서 스트림에 더 많을 레코드를 잠재적으로 넣을 수 있다고 가정하는 폴링 모델입니다. 자체의 폴링 루틴을 작성할 수 있는 경우에도 소비자 애플리케이션을 개발하기 위해 이전에 언급한 KCL을 사용할 경우 이 폴링이 무리없이 수행됩니다.
가져올 스트림과 샤드에 더 이상 레코드가 없을 때까지 get-records
를 호출할 경우 다음 예와 비슷하게 빈 레코드가 포함된 출력이 표시됩니다.
{
"Records": [],
"NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk="
}
4단계: 정리
스트림을 삭제하여 리소스를 비우고 계정에 의도하지 않은 요금 청구를 방지합니다. 스트림을 사용하여 데이터를 넣고 가져오든 사용하지 않든 관계없이 스트림당 요금이 누적되므로 스트림을 생성하고 사용하지 않는 경우 언제든지 이를 수행합니다. 정리 명령은 다음과 같습니다.
aws kinesis delete-stream --stream-name Foo
성공적으로 실행되면 출력이 없습니다. describe-stream
을 사용하여 삭제 진행 상황을 확인합니다.
aws kinesis describe-stream-summary --stream-name Foo
삭제 명령 직후 이 명령을 실행하면 다음 예와 비슷한 출력이 표시됩니다.
{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",
스트림이 완전히 삭제된 후 describe-stream
을 실행하면 "찾을 수 없음" 오류가 발생합니다.
A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation:
Stream Foo under account 123456789012 not found.