教學課程:使用 AWS CLI - Amazon Kinesis Data Streams

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

教學課程:使用 AWS CLI

本節說明如何使用 AWS CLI透過命令列對 Kinesis 資料串流執行基本操作。請您務必先熟悉 Amazon Kinesis Data Streams 術語和概念 所討論的概念。

注意

建立串流後,您的帳戶會產生 Kinesis Data Streams 使用的象徵性費用,因為 Kinesis Data Streams 不符合免費方案的 AWS 資格。完成本教學課程後,請刪除資 AWS 源以停止產生費用。如需詳細資訊,請參閱 步驟 4:清理

步驟 1:建立串流

您的第一個步驟是建立串流並確認其是否已成功建立。使用以下命令建立名為 "Foo" 的串流:

aws kinesis create-stream --stream-name Foo

接著,發出以下命令檢查串流的建立進度:

aws kinesis describe-stream-summary --stream-name Foo

您應會看到類似如下範例的輸出:

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

在此示例中,流具有狀態CREATING,這意味著它尚未準備好使用。請於幾分鐘後再次檢查,屆時您應會看到類似如下範例的輸出:

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

此輸出中包含本教學課程不需要的資訊。目前的重要資訊是"StreamStatus": "ACTIVE",它會告訴您串流已準備好可供使用,以及您要求的單一碎片上的資訊。您也可以使用 list-streams 命令確認新串流是否存在,如下所示:

aws kinesis list-streams

輸出:

{ "StreamNames": [ "Foo" ] }

步驟 2:輸入記錄

現在您已有了作用中的串流,即可準備開始放入一些資料。本教學課程使用最簡單可行的 put-record 命令,將包含文字 "testdata" 的單一資料記錄放入串流:

aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata

此命令若成功,將產生類似如下範例的輸出:

{ "ShardId": "shardId-000000000000", "SequenceNumber": "49546986683135544286507457936321625675700192471156785154" }

恭喜,您剛已順利加入資料至串流!接下來您將了解如何從串流取出資料。

步驟 3:獲取記錄

GetShardIterator

在從流中獲取數據之前,必須獲取您感興趣的碎片的碎片迭代器。碎片疊代運算代表了消費者 (本例中為 get-record 命令) 將從中讀取資料的串流及碎片的位置。您將使用get-shard-iterator命令,如下所示:

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo

回想一下,這些aws kinesis命令API後面有一個 Kinesis Data Streams,因此,如果您對顯示的任何參數感到好奇,可以在參GetShardIteratorAPI考主題中閱讀它們。成功執行會產生類似下列範例的輸出:

{ "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=" }

一長串看似隨機字元的字串就是碎片疊代運算 (您的字串會有所出入)。您必須將碎片迭代器複製/粘貼到 get 命令中,如下所示。碎片疊代運算的有效期為 300 秒,這段時間應足夠讓您將碎片疊代運算複製/貼入下一個命令中。在粘貼到下一個命令之前,必須從碎片迭代器中刪除任何換行符。如果出現碎片迭代器不再有效的錯誤消息,請再次運行該get-shard-iterator命令。

GetRecords

get-records令會從串流取得資料,並解析為 Kinesis Data Streams GetRecordsAPI中的呼叫。碎片迭代運算指定了碎片中您希望開始循序讀取資料記錄的位置。如果疊代運算所指向的碎片部分沒有可用的記錄,GetRecords 將傳回空白清單。它可能需要多次調用才能到達包含記錄的碎片的一部分。

在下列get-records命令範例中:

aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=

如果您是從 Unix 類型的命令處理器(例如 bash)運行本教程,則可以使用嵌套命令自動獲取碎片迭代器,如下所示:

SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR

如果您是從支援的系統執行本教學課程 PowerShell,您可以使用如下指令自動擷取碎片迭代器:

aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])

get-records命令的成功結果將會針對您取得碎片迭代器時指定的碎片,要求串流中的記錄,如下列範例所示:

{ "Records":[ { "Data":"dGVzdGRhdGE=", "PartitionKey":"123”, "ApproximateArrivalTimestamp": 1.441215410867E9, "SequenceNumber":"49544985256907370027570885864065577703022652638596431874" } ], "MillisBehindLatest":24000, "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is=" }

請注意,上面描述get-records請求,這意味著即使流中有記錄,您也可能會收到零個或多個記錄。傳回的任何記錄可能無法代表串流中目前的所有記錄。這是正常的,生產代碼將在適當的時間間隔輪詢流中的記錄。此輪詢速度會根據您的特定應用程式設計需求而有所不同。

在本教程的這一部分的記錄中,您會注意到數據似乎是垃圾-這不是testdata我們發送的明文。這是因為 put-record 採用 Base64 編碼方式,讓您能夠傳送二進位資料。但是,中的 Kinesis Data Streams 支援 AWS CLI 不提供 Base64 解碼,因為 Base64 解碼為列印至標準輸出的原始二進位內容可能會導致特定平台和終端機上出現不想要的行為和潛在的安全性問題。若您使用 Base64 解碼器 (如 https://www.base64decode.org/) 手動解碼 dGVzdGRhdGE=,就會看到實際原文是 testdata。這對於本教程來說就足夠了,因為在實踐中, AWS CLI 很少用於消耗數據。更常見的是,它用於監視流的狀態並獲取信息,如前所示(describe-streamlist-streams)。如需有關的詳細資訊KCL,請參閱使用開發具有共用輸送量的自訂取用者KCL

get-records並不總是返回指定的流/碎片中的所有記錄。若發生這種情況,請由最近的結果使用 NextShardIterator 以取得下一組記錄。如果將更多資料放入串流中 (這是生產應用程式中的正常情況),您可以get-records每次都使用輪詢資料。但是,如果您未在 300 秒碎片迭代器生命週期內get-records使用下一個碎片迭代器進行調用,則會收到錯誤消息,並且必須使用該get-shard-iterator命令獲取新的碎片迭代器。

在此輸出還提供的是MillisBehindLatest,這是毫秒數GetRecords操作的響應是從流的尖端,指示多遠落後於當前時間消費者是。值為零表示記錄處理已跟上進度,此時沒有任何新記錄可供處理。在本教學課程中,若您一邊閱讀內容一邊操作,可能會看到這個數值相當大。根據預設,資料記錄會保留在串流中 24 小時,等待您擷取它們。此時間範圍稱為保留期間,最長可設定成 365 天。

即使流中當前沒有更多記錄,成功的get-records結果NextShardIterator也將始終具有。這是假定生產者可能在任何特定時間內將更多記錄放入串流的一種輪詢模式。雖然您可以編寫自己的輪詢常式,但是如果您使用前面提到KCL的開發消費者應用程式,則會為您處理此輪詢。

如果您呼叫get-records直到串流中沒有更多記錄,而您要從中提取的碎片時,您會看到類似下列範例的空白記錄的輸出:

{ "Records": [], "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk=" }

步驟 4:清理

刪除串流以釋放資源,避免帳戶產生意外費用。在您建立串流但不會使用串流的任何時候都要這麼做,因為無論您是否要放置和取得資料,每個串流都會產生費用。清理命令如下:

aws kinesis delete-stream --stream-name Foo

成功導致沒有輸出。用describe-stream於檢查刪除進度:

aws kinesis describe-stream-summary --stream-name Foo

如果刪除命令後立即執行此命令,你會看到類似下面的例子輸出:

{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",

串流完全刪除後,describe-stream 將導致「找不到」的錯誤。

A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation: Stream Foo under account 123456789012 not found.