

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 AWS CLI 来执行 Amazon Kinesis Data Streams 操作
<a name="getting-started"></a>

本节向您展示如何使用 AWS Command Line Interface执行基本的 Amazon Kinesis Data Streams 操作。您将了解 Kinesis Data Streams 基本的数据流原则，以及在 Kinesis 数据流中存入和取用数据的必要步骤。

 如果是首次使用 Kinesis Data Streams，请先熟悉 [Amazon Kinesis Data Streams 术语和概念](key-concepts.md) 中介绍的概念和术语。

**Topics**
+ [教程：为 Kinesis Data AWS CLI Streams 安装和配置](kinesis-tutorial-cli-installation.md)
+ [教程：使用 Kinesis Data Streams 执行基本操作 AWS CLI](fundamental-stream.md)

要进行 CLI 访问，您需要访问密钥 ID 和秘密访问密钥。如果可能，请使用临时凭证代替长期访问密钥。临时凭证包括访问密钥 ID、秘密访问密钥，以及一个指示凭证何时到期的安全令牌。有关更多信息，请参阅 *IAM 用户指南*中的将[临时证书与 AWS 资源配合使用](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html)。

您可以在[创建 step-by-step IAM 用户中找到详细的 IAM](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user) 和安全密钥设置说明。

在本节中，讨论到的特定命令将一字不差地提供，但其特定值在每次运行时必然会不同。此外，示例使用的是美国西部（俄勒冈州）区域，但本节中的步骤在任何[支持 Kinesis Data Streams 的区域](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region)中都有效。

# 教程：为 Kinesis Data AWS CLI Streams 安装和配置
<a name="kinesis-tutorial-cli-installation"></a>

## 安装 AWS CLI
<a name="install-cli"></a>

有关如何安装适用于 Windows 以及 AWS CLI 适用于 Linux、OS X 和 Unix 操作系统的详细步骤，请参阅[安装 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-install.html)。

 使用以下命令列出可用的选项和服务：

```
aws help
```

您将使用 Kinesis Data Streams 服务，因此您可以使用以下命令查看 AWS CLI 与 Kinesis Data Streams 相关的子命令：

```
aws kinesis help
```

此命令将生成包含可用 Kinesis Data Streams 命令的输出：

