捕获 Amazon MSK 事件源丢弃的批处理
要保留失败的事件源映射调用的记录,请在函数的事件源映射中添加一个目标。发送到目标的每条记录都是一个 JSON 文档,其中包含有关失败调用的元数据。对于 Amazon S3 目标,Lambda 还会发送整个调用记录以及元数据。您可以将任何 Amazon SNS 主题、Amazon SQS 队列或 S3 存储桶配置为目标。
借助 Amazon S3 目标,您可以使用 Amazon S3 事件通知功能在对象上传到目标 S3 存储桶时接收通知。您还可以将 S3 事件通知配置为调用另一个 Lambda 函数来对失败的批次执行自动处理。
您的执行角色必须具有目标的权限:
-
对于 SQS 目标:sqs:SendMessage
-
对于 SNS 目标:sns:Publish
-
对于 S3 存储桶目标:s3:PutObject 和 s3:ListBucket
您必须在 Amazon MSK 集群 VPC 中为故障目标服务部署 VPC 端点。
此外,如果您在目标上配置了 KMS 密钥,则根据具体目标类型,Lambda 需要以下权限:
-
如果您已使用自己的 KMS 密钥为 S3 目标启用加密,则需要 kms:GenerateDataKey。如果 KMS 密钥和 S3 存储桶目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:GenerateDataKey。
-
如果您已使用自己的 KMS 密钥为 SQS 目标启用加密,则需要 kms:Decrypt 和 kms:GenerateDataKey。如果 KMS 密钥和 SQS 队列目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:Decrypt、kms:GenerateDataKey、kms:DescribeKey 和 kms:ReEncrypt。
-
如果您已使用自己的 KMS 密钥为 SNS 目标启用加密,则需要 kms:Decrypt 和 kms:GenerateDataKey。如果 KMS 密钥和 SNS 主题目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:Decrypt、kms:GenerateDataKey、kms:DescribeKey 和 kms:ReEncrypt。
为 Amazon MSK 事件源映射配置失败时的目标
要使用控制台配置失败时的目标,请执行以下步骤:
打开 Lamba 控制台的 Functions
(函数)页面。 -
选择函数。
-
在 Function overview (函数概览) 下,选择 Add destination (添加目标)。
-
对于源,请选择事件源映射调用。
-
对于事件源映射,请选择为此函数配置的事件源。
-
在条件中,选择失败时。对于事件源映射调用,这是唯一可接受的条件。
-
对于目标类型,请选择 Lambda 要发送调用记录的目标类型。
-
对于 Destination (目标),请选择一个资源。
-
选择保存。
您还可以使用 AWS CLI 配置失败时的目标。例如,以 create-event-source-mappingMyFunction
:
aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
以下 update-event-source-mappinguuid
关联的事件源:
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'
要移除目标,请提供一个空字符串作为 destination-config
参数的实际参数:
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'
Amazon S3 目标的安全最佳实践
如果删除配置为目标的 S3 存储桶而不将目标从函数配置中删除,则可能会造成安全风险。如果其他用户知道您的目标存储桶的名称,则他们可以在其 AWS 账户中重新创建存储桶。调用失败的记录将发送到存储桶,这可能会暴露您函数中的数据。
警告
为确保您的函数中的调用记录不会发送到另一个 AWS 账户中的 S3 存储桶,请向函数的执行角色添加条件,以将 s3:PutObject
权限限制为您账户中的存储桶。
以下示例显示了一个 IAM 策略,该策略将您函数的 s3:PutObject
权限限制为您账户中的存储桶。该策略还为 Lambda 提供了使用 S3 存储桶作为目标所需的 s3:ListBucket
权限。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount":
"111122223333"
} } } ] }
要使用 AWS Management Console或 AWS CLI 向函数的执行角色添加权限策略,请参阅以下程序中的说明:
SNS 和 SQS 示例调用记录
以下示例显示了 Lambda 在 Kafka 事件源调用失败时向 SNS 主题或 SQS 队列目标发送的内容。recordsInfo
下面的每个密钥都包含 Kafka 主题和分区,用连字符分隔。例如,对于密钥 "Topic-0"
,Topic
是 Kafka 主题,0
是分区。对于每个主题和分区,可以使用偏移量和时间戳数据来查找原始调用记录。
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }
S3 目标示例调用记录
对于 S3 目标,Lambda 会将整个调用记录以及元数据发送到目标。以下示例显示了 Lambda 因调用 Kafka 事件源失败而向 S3 存储桶目标发送消息。除了针对 SQS 和 SNS 目标的上一示例中的所有字段外,payload
字段还包含作为转义 JSON 字符串的原始调用记录。
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
提示
我们建议在目标存储桶上启用 S3 版本控制。