教程:创建 REST API 作为 Amazon Kinesis 代理 - Amazon API Gateway

教程:创建 REST API 作为 Amazon Kinesis 代理

此页面介绍如何利用 AWS 类型的集成创建和配置 REST API 以访问 Kinesis。

注意

要将 API Gateway API 与 Kinesis 集成,您必须选择同时提供 API Gateway 和 Kinesis 服务的区域。有关区域可用性,请参阅服务端点和配额

为了进行说明,我们创建一个示例 API,以使客户端能够执行以下操作:

  1. 列出 Kinesis 中用户的可用流

  2. 创建、描述或删除指定流

  3. 从指定流读取数据记录或将数据记录写入指定流

为了完成上述任务,API 分别在各种资源上使用了多种方法来调用以下内容:

  1. Kinesis 中的 ListStreams 操作

  2. CreateStreamDescribeStreamDeleteStream 操作

  3. Kinesis 中的 GetRecordsPutRecords(包括 PutRecord)操作

具体来说,我们按如下所示构建 API:

  • 在 API 的 /streams 资源上公开 HTTP GET 方法,并将此方法与 Kinesis 中的 ListStreams 操作集成,以列出调用方账户中的流。

  • 在 API 的 /streams/{stream-name} 资源上公开 HTTP POST 方法,并将此方法与 Kinesis 中的 CreateStream 操作集成,以在调用方账户中创建指定流。

  • 在 API 的 /streams/{stream-name} 资源上公开 HTTP GET 方法,并将此方法与 Kinesis 中的 DescribeStream 操作集成,以描述调用方账户中的指定流。

  • 在 API 的 /streams/{stream-name} 资源上公开 HTTP DELETE 方法,并将此方法与 Kinesis 中的 DeleteStream 操作集成,以删除调用方账户中的流。

  • 在 API 的 /streams/{stream-name}/record 资源上公开 HTTP PUT 方法,并将此方法与 Kinesis 中的 PutRecord 操作集成。这使客户端能够向指定流添加一个数据记录。

  • 在 API 的 /streams/{stream-name}/records 资源上公开 HTTP PUT 方法,并将此方法与 Kinesis 中的 PutRecords 操作集成。这使客户端能够向指定流添加一个数据记录列表。

  • 在 API 的 /streams/{stream-name}/records 资源上公开 HTTP GET 方法,并将此方法与 Kinesis 中的 GetRecords 操作集成。这使客户端能够使用指定的分片迭代器在指定流中列出数据记录。分片迭代器指定了分片位置,可以从该位置开始按顺序读取数据记录。

  • 在 API 的 /streams/{stream-name}/sharditerator 资源上公开 HTTP GET 方法,并将此方法与 Kinesis 中的 GetShardIterator 操作集成。此辅助标记方法必须提供给 Kinesis 中的 ListStreams 操作。

您可以将此处提供的说明应用于其他 Kinesis 操作。有关 Kinesis 操作的完整列表,请参阅 Amazon Kinesis API 参考

您可以使用 API Gateway 导入 API 将示例 API 导入到 API Gateway 中,而不是使用 API Gateway 控制台创建示例 API。有关如何使用 Import API 的信息,请参阅 在 API Gateway 中使用 OpenAPI 开发 REST API

为 API 创建 IAM 角色和策略以访问 Kinesis

要允许 API 调用 Kinesis 操作,您必须将适当的 IAM policy 附加到 IAM 角色。

创建 AWS 服务代理执行角色
  1. 登录AWS Management Console,然后通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 选择角色

  3. 选择 Create role(创建角色)。

  4. 选择受信任实体的类型下选择 AWS 服务,然后选择 API Gateway 并选择允许 API Gateway 将日志推送到 CloudWatch Logs

  5. 选择下一步,然后再次选择下一步

  6. 对于角色名称,输入 APIGatewayKinesisProxyPolicy,然后选择创建角色

  7. Roles 列表中,选择您刚创建的角色。您可能需要滚动或使用搜索栏来查找角色。

  8. 对于所选角色,选择添加权限选项卡。

  9. 从下拉列表中选择附加策略

  10. 在搜索栏中,输入 AmazonKinesisFullAccess 然后选择添加权限

    注意

    为简单起见,本教程使用托管策略。作为最佳实践,您应创建自己的 IAM 策略以授予所需的最低权限。

  11. 记下新创建的角色 ARN,稍后将使用它。