```
AVAILABLE COMMANDS

       o add-tags-to-stream

       o create-stream

       o delete-stream

       o describe-stream

       o get-records

       o get-shard-iterator

       o help

       o list-streams

       o list-tags-for-stream

       o merge-shards

       o put-record

       o put-records

       o remove-tags-from-stream

       o split-shard

       o wait
```

 此命令列表对应着 [Amazon Kinesis Service API Reference](https://docs.aws.amazon.com/kinesis/latest/APIReference/) 中的 Kinesis Data Streams API。例如，`create-stream` 命令与 `CreateStream` API 操作对应。

 AWS CLI 现在已成功安装，但尚未配置。这将在下一节展示。

## 配置 AWS CLI
<a name="config-cli"></a>

 对于一般用途，该`aws configure`命令是设置 AWS CLI 安装的最快方法。有关更多信息，请参阅[配置 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html)。

# 教程：使用 Kinesis Data Streams 执行基本操作 AWS CLI
<a name="fundamental-stream"></a>

本节介绍如何通过 AWS CLI从命令行对 Kinesis 数据流执行基本操作。确保您熟悉[Amazon Kinesis Data Streams 术语和概念](key-concepts.md)中讨论的概念。

**注意**  
创建直播后，您的账户会因使用 Kinesis Data Streams 而产生象征性的费用，因为 Kinesis Data Streams 不符合 AWS 免费套餐的资格。完成本教程后，请删除您的 AWS 资源以停止产生费用。有关更多信息，请参阅 [步骤 4：清除](#clean-up)。

**Topics**
+ [第 1 步：创建流](#create-stream)
+ [步骤 2：放置记录](#put-record)
+ [步骤 3：获取记录](#get-records)
+ [步骤 4：清除](#clean-up)

## 第 1 步：创建流
<a name="create-stream"></a>

 您的第一步是创建一个流并验证它是否已创建成功。使用以下命令创建一个名为“Foo”的流：

```
aws kinesis create-stream --stream-name Foo
```

接下来，发出以下命令以检查流的创建进度：

```
aws kinesis describe-stream-summary --stream-name Foo
```

您应获得类似于以下示例的输出：

```
{
    "StreamDescriptionSummary": {
        "StreamName": "Foo",
        "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo",
        "StreamStatus": "CREATING",
        "RetentionPeriodHours": 48,
        "StreamCreationTimestamp": 1572297168.0,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "OpenShardCount": 3,
        "ConsumerCount": 0
    }
}
```

 在此示例中，流的状态为 CREATING，这表示它还未完全做好使用准备。在几分钟后再次检查，您应看到类似于以下示例的输出：

```
{
    "StreamDescriptionSummary": {
        "StreamName": "Foo",
        "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 48,
        "StreamCreationTimestamp": 1572297168.0,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "OpenShardCount": 3,
        "ConsumerCount": 0
    }
}
```

此输出包含您在本教程中无需关注的信息。目前，您需要重点关注的是 `"StreamStatus": "ACTIVE"`（告知您流已做好使用准备）和有关您请求的单个分片的信息。您还可以通过使用 `list-streams` 命令验证您的新流是否存在，如下所示：

```
aws kinesis list-streams
```

输出：

```
{
    "StreamNames": [
        "Foo"
    ]
}
```

## 步骤 2：放置记录
<a name="put-record"></a>

 既然您已经拥有活动的流，您便已做好放置一些数据的准备。在本教程中，您将使用最简单的命令 `put-record`，该命令会将一个包含文本“testdata”的数据记录放入流中：

```
aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
```

 如果成功，此命令将生成类似于以下示例的输出：

```
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49546986683135544286507457936321625675700192471156785154"
}
```

恭喜，您刚刚已将数据添加到流！接下来您将了解如何从流中获取数据。

## 步骤 3：获取记录
<a name="get-records"></a>

**GetShardIterator**

 您必须先为您感兴趣的分片获取分片迭代器，然后才能从流中获取数据。分片迭代器表示消费端（在本例中为 `get-record` 命令）要从中读取数据的流和分片的位置。您将使用 `get-shard-iterator` 命令，如下所示：

```
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo
```

请记住，`aws kinesis` 命令后面有一个 Kinesis Data Streams API，因此如果您对显示的任何参数感兴趣，都可以在 [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html) API 参考主题中加以了解。成功执行后，此命令将生成类似于以下示例的输出：

```
{
    "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg="
}
```

看起来像随机字符的长字符串就是分片迭代器（您的字符串将与此不同）。你必须 copy/paste 将分片迭代器放入 get 命令，如下所示。分片迭代器的有效生命周期为 300 秒，这应该足以让分片迭代器进入下一个命令。 copy/paste 在将分片迭代器粘贴到写一个命令之前，您必须从中删除所有换行符。如果您收到分片迭代器不再有效的错误消息，请再次运行 `get-shard-iterator` 命令。

**GetRecords**

`get-records` 命令从流中获取数据，然后解析为对 Kinesis Data Streams API 中的 [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) 的调用。分片迭代器指定了分片中的一个位置，您希望从该位置开始按顺序读取数据记录。如果迭代器指向的分片中的部分没有可用的记录，`GetRecords` 将返回空白列表。可能需要进行多次调用才能到达分片中包含记录的部分。

在 `get-records` 命令的以下示例中：

```
aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=
```

 如果您是从 Unix 类型的命令处理器（如 bash）运行本教程，则可以使用嵌套命令自动执行分片迭代器的获取，如下所示：

```
SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator')

aws kinesis get-records --shard-iterator $SHARD_ITERATOR
```

如果您在支持的系统上运行本教程 PowerShell，则可以使用如下命令自动获取分片迭代器：

```
aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])
```

`get-records` 命令的成功结果将从您在获取分片迭代器时指定的分片的流中请求记录，如以下示例所示：

```
{
  "Records":[ {
    "Data":"dGVzdGRhdGE=",
    "PartitionKey":"123”,
    "ApproximateArrivalTimestamp": 1.441215410867E9,
    "SequenceNumber":"49544985256907370027570885864065577703022652638596431874"
  } ],
  "MillisBehindLatest":24000,
  "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
}
```

请注意，`get-records` 在上面被描述为*请求*，这意味着即使您的流中有记录，您可能也会收到零个或零个以上的记录。返回的任何记录都不能表示流中当前的所有记录。这是正常的，生产代码将以适当的时间间隔轮询流中的记录。此轮询速度将因您的特定应用程序设计要求而异。

在本教程的这部分中，您会注意到记录中的数据似乎是垃圾，不是我们发送的明文 `testdata`。这归因于 `put-record` 使用 Base64 编码支持您发送二进制数据的方式。但是，中的 AWS CLI Kinesis Data Streams支持不提供 *Base64* 解码，因为Base64解码到打印到标准输出的原始二进制内容可能会导致某些平台和终端出现不良行为和潜在的安全问题。如果您使用 Base64 解码程序（例如，[https://www.base64decode.org/](https://www.base64decode.org/)）对 `dGVzdGRhdGE=` 进行手动解码，您将看到它实际上是 `testdata`。就本教程而言，这已经足够了，因为在实践中， AWS CLI 很少使用来消耗数据。更多时候，它用于监控流的状态并获取信息，如上方所示（`describe-stream` 和 `list-streams`）。有关 KCL 的更多信息，请参阅[使用 KCL 开发具有共享吞吐量的自定义消费端](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html)。

`get-records` 并非总是会返回在流/分片中指定的所有记录。当出现这种情况时，请使用最后一个结果中的 `NextShardIterator` 获取下一组记录。如果更多数据会被放入流中（这是生产应用程序中的一般情况），您每次都可以使用 `get-records` 持续轮询数据。但是，如果您在 300 秒的分片迭代器生命周期内未使用下一个分片迭代器调用 `get-records`，则会收到一条错误消息，并且必须使用 `get-shard-iterator` 命令来获取新的分片迭代器。

此输出中还提供了 `MillisBehindLatest`，它是从流的末端响应 [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) 操作的毫秒数，指示使用者落后当前时间多远。零值指示正进行记录处理，此时没有新的记录要处理。在本教程中，如果您一边阅读教程一边操作，则可能会看到这个数值非常大。数据记录默认会在流中保留 24 小时，待您检索。此时间范围称为保留期，可以配置为最多 365 天。

成功执行 `get-records` 的结果总会返回一个 `NextShardIterator`，即使目前流中没有更多记录。这是一个假定创建器在任何给定时间内正在将更多记录放入流中的轮询模型。虽然您可编写自己的轮询例程，但如果您使用之前提到的 KCL 开发消费端应用程序，则系统将会为您执行此轮询。

如果您调用 `get-records`，直到您正在提取的流和分片中没有更多记录，您将看到带有空白记录的输出，类似于以下示例：

```
{
    "Records": [],
    "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk="
}
```

## 步骤 4：清除
<a name="clean-up"></a>

请删除流以释放资源并避免账户产生意外费用。每当您创建了不会使用的流时，请执行此操作，因为费用是按流量计算的，无论您是否使用流放置和获取数据，都会产生费用。清除命令如下所示：

```
aws kinesis delete-stream --stream-name Foo
```

 成功运行命令会导致没有输出。使用 `describe-stream` 检查删除进度：

```
aws kinesis describe-stream-summary --stream-name Foo
```

 如果您在执行删除命令后立即执行此命令，您会看到类似于以下示例的输出：

```
{
    "StreamDescriptionSummary": {
        "StreamName": "samplestream",
        "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream",
        "StreamStatus": "ACTIVE",
```

 在流完全删除后，`describe-stream` 将生成“未找到”错误：

```
A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation: 
Stream Foo under account 123456789012 not found.
```