翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
チュートリアル: AWS CLI を使用した基本的な Kinesis Data Streams の操作を実行する
このセクションでは、AWS CLI を使用した、コマンドラインからの Kinesis data stream の基本的な使用方法について説明します。Amazon Kinesis Data Streams の用語と概念で説明されている概念を理解している必要があります。
注記
Kinesis Data Streams は AWS の無料利用枠の対象外であるため、ストリームの作成後は、Kinesis Data Streams の使用に対してアカウントに少額の料金が発生します。このチュートリアルを終了したら、AWS リソースを削除して料金が発生しないようにしてください。詳細については、ステップ 4: クリーンアップするを参照してください。
ステップ 1: ストリームを作成する
最初のステップは、ストリームを作成し、正常に作成されたことを確認することです。次のコマンドを使用して、Fooという名前のストリームを作成します。
aws kinesis create-stream --stream-name Foo
次に、次のコマンドを実行して、ストリーム作成の進行状況を確認します。
aws kinesis describe-stream-summary --stream-name Foo
次の例のような出力が得られます。
{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }
この例では、ストリームのステータスは CREATING であり、使用する準備がまだ整っていないことを意味します。しばらくしてからもう一度調べると、次の例のような出力が表示されます。
{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }
この出力には、このチュートリアルで必要がない情報も含まれています。ここで重要な項目は "StreamStatus": "ACTIVE"
であり、ストリームを使用する準備ができたことと、リクエストした単一のシャードに関する情報が示されています。また、次に示すように、list-streams
コマンドを使用して新しいストリームの存在を確認することもできます。
aws kinesis list-streams
出力:
{
"StreamNames": [
"Foo"
]
}
ステップ 2: レコードを入力する
アクティブなストリームができたら、データを入力できます。このチュートリアルでは、最もシンプルなコマンド put-record
を使用して、"testdata" というテキストを含む単一のデータレコードをストリームに入力します。
aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
このコマンドが成功すると、出力は次の例のようになります。
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49546986683135544286507457936321625675700192471156785154"
}
これで、ストリームにデータを追加できました。次にストリームからデータを取得する方法を説明します。
ステップ 3: レコードを取得する
GetShardIterator
ストリームからデータを取得するには、対象となるシャードのシャードイテレーターを取得する必要があります。シャードイテレーターは、コンシューマー (ここでは get-record
コマンド) が読み取るストリームとシャードの位置を表します。次のように get-shard-iterator
コマンドを使用します。
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo
aws kinesis
コマンドの背後には Kinesis Data Streams API があります。示されているパラメータに関心がある場合は、 GetShardIterator
API のリファレンスのトピックを参照してください。実行が成功すると、出力は次の例のようになります。
{
"ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg="
}
ランダムに見える長い文字列がシャードイテレーターです (お客様のシャードイテレーターはこれとは異なります)。このシャードイテレーターをコピーして、次に示す get コマンドに貼り付ける必要があります。シャードイテレーターの有効期間は 300 秒です。これは、シャードイテレーターをコピーして次のコマンドに貼り付けるのに十分な時間です。次のコマンドに貼り付ける前に、シャードイテレーターから改行を削除する必要があります。シャードイテレーターが有効ではないことを示すエラーメッセージが表示された場合は、もう一度 get-shard-iterator
コマンドを実行します。
GetRecords
get-records
コマンドはストリームからデータを取得し、Kinesis Data Streams API での GetRecords
の呼び出しに解決します。シャードイテレーターは、データレコードの逐次読み取りを開始する、シャード内の位置を指定します。イテレーターが指定するシャードの位置にレコードがない場合、GetRecords
は空のリストを返します。シャード内のレコードが含まれる位置に到達するために、複数の呼び出しが必要になる場合があります。
次の get-records
コマンドの例で以下の操作を行います。
aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=
bash など Unix タイプのコマンドプロセッサからこのチュートリアルを実行する場合は、次のように入れ子にしたコマンドを使用して、シャードイテレーターの取得を自動化できます。
SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR
PowerShell をサポートするシステムからこのチュートリアルを実行する場合、次のようなコマンドを使用してシャードのイテレータの取得を自動化できます。
aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])
get-records
コマンドが正常に終了すると、次の例のように、シャードイテレーターを取得するときに指定したシャードに対応するストリーム内のレコードがリクエストされます。
{
"Records":[ {
"Data":"dGVzdGRhdGE=",
"PartitionKey":"123”,
"ApproximateArrivalTimestamp": 1.441215410867E9,
"SequenceNumber":"49544985256907370027570885864065577703022652638596431874"
} ],
"MillisBehindLatest":24000,
"NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
}
上記で get-records
をリクエストとして説明しましたが、これは、ストリーム内にレコードが存在する場合でもゼロ件以上のレコードが返される可能性があることを意味します。返されるレコードは、ストリーム内のすべてのレコードを現在表すとは限りません。これは正常であり、本番コードは適切な間隔でストリームをポーリングしてレコードがないか調べます。このポーリング速度は、特定のアプリケーション設計要件によって異なります。
チュートリアルのこのパートのレコードでは、データがゴミのように見えることがわかります。これは私たちが送ったクリアテキスト testdata
ではありません。これは、バイナリデータを送信できるように、put-record
では Base64 エンコーディングを使用しているためです。ただし、AWS CLI での Kinesis Data Streams のサポートでは、Base64 デコーディングを提供していません。これは、Base64 デコーディングされた raw バイナリコンテンツを stdout に出力すると、特定のプラットフォームやターミナルで、意図しない動作やセキュリティ上の問題が発生する可能性があるためです。Base64 デコーダ (https://www.base64decode.org/dGVzdGRhdGE=
をデコードすると、これが実際に testdata
であることを確認できます。これはこのチュートリアルのために十分です。実際には、AWS CLI がデータの消費に使用されることはまれです。多くの場合、前述のように (describe-stream
と list-streams
)、ストリームの状態をモニタリングし、情報を取得するために使用します。KCL の詳細については、KCL を使用したスループット共有カスタムコンシューマーの開発を参照してください。
get-records
によって、常に指定されたストリーム/シャード内のすべてのレコードが返されるわけではありません。このような場合は、最後の結果から NextShardIterator
を使用して、次のレコードのセットを取得します。したがって、大量のデータがストリームに入力されていた場合 これは本稼働アプリケーションでの通常の状況ですが、毎回 get-records
を使用してデータのポーリングを継続できます。ただし、300 秒のシャードイテレーターの有効期間内に次のシャードイテレーターを使用して get-records
を呼び出した場合、エラーメッセージが表示され、get-shard-iterator
コマンドを使用して最新のシャードイテレーターを取得する必要があります。
この出力には、MillisBehindLatest
も含まれています。これは、ストリームの末尾から GetRecords オペレーションのレスポンスまでの時間 (ミリ秒) であり、コンシューマーの時間の現在の時刻からの遅れを示します。値ゼロはレコード処理が追いついて、現在処理する新しいレコードは存在しないことを示します。このチュートリアルの場合は、作業を進めるのに時間をかけていると、この数値がかなり大きくなる可能性があります。デフォルトでは、データレコードはストリームに 24 時間留まり、取得されるのを待ちます。この期間は保持期間と呼ばれ、365 日 まで設定可能です。
get-records
が成功したときの結果は、現在ストリームにこれ以上レコードが見つからない場合でも常に NextShardIterator
です。これは、プロデューサーがどの時点でもストリームにレコードを入力している可能性があることを前提としたポーリングモデルです。独自のポーリングルーチンを記述することもできますが、開発中のコンシューマーアプリケーションで、前に説明した KCL を使用している場合、このポーリングによって処理が実行されます。
レコードをプルする対象のストリームまたはシャードにそれ以上レコードがなくなるまで get-records
を呼び出すと、次の例のような空のレコードの出力が表示されます。
{
"Records": [],
"NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk="
}
ステップ 4: クリーンアップする
ストリームを削除してリソースを解放し、アカウントに対する意図しない料金が発生することを回避できます。ストリームを作成したが、使用する予定がない場合は、必ずこれを行ってください。ストリームでデータを入力および取得したかどうかにかかわらず、ストリームごとに料金が発生するためです。clean-up コマンドは次のとおりです。
aws kinesis delete-stream --stream-name Foo
成功すると、出力は発生しません。削除の進行状況を確認するには、describe-stream
を使用します。
aws kinesis describe-stream-summary --stream-name Foo
delete コマンドの直後にこのコマンドを実行する場合、次の例のような出力が表示されます。
{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",
ストリームが完全に削除されると、describe-stream
はnot foundエラーを返します。
A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation:
Stream Foo under account 123456789012 not found.