创建 API 作为 Kinesis 代理

使用以下步骤在 API Gateway 控制台中创建 API。

创建 API 作为 Kinesis 的AWS服务代理
  1. 通过以下网址登录到 Amazon API Gateway 控制台:https://console.aws.amazon.com/apigateway

  2. 如果您是第一次使用 API Gateway,您会看到一个介绍服务特征的页面。在 REST API 下,选择生成。当创建示例 API 弹出框出现时,选择确定

    如果这不是您首次使用 API Gateway,请选择创建 API。在 REST API 下,选择生成

  3. 选择新 API

  4. API 名称中,输入 KinesisProxy。对于所有其他字段,保留默认值。

  5. (可选)对于描述,输入描述。

  6. 选择 Create API (创建 API)

API 创建完成后,API Gateway 控制台将显示资源页面,该页面仅包含 API 的根 (/) 资源。

列出 Kinesis 中的流

Kinesis 支持使用以下 REST API 调用执行 ListStreams 操作:

POST /?Action=ListStreams HTTP/1.1 Host: kinesis.<region>.<domain> Content-Length: <PayloadSizeBytes> User-Agent: <UserAgentString> Content-Type: application/x-amz-json-1.1 Authorization: <AuthParams> X-Amz-Date: <Date> { ... }

在上述 REST API 请求中,已经在 Action 查询参数中指定了操作。或者,您可以在 X-Amz-Target 标头中指定操作:

POST / HTTP/1.1 Host: kinesis.<region>.<domain> Content-Length: <PayloadSizeBytes> User-Agent: <UserAgentString> Content-Type: application/x-amz-json-1.1 Authorization: <AuthParams> X-Amz-Date: <Date> X-Amz-Target: Kinesis_20131202.ListStreams { ... }

在本教程中,我们使用查询参数指定操作。

要在 API 中公开 Kinesis 操作,请将 /streams 资源添加到 API 的根中。然后,在此资源上设置 GET 方法,并将该方法与 Kinesis 的 ListStreams 操作集成。

以下过程介绍如何使用 API Gateway 控制台列出 Kinesis 流。

使用 API Gateway 控制台列出 Kinesis 流
  1. 选择 / 资源,然后选择创建资源

  2. 对于资源名称,输入 streams

  3. CORS(跨源资源共享)保持为关闭状态。

  4. 选择创建资源

  5. 选择 /streams 资源,再选择创建方法,然后执行以下操作:

    1. 对于方法类型,选择 GET

      注意

      客户端调用的方法的 HTTP 动词可能不同于后端所需的集成的 HTTP 动词。我们在此处选择了 GET,因为凭直觉判断,列出流是一个 READ 操作。

    2. 对于集成类型,选择 AWS 服务

    3. 对于 AWS 区域,选择您创建 Kinesis 流的 AWS 区域。

    4. 对于 AWS 服务,选择 Kinesis

    5. AWS 子域保留为空白。

    6. 对于 HTTP 方法,选择 POST

      注意

      我们在此处选择了 POST,因为 Kinesis 要求使用它来调用 ListStreams 操作。

    7. 对于操作类型,选择使用操作名称

    8. 对于操作名称,输入 ListStreams

    9. 对于执行角色,输入执行角色的 ARN。

    10. 对于内容处理,保留默认值传递

    11. 选择创建方法

  6. 集成请求选项卡的集成请求设置下,选择编辑

  7. 对于请求正文传递,选择当未定义模板时(推荐)

  8. 选择 URL 请求标头参数并执行以下操作:

    1. 选择添加请求标头参数

    2. 对于名称,请输入 Content-Type

    3. 对于映射自,输入 'application/x-amz-json-1.1'

    我们使用请求参数映射将 Content-Type 标头设置为静态值 'application/x-amz-json-1.1',以告知 Kinesis 该输入是特定版本的 JSON。

  9. 选择映射模板,然后选择添加映射模板并执行以下操作:

    1. 对于 Content-Type,输入 application/json

    2. 对于模板正文,输入 {}

    3. 选择保存

    ListStreams 请求使用以下 JSON 格式的负载:

    { "ExclusiveStartStreamName": "string", "Limit": number }

    但这些属性为可选属性,为了使用默认值,我们在此选择了一个空的 JSON 负载。

  10. /streams 资源上测试 GET 方法以调用 Kinesis 中的 ListStreams 操作:

    选择测试选项卡。您可能需要选择右箭头按钮,以显示该选项卡。

    选择测试,以测试您的方法。

    如果您已经在 Kinesis 中创建了两个分别名为“myStream”和“yourStream”的流,则成功的测试将返回一个包含以下负载的 200 OK 响应:

    { "HasMoreStreams": false, "StreamNames": [ "myStream", "yourStream" ] }

