자습서: 를 사용하여 기본 Kinesis Data Streams 작업 수행 AWS CLI - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

자습서: 를 사용하여 기본 Kinesis Data Streams 작업 수행 AWS CLI

이 섹션에서는 AWS CLI를 사용하여 명령줄에서 Kinesis 데이터 스트림을 사용하는 기본 방법을 설명합니다. Amazon Kinesis Data Streams 용어 및 개념에 설명된 개념을 숙지하십시오.

참고

스트림을 생성한 후에는 Kinesis Data Streams가 프리 티어에 적합하지 않기 때문에 계정에 Kinesis Data Streams 사용에 대한 소액의 요금이 부과됩니다. AWS 이 자습서를 모두 마치면 AWS 리소스를 삭제하여 요금이 발생하지 않도록 하십시오. 자세한 내용은 4단계: 정리 단원을 참조하십시오.

1단계: 스트림 생성

첫 번째 단계는 스트림을 생성하고 성공적으로 생성되었는지 확인하는 것입니다. 다음 명령을 사용하여 이름이 "Foo"인 스트림을 생성합니다.

aws kinesis create-stream --stream-name Foo

그런 다음, 다음 명령을 사용하여 스트림의 생성 진행 상황을 확인합니다.

aws kinesis describe-stream-summary --stream-name Foo

다음 예와 비슷한 출력 결과를 얻어야 합니다.

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

이 예시에서는 스트림에 상태가 있는데CREATING, 이는 아직 사용할 준비가 되지 않았음을 의미합니다. 잠시 후 다시 확인하면 다음 예와 비슷한 출력 결과가 표시되어야 합니다.

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

이 출력에는 이 자습서에는 필요하지 않은 정보가 있습니다. 현재 중요한 정보는 스트림을 사용할 준비가 되었음을 알려주는 정보와 요청한 단일 샤드에 대한 정보입니다. "StreamStatus": "ACTIVE" 다음과 같이 list-streams 명령을 사용하여 새 스트림의 존재 여부를 확인할 수도 있습니다.

aws kinesis list-streams

출력:

{ "StreamNames": [ "Foo" ] }

2단계: 기록 남기기

이제 스트림이 활성 상태이므로 일부 데이터를 입력할 준비가 되었습니다. 이 자습서의 경우 가능한 가장 간단한 명령인 put-record를 사용합니다. 이 명령은 "testdata" 텍스트를 포함하는 단일 데이터 레코드를 스트림에 넣습니다.

aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata

이 명령이 제대로 실행되면 다음 예와 비슷한 출력 결과가 발생합니다.

{ "ShardId": "shardId-000000000000", "SequenceNumber": "49546986683135544286507457936321625675700192471156785154" }

축하합니다. 스트림에 데이터를 추가했습니다. 그런 다음 스트림에서 데이터를 가져오는 방법이 표시됩니다.

3단계: 기록 가져오기

GetShardIterator

스트림에서 데이터를 가져오려면 먼저 관심 있는 샤드의 샤드 이터레이터를 구해야 합니다. 샤드 반복자는 소비자(이 경우 get-record 명령)가 읽을 샤드와 스트림의 위치를 나타냅니다. get-shard-iterator명령어는 다음과 같이 사용하게 됩니다.

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo

aws kinesis명령 뒤에는 Kinesis Data API Streams가 포함되어 있으므로 표시된 파라미터에 대해 궁금한 점이 있으면 참조 항목에서 GetShardIteratorAPI해당 매개 변수에 대해 읽어볼 수 있습니다. 성공적으로 실행하면 다음 예제와 비슷한 결과가 출력됩니다.

{ "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=" }

외견상으로 무작위처럼 보이는 문자의 긴 문자열이 샤드 반복자입니다(사용자의 반복자는 다름). 샤드 이터레이터를 다음 그림과 같이 get 명령에 복사하여 붙여넣어야 합니다. 샤드 반복자에는 300초의 유효한 수명 주기가 있습니다. 이 수명 주기는 샤드 반복자를 복사하여 다음 명령에 붙여 넣는데 충분한 시간이어야 합니다. 다음 명령어로 붙여넣기 전에 샤드 이터레이터에서 줄 바꿈을 모두 제거해야 합니다. 샤드 이터레이터가 더 이상 유효하지 않다는 오류 메시지가 표시되면 명령을 다시 실행하십시오. get-shard-iterator

GetRecords

get-records 명령은 스트림에서 데이터를 가져오고 Kinesis Data Streams의 호출로 GetRecords확인됩니다. API 샤드 반복기는 샤드에서 순차적으로 데이터 레코드 읽기를 시작할 위치를 지정합니다. 반복기가 가리키는 샤드 부분에 사용 가능한 레코드가 없는 경우 GetRecords는 빈 목록을 반환합니다. 레코드가 포함된 샤드의 일부에 도달하려면 여러 번의 호출이 필요할 수 있습니다.

다음 get-records 명령 예제에서:

aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=

bash와 같은 Unix 유형의 명령 프로세서에서 이 자습서를 실행하는 경우 다음과 같이 중첩된 명령을 사용하여 샤드 이터레이터 획득을 자동화할 수 있습니다.

SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR

를 지원하는 PowerShell 시스템에서 이 자습서를 실행하는 경우 다음과 같은 명령을 사용하여 샤드 이터레이터 획득을 자동화할 수 있습니다.

aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])

