使用 Lambda 处理 Amazon Kinesis Data Streams 记录 - AWS Lambda

使用 Lambda 处理 Amazon Kinesis Data Streams 记录

要使用 Lambda 处理 Amazon Kinesis Data Streams 记录,请为您的流创建一个使用者,然后创建 Lambda 事件源映射。

配置数据流和函数

您的 Lambda 函数是数据流的用户应用程序。对于每个分片,它一次处理一批记录。您可以将 Lambda 函数映射到共享吞吐量使用者(标准迭代器)或具有增强扇出功能的专用吞吐量使用者。

  • 标准迭代器:Lambda 将针对记录轮询 Kinesis 流中的每个分片(按照每秒一次的基本频率)。当有更多记录可用时,Lambda 会继续进行批处理,直到函数赶上流的速度。事件源映射与分区的其他使用者共享读取吞吐量。

  • 增强型扇出功能:为了最大限度地减少延迟并最大限度地提高读取吞吐量,请创建具有增强型扇出功能的数据流使用者。增强扇出功能使用者将获得与每个分片的专用连接,这不会影响从流中读取信息的其他应用程序。流使用者使用 HTTP/2 通过长期连接将记录推送到 Lambda 并压缩请求头来减少延迟。您可以使用 Kinesis RegisterStreamConsumer API 创建流使用者。

aws kinesis register-stream-consumer \ --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

您应看到以下输出:

{
    "Consumer": {
        "ConsumerName": "con1",
        "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608",
        "ConsumerStatus": "CREATING",
        "ConsumerCreationTimestamp": 1540591608.0
    }
}

要提高函数处理记录的速度,请将分片添加到数据流中。Lambda 按顺序处理各个分区中的记录。如果您的函数返回错误,它会停止处理分片中的其他记录。使用更多分片,可以同时处理更多批次,从而降低错误对并发性的影响。

如果您的函数无法扩展以处理并发批处理的总数,请为您的函数请求提高配额预留并发

创建一个事件源映射来调用 Lambda 函数

要使用来自数据流的记录调用 Lambda 函数,请创建一个事件源映射。您可以创建多个事件源映射,以使用多个 Lambda 函数处理相同的数据,或使用单个函数处理来自多个数据流的项目。处理来自多个流的项目时,每个批处理将只包含来自单个分片或流的记录。

您可以配置事件源映射来处理来自不同 AWS 账户中的流的记录。要了解更多信息,请参阅 创建跨账户事件源映射

在创建事件源映射之前,您需要向您的 Lambda 函数授予读取 Kinesis 数据流中数据的权限。Lambda 需要以下权限才能管理与您的 Kinesis 数据流相关的资源:

AWS 托管式策略 AWSLambdaKinesisExecutionRole 包含这些权限。按照以下过程所述将此托管式策略添加到您的函数。

AWS Management Console
为您的函数添加 Kinesis 权限
  1. 打开 Lambda 控制台的“函数”页面,然后选择函数。

  2. 配置选项卡中,选择权限

  3. 执行角色窗格的角色名称下,选择指向函数的执行角色的链接。此链接将在 IAM 控制台中打开该角色的页面。

  4. 权限策略窗格中,选择添加权限,然后选择附加策略

  5. 在搜索字段中输入 AWSLambdaKinesisExecutionRole

  6. 选中该策略名称旁边的复选框,然后选择添加权限

AWS CLI
为您的函数添加 Kinesis 权限
  • 运行以下 CLI 命令,以将 AWSLambdaKinesisExecutionRole 策略附加到函数的执行角色。

    aws iam attach-role-policy \ --role-name MyFunctionRole \ --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
AWS SAM
为您的函数添加 Kinesis 权限
  • 在函数定义中添加 Policies 属性,如以下示例所示:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs22.x Policies: - AWSLambdaKinesisExecutionRole

配置所需的权限后,创建事件源映射。

AWS Management Console
创建 Kinesis 事件源映射
  1. 打开 Lambda 控制台的“函数”页面,然后选择函数。

  2. 函数概述窗格中,选择添加触发器

  3. 触发器配置下,对于源,请选择 Kinesis

  4. 选择要为其创建事件源映射的 Kinesis 流,也可以选择流的使用者。

  5. (可选)编辑事件源映射的批处理大小起始位置批处理窗口

  6. 选择 添加

在控制台中创建事件源映射时,您的 IAM 角色必须拥有 kinesis:ListStreamskinesis:ListStreamConsumers 权限。

AWS CLI
创建 Kinesis 事件源映射
  • 运行以下 CLI 命令以创建 Kinesis 事件源映射。根据您的应用场景选择自己的批量大小和起始位置。

    aws lambda create-event-source-mapping \ --function-name MyFunction \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --starting-position LATEST \ --batch-size 100

要指定批处理时间窗,请添加 --maximum-batching-window-in-seconds 选项。有关使用此参数和其他参数的更多信息,请参阅《AWS CLI Command Reference》中的 create-event-source-mapping

AWS SAM
创建 Kinesis 事件源映射
  • 在函数定义中添加 KinesisEvent 属性,如以下示例所示:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs22.x Policies: - AWSLambdaKinesisExecutionRole Events: KinesisEvent: Type: Kinesis Properties: Stream: !GetAtt MyKinesisStream.Arn StartingPosition: LATEST BatchSize: 100 MyKinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1

要了解有关在 AWS SAM 中创建 Kinesis 数据流事件源映射的更多信息,请参阅《AWS Serverless Application Model Developer Guide》中的 Kinesis

轮询和流的起始位置

请注意,事件源映射创建和更新期间的流轮询最终是一致的。

  • 在事件源映射创建期间,可能需要几分钟才能开始轮询来自流的事件。

  • 在事件源映射更新期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。

此行为意味着,如果你指定 LATEST 作为流的起始位置,事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON 或 AT_TIMESTAMP

创建跨账户事件源映射

Amazon Kinesis Data Streams 支持基于资源的策略。因此,您可以在一个 AWS 账户中使用 Lambda 函数来处理另一个账户的流中摄入的数据。

要使用其他 AWS 账户中的 Kinesis 流为您的 Lambda 函数创建事件源映射,您必须使用基于资源的策略配置该流,以向您的 Lambda 函数授予读取相关项目的权限。要了解如何配置流以允许跨账户存取,请参阅《Amazon Kinesis Streams 开发人员指南》中的 Sharing access with cross-account AWS Lambda functions

使用基于资源的策略配置流以向您的 Lambda 函数授予所需的权限后,请使用上一节中描述的任何方法创建事件源映射。

如果您选择使用 Lambda 控制台创建事件源映射,请将流的 ARN 直接粘贴到输入字段中。如果您想指定流的使用者,粘贴使用者的 ARN 会自动填充流字段。