在 Kinesis 中创建、描述和删除流

在 Kinesis 中创建、描述和删除流分别需要发出以下 Kinesis REST API 请求:

POST /?Action=CreateStream HTTP/1.1 Host: kinesis.region.domain ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "ShardCount": number, "StreamName": "string" }
POST /?Action=DescribeStream HTTP/1.1 Host: kinesis.region.domain ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "StreamName": "string" }
POST /?Action=DeleteStream HTTP/1.1 Host: kinesis.region.domain ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "StreamName":"string" }

我们可以构建 API 来接受必需的输入作为方法请求的 JSON 负载,并将负载传递到集成请求。但是,为了提供更多方法与集成请求之间以及方法与集成响应之间的数据映射示例,我们创建 API 的方式稍有不同。

我们在待指定的 GET 资源上公开 POSTDeleteStream HTTP 方法。我们使用 {stream-name} 路径变量作为流传输资源的占位符,并将这些 API 方法分别与 Kinesis 的 DescribeStreamCreateStreamDeleteStream 操作集成。我们要求客户端传递其他输入数据作为标头、查询参数或有效负载的方法请求。我们提供了集成数据所需的映射模板来转换请求负载。

创建 {stream-name} 资源
  1. 选择 /streams 资源,然后选择创建资源

  2. 代理资源保持为关闭状态。

  3. 对于资源路径,选择 /streams

  4. 对于资源名称,输入 {stream-name}

  5. CORS(跨源资源共享)保持为关闭状态。

  6. 选择创建资源

在流传输资源上配置和测试 GET 方法
  1. 选择 /{stream-name} 资源,然后选择创建方法

  2. 对于方法类型,选择 GET

  3. 对于集成类型,选择 AWS 服务

  4. 对于 AWS 区域,选择您创建 Kinesis 流的 AWS 区域。

  5. 对于 AWS 服务,选择 Kinesis

  6. AWS 子域保留为空白。

  7. 对于 HTTP 方法,选择 POST

  8. 对于操作类型,选择使用操作名称

  9. 对于操作名称,输入 DescribeStream

  10. 对于执行角色,输入执行角色的 ARN。

  11. 对于内容处理,保留默认值传递

  12. 选择创建方法

  13. 集成请求部分,添加以下 URL 请求标头参数

    Content-Type: 'x-amz-json-1.1'

    该任务将按照相同的过程为 GET /streams 方法设置请求参数映射。

  14. 添加以下正文映射模板,以将数据从 GET /streams/{stream-name} 方法请求映射到 POST /?Action=DescribeStream 集成请求:

    { "StreamName": "$input.params('stream-name')" }

    此映射模板使用方法请求的 DescribeStream 路径参数值为 Kinesis 的 stream-name 操作生成所需的集成请求负载。

  15. 要测试调用 Kinesis 中的 DescribeStream 操作的 GET /stream/{stream-name} 方法,请选择测试选项卡。

  16. 对于路径,在 stream-name 下输入一个现有 Kinesis 流的名称。

  17. 选择测试。如果测试成功,将返回一个 200 OK 响应,其所含负载与以下内容类似:

    { "StreamDescription": { "HasMoreShards": false, "RetentionPeriodHours": 24, "Shards": [ { "HashKeyRange": { "EndingHashKey": "68056473384187692692674921486353642290", "StartingHashKey": "0" }, "SequenceNumberRange": { "StartingSequenceNumber": "49559266461454070523309915164834022007924120923395850242" }, "ShardId": "shardId-000000000000" }, ... { "HashKeyRange": { "EndingHashKey": "340282366920938463463374607431768211455", "StartingHashKey": "272225893536750770770699685945414569164" }, "SequenceNumberRange": { "StartingSequenceNumber": "49559266461543273504104037657400164881014714369419771970" }, "ShardId": "shardId-000000000004" } ], "StreamARN": "arn:aws:kinesis:us-east-1:12345678901:stream/myStream", "StreamName": "myStream", "StreamStatus": "ACTIVE" } }

    部署 API 后,您可以根据此 API 方法做出 REST 请求:

    GET https://your-api-id.execute-api.region.amazonaws.com/stage/streams/myStream HTTP/1.1 Host: your-api-id.execute-api.region.amazonaws.com Content-Type: application/json Authorization: ... X-Amz-Date: 20160323T194451Z