get-records명령이 성공적으로 실행되면 다음 예와 같이 샤드 이터레이터를 가져올 때 지정한 샤드에 대한 레코드가 스트림에서 요청됩니다.

{ "Records":[ { "Data":"dGVzdGRhdGE=", "PartitionKey":"123”, "ApproximateArrivalTimestamp": 1.441215410867E9, "SequenceNumber":"49544985256907370027570885864065577703022652638596431874" } ], "MillisBehindLatest":24000, "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is=" }

참고로get-records, 위에서 설명한 내용은 요청이므로 스트림에 레코드가 있더라도 0개 이상의 레코드를 받을 수 있습니다. 반환된 레코드가 현재 스트림에 있는 모든 레코드와 일치하지 않을 수 있습니다. 이는 정상적인 현상이며 프로덕션 코드는 적절한 간격으로 스트림을 폴링하여 레코드를 찾습니다. 폴링 속도는 특정 애플리케이션 설계 요구 사항에 따라 달라집니다.

이 자습서의 이 부분을 기록해 보면 데이터가 쓰레기로 보이지만 testdata 우리가 보낸 일반 텍스트가 아님을 알 수 있습니다. 이는 바이너리 데이터를 전송할 수 있도록 put-record가 Base64 방식의 인코딩을 사용하기 때문입니다. 그러나 stdout에 인쇄된 원시 바이너리 콘텐츠로 Base64 AWS CLI 디코딩하면 특정 플랫폼 및 터미널에서 원하지 않는 동작이 발생하고 잠재적인 보안 문제가 발생할 수 있으므로 에서 Kinesis Data Streams를 지원하는 경우 Base64 디코딩이 제공되지 않습니다. Base64 방식의 디코더(예: https://www.base64decode.org/)를 사용하여 수동으로 dGVzdGRhdGE=를 디코딩하면 실제로 이 데이터가 testdata임을 알 수 있습니다. 실제로는 데이터를 소비하는 데 거의 사용되지 않기 때문에 이 자습서에서는 이 정도면 충분합니다. AWS CLI 이전 (describe-streamlist-streams) 에서 볼 수 있듯이 스트림의 상태를 모니터링하고 정보를 얻는 데 사용되는 경우가 더 많습니다. 에 대한 자세한 내용은 공유 처리량을 사용하여 사용자 지정 소비자 개발을 참조하십시오KCL. KCL

get-records지정된 스트림/샤드의 모든 레코드를 항상 반환하지는 않습니다. 이러한 경우, 마지막 결과에서 NextShardIterator를 사용하여 다음 레코드 세트를 가져옵니다. 스트림에 더 많은 데이터가 입력되는 경우 (프로덕션 애플리케이션의 일반적인 상황), 매번 사용하여 데이터를 계속 폴링할 수 있습니다. get-records 하지만 300초의 샤드 이터레이터 수명 내에 다음 샤드 이터레이터를 get-records 사용하여 호출하지 않으면 오류 메시지가 표시되므로 새 샤드 이터레이터를 가져오려면 get-shard-iterator 명령을 사용해야 합니다.

또한 이 출력에는 스트림의 끝에서 GetRecords작업 응답이 MillisBehindLatest 나온 시간 (밀리초) 도 제공되어 소비자가 현재 시간보다 얼마나 늦었는지 알 수 있습니다. 값이 0이면 레코드 처리를 따라잡았으며 이 시점에서 처리할 새 레코드가 없음을 나타냅니다. 이 자습서의 경우 시간을 들여 꾸준히 읽을 경우 상당히 큰 숫자가 표시될 수 있습니다. 기본적으로 데이터 레코드는 검색될 때까지 24시간 동안 스트림에 보관됩니다. 이 시간을 보존 기간이라고 하며, 최대 365일로 구성할 수 있습니다.

현재 스트림에 레코드가 더 이상 NextShardIterator 없더라도 항상 성공적인 get-records 결과를 얻을 수 있습니다. 이는 생산자가 특정 시점에서 스트림에 더 많을 레코드를 잠재적으로 넣을 수 있다고 가정하는 폴링 모델입니다. 폴링 루틴을 직접 작성할 수도 있지만 앞서 언급한 KCL 방법을 소비자 애플리케이션 개발에 사용하면 이 폴링이 자동으로 해결됩니다.

가져오려는 스트림과 샤드에 더 이상 레코드가 없을 get-records 때까지 호출하면 다음 예와 마찬가지로 빈 레코드가 포함된 출력이 표시됩니다.

{ "Records": [], "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk=" }

4단계: 정리

스트림을 삭제하여 리소스를 확보하고 계정에 의도치 않은 요금이 청구되는 것을 방지하세요. 스트림을 만들었다가 사용하지 않을 때는 언제든지 이렇게 하세요. 데이터를 업로드하고 가져오든 안 하든 스트림당 요금이 누적되기 때문입니다. 클린업 명령은 다음과 같습니다.

aws kinesis delete-stream --stream-name Foo

성공하면 출력이 없습니다. 삭제 진행 상황을 확인하는 describe-stream 데 사용합니다.

aws kinesis describe-stream-summary --stream-name Foo

삭제 명령 직후에 이 명령을 실행하면 다음 예와 비슷한 출력이 표시됩니다.

{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",

스트림이 완전히 삭제된 후 describe-stream을 실행하면 "찾을 수 없음" 오류가 발생합니다.

A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation: Stream Foo under account 123456789012 not found.