在 Lambda 中保留 Kinesis Data Streams 事件源的已丢弃批次记录
Kinesis 事件源映射的错误处理取决于错误是在调用函数之前还是在函数调用期间发生的:
-
调用前:如果 Lambda 事件源映射由于节流或其他问题而无法调用该函数,则它会一直重试,直到记录过期或超过事件源映射上配置的最大期限(MaximumRecordAgeInSeconds)。
-
调用期间:如果调用函数但返回错误,Lambda 会重试,直到记录过期、超过最大期限(MaximumRecordAgeInSeconds)或达到配置的重试配额(MaximumRetryAttempts)。对于函数错误,您还可以配置 BisectBatchOnFunctionError,将失败的批次拆分为两个较小的批次,从而隔离错误记录并避免超时。拆分批次不会消耗重试配额。
如果错误处理措施失败,Lambda 将丢弃记录并继续处理数据流中的批次。使用默认设置时,这意味着错误的记录可能会阻止受影响的分区上的处理,时间最长为一周。为了避免这种情况,请配置函数的事件源映射,使用合理的重试次数和适合您的使用案例的最长记录期限。
配置失败调用的目标
要保留失败的事件源映射调用的记录,请在函数的事件源映射中添加一个目标。发送到目标的每条记录都是一个 JSON 文档,其中包含有关失败调用的元数据。对于 Amazon S3 目标,Lambda 还会发送整个调用记录以及元数据。您可以将任何 Amazon SNS 主题、Amazon SQS 队列或 S3 存储桶配置为目标。
借助 Amazon S3 目标,您可以使用 Amazon S3 事件通知功能在对象上传到目标 S3 存储桶时接收通知。您还可以将 S3 事件通知配置为调用另一个 Lambda 函数来对失败的批次执行自动处理。
您的执行角色必须具有目标的权限:
-
对于 SQS 目标:sqs:SendMessage
-
对于 SNS 目标:sns:Publish
-
对于 S3 存储桶目标:s3:PutObject 和 s3:ListBucket
如果您已使用自己的 KMS 密钥为 S3 目标启用加密,则函数的执行角色还必须具有调用 kms:GenerateDataKey 的权限。如果 KMS 密钥和 S3 存储桶目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:GenerateDataKey。
要使用控制台配置失败时的目标,请执行以下步骤:
打开 Lamba 控制台的 Functions
(函数)页面。 -
选择函数。
-
在 Function overview (函数概览) 下,选择 Add destination (添加目标)。
-
对于源,请选择事件源映射调用。
-
对于事件源映射,请选择为此函数配置的事件源。
-
在条件中,选择失败时。对于事件源映射调用,这是唯一可接受的条件。
-
对于目标类型,请选择 Lambda 要发送调用记录的目标类型。
-
对于 Destination (目标),请选择一个资源。
-
选择保存。
您还可以使用 AWS Command Line Interface(AWS CLI)配置失败时的目标。例如,以 create-event-source-mappingMyFunction
:
aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
以下 update-event-source-mapping
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
更新的设置是异步应用的,并且直到该过程完成才反映在输出中。使用 get-event-source-mapping
要移除目标,请提供一个空字符串作为 destination-config
参数的实际参数:
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'
Amazon S3 目标的安全最佳实践
如果删除配置为目标的 S3 存储桶而不将目标从函数配置中删除,则可能会造成安全风险。如果其他用户知道您的目标存储桶的名称,则他们可以在其 AWS 账户中重新创建存储桶。调用失败的记录将发送到存储桶,这可能会暴露您函数中的数据。
警告
为确保您的函数中的调用记录不会发送到另一个 AWS 账户中的 S3 存储桶,请向函数的执行角色添加条件,以将 s3:PutObject
权限限制为您账户中的存储桶。
以下示例显示了一个 IAM 策略,该策略将您函数的 s3:PutObject
权限限制为您账户中的存储桶。该策略还为 Lambda 提供了使用 S3 存储桶作为目标所需的 s3:ListBucket
权限。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount":
"111122223333"
} } } ] }
要使用 AWS Management Console或 AWS CLI 向函数的执行角色添加权限策略,请参阅以下程序中的说明:
Amazon SNS 和 Amazon SQS 调用记录示例
以下示例显示了 Lambda 在 Kinesis 事件源调用失败时向 SQS 队列或 SNS 主题发送的内容。由于 Lambda 仅为这些目标类型发送元数据,因此请使用 streamArn
、shardId
、startSequenceNumber
和 endSequenceNumber
字段获取完整的原始记录。KinesisBatchInfo
属性中显示的所有字段始终存在。
{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }
您可以使用此信息从流中检索受影响的记录以进行故障排除。实际记录不包括在内,因此您必须处理此记录并在记录过期和丢失之前从流中检索它们。
Amazon S3 调用记录示例
以下示例显示了 Lambda 在 Kinesis 事件源调用失败时向 Amazon S3 存储桶发送的内容。除了针对 SQS 和 SNS 目标的上一示例中的所有字段外,payload
字段还包含作为转义 JSON 字符串的原始调用记录。
{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" }, "payload": "<Whole Event>" // Only available in S3 }
包含了调用记录的 S3 对象使用以下命名约定:
aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>