在 Lambda 中保留 Kinesis Data Streams 事件源的已丢弃批次记录 - AWS Lambda

在 Lambda 中保留 Kinesis Data Streams 事件源的已丢弃批次记录

Kinesis 事件源映射的错误处理取决于错误是在调用函数之前还是在函数调用期间发生的:

  • 调用前:如果 Lambda 事件源映射由于节流或其他问题而无法调用该函数,则它会一直重试,直到记录过期或超过事件源映射上配置的最大期限(MaximumRecordAgeInSeconds)。

  • 调用期间:如果调用函数但返回错误,Lambda 会重试,直到记录过期、超过最大期限(MaximumRecordAgeInSeconds)或达到配置的重试配额(MaximumRetryAttempts)。对于函数错误,您还可以配置 BisectBatchOnFunctionError,将失败的批次拆分为两个较小的批次,从而隔离错误记录并避免超时。拆分批次不会消耗重试配额。

如果错误处理措施失败,Lambda 将丢弃记录并继续处理数据流中的批次。使用默认设置时,这意味着错误的记录可能会阻止受影响的分区上的处理,时间最长为一周。为了避免这种情况,请配置函数的事件源映射,使用合理的重试次数和适合您的使用案例的最长记录期限。

配置失败调用的目标

要保留失败的事件源映射调用的记录,请在函数的事件源映射中添加一个目标。发送到目标的每条记录都是一个 JSON 文档,其中包含有关失败调用的元数据。对于 Amazon S3 目标,Lambda 还会发送整个调用记录以及元数据。您可以将任何 Amazon SNS 主题、Amazon SQS 队列或 S3 存储桶配置为目标。

借助 Amazon S3 目标,您可以使用 Amazon S3 事件通知功能在对象上传到目标 S3 存储桶时接收通知。您还可以将 S3 事件通知配置为调用另一个 Lambda 函数来对失败的批次执行自动处理。

您的执行角色必须具有目标的权限:

如果您已使用自己的 KMS 密钥为 S3 目标启用加密,则函数的执行角色还必须具有调用 kms:GenerateDataKey 的权限。如果 KMS 密钥和 S3 存储桶目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:GenerateDataKey。

要使用控制台配置失败时的目标,请执行以下步骤:

  1. 打开 Lamba 控制台的 Functions(函数)页面。

  2. 选择函数。

  3. Function overview (函数概览) 下,选择 Add destination (添加目标)

  4. 对于,请选择事件源映射调用

  5. 对于事件源映射,请选择为此函数配置的事件源。

  6. 条件中,选择失败时。对于事件源映射调用,这是唯一可接受的条件。

  7. 对于目标类型,请选择 Lambda 要发送调用记录的目标类型。

  8. 对于 Destination (目标),请选择一个资源。

  9. 选择保存

您还可以使用 AWS Command Line Interface(AWS CLI)配置失败时的目标。例如,以 create-event-source-mapping 命令将带有 SQS 失败时目标的事件源映射添加到 MyFunction

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

以下 update-event-source-mapping 命令更新事件源映射,以在两次重试之后或记录超过一小时后将失败的调用记录发送到 SNS 目标。

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'

更新的设置是异步应用的,并且直到该过程完成才反映在输出中。使用 get-event-source-mapping 命令查看当前状态。

要移除目标,请提供一个空字符串作为 destination-config 参数的实际参数:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Amazon S3 目标的安全最佳实践

如果删除配置为目标的 S3 存储桶而不将目标从函数配置中删除,则可能会造成安全风险。如果其他用户知道您的目标存储桶的名称,则他们可以在其 AWS 账户中重新创建存储桶。调用失败的记录将发送到存储桶,这可能会暴露您函数中的数据。

警告

为确保您的函数中的调用记录不会发送到另一个 AWS 账户中的 S3 存储桶,请向函数的执行角色添加条件,以将 s3:PutObject 权限限制为您账户中的存储桶。

以下示例显示了一个 IAM 策略,该策略将您函数的 s3:PutObject 权限限制为您账户中的存储桶。该策略还为 Lambda 提供了使用 S3 存储桶作为目标所需的 s3:ListBucket 权限。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount": "111122223333" } } } ] }

要使用 AWS Management Console或 AWS CLI 向函数的执行角色添加权限策略,请参阅以下程序中的说明:

Console
向函数的执行角色添加权限策略(控制台)
  1. 打开 Lamba 控制台的函数页面

  2. 选择想要修改其执行角色的 Lambda 函数。

  3. 配置选项卡中,选择权限

  4. 执行角色选项卡中,选择您的函数的角色名称,以打开该角色的 IAM 控制台页面。

  5. 通过执行以下操作,向角色添加权限策略:

    1. 权限策略窗格中,选择添加权限,然后选择创建内联策略

    2. 策略编辑器中,选择 JSON

    3. 将要添加的策略粘贴到编辑器中(替换现有 JSON),然后选择下一步

    4. 策略详细信息下,输入策略名称

    5. 选择 创建策略

AWS CLI
向函数的执行角色添加权限策略(CLI)
  1. 创建具有所需权限的 JSON 策略文档并将其保存在本地目录中。

  2. 使用 IAM put-role-policy CLI 命令向您函数的执行角色添加权限。在您保存 JSON 策略文档的目录中运行以下命令,并用您自己的值替换角色名称、策略名称和策略文档。

    aws iam put-role-policy \ --role-name my_lambda_role \ --policy-name LambdaS3DestinationPolicy \ --policy-document file://my_policy.json

Amazon SNS 和 Amazon SQS 调用记录示例

以下示例显示了 Lambda 在 Kinesis 事件源调用失败时向 SQS 队列或 SNS 主题发送的内容。由于 Lambda 仅为这些目标类型发送元数据,因此请使用 streamArnshardIdstartSequenceNumberendSequenceNumber 字段获取完整的原始记录。KinesisBatchInfo 属性中显示的所有字段始终存在。

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

您可以使用此信息从流中检索受影响的记录以进行故障排除。实际记录不包括在内,因此您必须处理此记录并在记录过期和丢失之前从流中检索它们。

Amazon S3 调用记录示例

以下示例显示了 Lambda 在 Kinesis 事件源调用失败时向 Amazon S3 存储桶发送的内容。除了针对 SQS 和 SNS 目标的上一示例中的所有字段外,payload 字段还包含作为转义 JSON 字符串的原始调用记录。

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" }, "payload": "<Whole Event>" // Only available in S3 }

包含了调用记录的 S3 对象使用以下命名约定:

aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>