在流传输资源上配置和测试 POST 方法
  1. 选择 /{stream-name} 资源,然后选择创建方法

  2. 对于方法类型,选择 POST

  3. 对于集成类型,选择 AWS 服务

  4. 对于 AWS 区域,选择您创建 Kinesis 流的 AWS 区域。

  5. 对于 AWS 服务,选择 Kinesis

  6. AWS 子域保留为空白。

  7. 对于 HTTP 方法,选择 POST

  8. 对于操作类型,选择使用操作名称

  9. 对于操作名称,输入 CreateStream

  10. 对于执行角色,输入执行角色的 ARN。

  11. 对于内容处理,保留默认值传递

  12. 选择创建方法

  13. 集成请求部分,添加以下 URL 请求标头参数

    Content-Type: 'x-amz-json-1.1'

    该任务将按照相同的过程为 GET /streams 方法设置请求参数映射。

  14. 添加以下正文映射模板,以将数据从 POST /streams/{stream-name} 方法请求映射到 POST /?Action=CreateStream 集成请求:

    { "ShardCount": #if($input.path('$.ShardCount') == '') 5 #else $input.path('$.ShardCount') #end, "StreamName": "$input.params('stream-name')" }

    在上述映射模板中,如果客户端未在方法请求负载中指定值,我们会将 ShardCount 设为固定值 5。

  15. 要测试调用 Kinesis 中的 CreateStream 操作的 POST /stream/{stream-name} 方法,请选择测试选项卡。

  16. 对于路径,在 stream-name 下输入一个新 Kinesis 流的名称。

  17. 选择测试。如果测试成功,将返回一个不含数据的 200 OK 响应。

    部署 API 后,您也可以针对流传输资源上的 POST 方法发出 REST API 请求,以调用 Kinesis 中的 CreateStream 操作:

    POST https://your-api-id.execute-api.region.amazonaws.com/stage/streams/yourStream HTTP/1.1 Host: your-api-id.execute-api.region.amazonaws.com Content-Type: application/json Authorization: ... X-Amz-Date: 20160323T194451Z { "ShardCount": 5 }
在流传输资源上配置和测试 DELETE 方法
  1. 选择 /{stream-name} 资源,然后选择创建方法

  2. 对于方法类型,选择 DELETE

  3. 对于集成类型,选择 AWS 服务

  4. 对于 AWS 区域,选择您创建 Kinesis 流的 AWS 区域。

  5. 对于 AWS 服务,选择 Kinesis

  6. AWS 子域保留为空白。

  7. 对于 HTTP 方法,选择 POST

  8. 对于操作类型,选择使用操作名称

  9. 对于操作名称,输入 DeleteStream

  10. 对于执行角色,输入执行角色的 ARN。

  11. 对于内容处理,保留默认值传递

  12. 选择创建方法

  13. 集成请求部分,添加以下 URL 请求标头参数

    Content-Type: 'x-amz-json-1.1'

    该任务将按照相同的过程为 GET /streams 方法设置请求参数映射。

  14. 添加以下正文映射模板,以将数据从 DELETE /streams/{stream-name} 方法请求映射到 POST /?Action=DeleteStream 的相应集成请求:

    { "StreamName": "$input.params('stream-name')" }

    此映射模板将使用客户端提供的 URL 路径名称 DELETE /streams/{stream-name}stream-name 操作生成所需的输入。

  15. 要测试调用 Kinesis 中的 DeleteStream 操作的 DELETE /stream/{stream-name} 方法,请选择测试选项卡。

  16. 对于路径,在 stream-name 下输入一个现有 Kinesis 流的名称。

  17. 选择测试。如果测试成功,将返回一个不含数据的 200 OK 响应。

    部署 API 后,您也可以针对流传输资源上的 DELETE 方法发出以下 REST API 请求,以调用 Kinesis 中的 DeleteStream 操作:

    DELETE https://your-api-id.execute-api.region.amazonaws.com/stage/streams/yourStream HTTP/1.1 Host: your-api-id.execute-api.region.amazonaws.com Content-Type: application/json Authorization: ... X-Amz-Date: 20160323T194451Z {}

从 Kinesis 中的流获取记录并向其添加记录

在 Kinesis 中创建流后,您可以将数据记录添加到流中,也可以从流中读取数据。添加数据记录包括调用 Kinesis 中的 PutRecordsPutRecord 操作。前者可向流添加多条记录,而后者可向流添加一条记录。

POST /?Action=PutRecords HTTP/1.1 Host: kinesis.region.domain Authorization: AWS4-HMAC-SHA256 Credential=..., ... ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "Records": [ { "Data": blob, "ExplicitHashKey": "string", "PartitionKey": "string" } ], "StreamName": "string" }

POST /?Action=PutRecord HTTP/1.1 Host: kinesis.region.domain Authorization: AWS4-HMAC-SHA256 Credential=..., ... ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "Data": blob, "ExplicitHashKey": "string", "PartitionKey": "string", "SequenceNumberForOrdering": "string", "StreamName": "string" }

其中,StreamName 用于标识要添加记录的目标流。StreamNameDataPartitionKey 是必需的输入数据。在示例中,我们针对所有可选输入数据使用了默认值,并且不会在方法请求的输入中显式指定它们的值。

读取 Kinesis 中的数据相当于调用 GetRecords 操作:

POST /?Action=GetRecords HTTP/1.1 Host: kinesis.region.domain Authorization: AWS4-HMAC-SHA256 Credential=..., ... ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "ShardIterator": "string", "Limit": number }

其中,我们要从中获取记录的源流在必需的 ShardIterator 值中进行指定,如以下获取分片迭代器的 Kinesis 操作中所示:

POST /?Action=GetShardIterator HTTP/1.1 Host: kinesis.region.domain Authorization: AWS4-HMAC-SHA256 Credential=..., ... ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "ShardId": "string", "ShardIteratorType": "string", "StartingSequenceNumber": "string", "StreamName": "string" }

对于 GetRecordsPutRecords 操作,我们在附加到指定流传输资源 (GET) 的 PUT 资源上分别使用了 /records/{stream-name} 方法。同样,我们在 PutRecord 资源上使用了 PUT 操作作为 /record 方法。

由于 GetRecords 操作将 ShardIterator 值 (该值通过调用 GetShardIterator 辅助标记操作获得) 作为输入,我们在 GET 资源 (ShardIterator) 上使用了 /sharditerator 辅助标记方法。

创建 /record、/records 和 /sharditerator 资源
  1. 选择 /{stream-name} 资源,然后选择创建资源

  2. 代理资源保持为关闭状态。

  3. 对于资源路径,选择 /{stream-name}

  4. 对于资源名称,输入 record

  5. CORS(跨源资源共享)保持为关闭状态。

  6. 选择创建资源

  7. 重复上述步骤,创建 /records/sharditerator 资源。最终 API 应与以下内容类似:

    为 API 创建 Records:GET|PUT|PUT|GET 方法。

以下四个过程介绍了如何设置每个方法,如何将数据从方法请求映射到集成请求,以及如何测试方法。

设置并测试 PUT /streams/{stream-name}/record 方法以调用 Kinesis 中的 PutRecord
  1. 选择 /record,然后选择创建方法

  2. 对于方法类型,选择 PUT

  3. 对于集成类型,选择 AWS 服务

  4. 对于 AWS 区域,选择您创建 Kinesis 流的 AWS 区域。

  5. 对于 AWS 服务,选择 Kinesis

  6. AWS 子域保留为空白。

  7. 对于 HTTP 方法,选择 POST

  8. 对于操作类型,选择使用操作名称

  9. 对于操作名称,输入 PutRecord

  10. 对于执行角色,输入执行角色的 ARN。

  11. 对于内容处理,保留默认值传递

  12. 选择创建方法

  13. 集成请求部分,添加以下 URL 请求标头参数

    Content-Type: 'x-amz-json-1.1'

    该任务将按照相同的过程为 GET /streams 方法设置请求参数映射。

  14. 添加以下正文映射模板,以将数据从 PUT /streams/{stream-name}/record 方法请求映射到 POST /?Action=PutRecord 的相应集成请求:

    { "StreamName": "$input.params('stream-name')", "Data": "$util.base64Encode($input.json('$.Data'))", "PartitionKey": "$input.path('$.PartitionKey')" }

    此映射模板假定方法请求负载采用的是以下格式:

    { "Data": "some data", "PartitionKey": "some key" }

    此数据可通过以下 JSON 架构建模:

    { "$schema": "http://json-schema.org/draft-04/schema#", "title": "PutRecord proxy single-record payload", "type": "object", "properties": { "Data": { "type": "string" }, "PartitionKey": { "type": "string" } } }

    您可以创建一个模型以包含此架构,并使用该模型来帮助生成映射模板。但您也可以在不使用任何模型的情况下生成映射模板。

  15. 要测试 PUT /streams/{stream-name}/record 方法,请将 stream-name 路径变量设置为现有流的名称,提供所需格式的负载,然后提交方法请求。如果成功,将返回一个包含以下格式负载的 200 OK 响应:

    { "SequenceNumber": "49559409944537880850133345460169886593573102115167928386", "ShardId": "shardId-000000000004" }
设置并测试 PUT /streams/{stream-name}/records 方法以调用 Kinesis 中的 PutRecords
  1. 选择 /records 资源,然后选择创建方法

  2. 对于方法类型,选择 PUT

  3. 对于集成类型,选择 AWS 服务

  4. 对于 AWS 区域,选择您创建 Kinesis 流的 AWS 区域。

  5. 对于 AWS 服务,选择 Kinesis

  6. AWS 子域保留为空白。

  7. 对于 HTTP 方法,选择 POST

  8. 对于操作类型,选择使用操作名称

  9. 对于操作名称,输入 PutRecords

  10. 对于执行角色,输入执行角色的 ARN。

  11. 对于内容处理,保留默认值传递

  12. 选择创建方法

  13. 集成请求部分,添加以下 URL 请求标头参数

    Content-Type: 'x-amz-json-1.1'

    该任务将按照相同的过程为 GET /streams 方法设置请求参数映射。

  14. 添加以下映射模板,以将数据从 PUT /streams/{stream-name}/records 方法请求映射到 POST /?Action=PutRecords 的相应集成请求:

    { "StreamName": "$input.params('stream-name')", "Records": [ #foreach($elem in $input.path('$.records')) { "Data": "$util.base64Encode($elem.data)", "PartitionKey": "$elem.partition-key" }#if($foreach.hasNext),#end #end ] }

    此映射模板假设可以通过以下 JSON 架构为方法请求负载建模:

    { "$schema": "http://json-schema.org/draft-04/schema#", "title": "PutRecords proxy payload data", "type": "object", "properties": { "records": { "type": "array", "items": { "type": "object", "properties": { "data": { "type": "string" }, "partition-key": { "type": "string" } } } } } }

    您可以创建一个模型以包含此架构,并使用该模型来帮助生成映射模板。但您也可以在不使用任何模型的情况下生成映射模板。

    在本教程中,我们使用了两种稍有不同的负载格式来说明 API 开发人员可以选择向客户端公开或隐藏后端数据格式。一种格式用于 PUT /streams/{stream-name}/records 方法 (上文)。另一种格式用于 PUT /streams/{stream-name}/record 方法 (上一过程)。在生产环境中,您应该将两种格式保持一致。

  15. 要测试 PUT /streams/{stream-name}/records 方法,请将 stream-name 路径变量设置为现有流,提供以下负载,并提交方法请求。

    { "records": [ { "data": "some data", "partition-key": "some key" }, { "data": "some other data", "partition-key": "some key" } ] }

    如果成功,将返回一个包含类似以下输出的负载的 200 OK 响应:

    { "FailedRecordCount": 0, "Records": [ { "SequenceNumber": "49559409944537880850133345460167468741933742152373764162", "ShardId": "shardId-000000000004" }, { "SequenceNumber": "49559409944537880850133345460168677667753356781548470338", "ShardId": "shardId-000000000004" } ] }
设置并测试 GET /streams/{stream-name}/sharditerator 方法以调用 Kinesis 中的 GetShardIterator

GET /streams/{stream-name}/sharditerator 方法为辅助标记方法,用于在调用 GET /streams/{stream-name}/records 方法之前获得必需的分片迭代器。

  1. 选择 /sharditerator 资源,然后选择创建方法

  2. 对于方法类型,选择 GET

  3. 对于集成类型,选择 AWS 服务

  4. 对于 AWS 区域,选择您创建 Kinesis 流的 AWS 区域。

  5. 对于 AWS 服务,选择 Kinesis

  6. AWS 子域保留为空白。

  7. 对于 HTTP 方法,选择 POST

  8. 对于操作类型,选择使用操作名称

  9. 对于操作名称,输入 GetShardIterator

  10. 对于执行角色,输入执行角色的 ARN。

  11. 对于内容处理,保留默认值传递

  12. 选择 URL 查询字符串参数

    GetShardIterator 操作需要输入 ShardId 值。要传递客户端提供的 ShardId 值,我们将向方法请求添加一个 shard-id 查询参数,如以下步骤所示。

  13. 选择添加查询字符串

  14. 名称中,输入 shard-id

  15. 保持必填缓存为已关闭状态。

  16. 选择创建方法

  17. 集成请求部分中,添加以下映射模板,以从方法请求的 shard-idstream-name 参数生成 GetShardIterator 操作所需的输入(ShardIdStreamName)。此外,映射模板还需将 ShardIteratorType 设置为 TRIM_HORIZON,并作为默认值。

    { "ShardId": "$input.params('shard-id')", "ShardIteratorType": "TRIM_HORIZON", "StreamName": "$input.params('stream-name')" }
  18. 使用 API Gateway 控制台中的测试选项,输入现有流名称作为 stream-name 路径变量值,将 shard-id 查询字符串设置为现有 ShardId 值(例如 shard-000000000004),然后选择测试

    成功的响应负载与以下输出类似:

    { "ShardIterator": "AAAAAAAAAAFYVN3VlFy..." }

    记下此 ShardIterator 值。您需要使用此值来从流中获取记录。

配置并测试 GET /streams/{stream-name}/records 方法以调用 Kinesis 中的 GetRecords 操作
  1. 选择 /records 资源,然后选择创建方法

  2. 对于方法类型,选择 GET

  3. 对于集成类型,选择 AWS 服务

  4. 对于 AWS 区域,选择您创建 Kinesis 流的 AWS 区域。

  5. 对于 AWS 服务,选择 Kinesis

  6. AWS 子域保留为空白。

  7. 对于 HTTP 方法,选择 POST

  8. 对于操作类型,选择使用操作名称

  9. 对于操作名称,输入 GetRecords

  10. 对于执行角色,输入执行角色的 ARN。

  11. 对于内容处理,保留默认值传递

  12. 选择 HTTP 请求标头

    GetRecords 操作需要输入 ShardIterator 值。要传递客户端提供的 ShardIterator 值,我们将向方法请求添加一个 Shard-Iterator 标头参数。

  13. 选择添加标头

  14. 对于名称,请输入 Shard-Iterator

  15. 保持必填缓存为已关闭状态。

  16. 选择创建方法

  17. 集成请求部分中,添加以下正文映射模板,以便为 Kinesis 中的 GetRecords 操作将 Shard-Iterator 标头参数值映射到 JSON 负载的 ShardIterator 属性值。

    { "ShardIterator": "$input.params('Shard-Iterator')" }
  18. 使用 API Gateway 控制台中的测试选项,输入现有流名称作为 stream-name 路径变量值,将 Shard-Iterator 标头设置为从 GET /streams/{stream-name}/sharditerator 方法的测试运行中获得的 ShardIterator 值(上文),然后选择测试

    成功的响应负载与以下输出类似:

    { "MillisBehindLatest": 0, "NextShardIterator": "AAAAAAAAAAF...", "Records": [ ... ] }