捕获 Amazon MSK 事件源丢弃的批处理 - AWS Lambda

捕获 Amazon MSK 事件源丢弃的批处理

要保留失败的事件源映射调用的记录,请在函数的事件源映射中添加一个目标。发送到目标的每条记录都是一个 JSON 文档,其中包含有关失败调用的元数据。对于 Amazon S3 目标,Lambda 还会发送整个调用记录以及元数据。您可以将任何 Amazon SNS 主题、Amazon SQS 队列或 S3 存储桶配置为目标。

借助 Amazon S3 目标,您可以使用 Amazon S3 事件通知功能在对象上传到目标 S3 存储桶时接收通知。您还可以将 S3 事件通知配置为调用另一个 Lambda 函数来对失败的批次执行自动处理。

您的执行角色必须具有目标的权限:

您必须在 Amazon MSK 集群 VPC 中为故障目标服务部署 VPC 端点。

此外,如果您在目标上配置了 KMS 密钥,则根据具体目标类型,Lambda 需要以下权限:

  • 如果您已使用自己的 KMS 密钥为 S3 目标启用加密,则需要 kms:GenerateDataKey。如果 KMS 密钥和 S3 存储桶目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:GenerateDataKey。

  • 如果您已使用自己的 KMS 密钥为 SQS 目标启用加密,则需要 kms:Decryptkms:GenerateDataKey。如果 KMS 密钥和 SQS 队列目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:Decrypt、kms:GenerateDataKey、kms:DescribeKeykms:ReEncrypt

  • 如果您已使用自己的 KMS 密钥为 SNS 目标启用加密,则需要 kms:Decryptkms:GenerateDataKey。如果 KMS 密钥和 SNS 主题目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:Decrypt、kms:GenerateDataKey、kms:DescribeKeykms:ReEncrypt

为 Amazon MSK 事件源映射配置失败时的目标

要使用控制台配置失败时的目标,请执行以下步骤:

  1. 打开 Lamba 控制台的 Functions(函数)页面。

  2. 选择函数。

  3. Function overview (函数概览) 下,选择 Add destination (添加目标)

  4. 对于,请选择事件源映射调用

  5. 对于事件源映射,请选择为此函数配置的事件源。

  6. 条件中,选择失败时。对于事件源映射调用,这是唯一可接受的条件。

  7. 对于目标类型,请选择 Lambda 要发送调用记录的目标类型。

  8. 对于 Destination (目标),请选择一个资源。

  9. 选择保存

您还可以使用 AWS CLI 配置失败时的目标。例如,以 create-event-source-mapping 命令将带有 SQS 失败时目标的事件源映射添加到 MyFunction

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

以下 update-event-source-mapping 命令将 S3 失败时目标添加到与输入 uuid 关联的事件源:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

要移除目标,请提供一个空字符串作为 destination-config 参数的实际参数:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Amazon S3 目标的安全最佳实践

如果删除配置为目标的 S3 存储桶而不将目标从函数配置中删除,则可能会造成安全风险。如果其他用户知道您的目标存储桶的名称,则他们可以在其 AWS 账户中重新创建存储桶。调用失败的记录将发送到存储桶,这可能会暴露您函数中的数据。

警告

为确保您的函数中的调用记录不会发送到另一个 AWS 账户中的 S3 存储桶,请向函数的执行角色添加条件,以将 s3:PutObject 权限限制为您账户中的存储桶。

以下示例显示了一个 IAM 策略,该策略将您函数的 s3:PutObject 权限限制为您账户中的存储桶。该策略还为 Lambda 提供了使用 S3 存储桶作为目标所需的 s3:ListBucket 权限。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount": "111122223333" } } } ] }

要使用 AWS Management Console或 AWS CLI 向函数的执行角色添加权限策略,请参阅以下程序中的说明:

Console
向函数的执行角色添加权限策略(控制台)
  1. 打开 Lamba 控制台的函数页面

  2. 选择想要修改其执行角色的 Lambda 函数。

  3. 配置选项卡中,选择权限

  4. 执行角色选项卡中,选择您的函数的角色名称,以打开该角色的 IAM 控制台页面。

  5. 通过执行以下操作,向角色添加权限策略:

    1. 权限策略窗格中,选择添加权限,然后选择创建内联策略

    2. 策略编辑器中,选择 JSON

    3. 将要添加的策略粘贴到编辑器中(替换现有 JSON),然后选择下一步

    4. 策略详细信息下,输入策略名称

    5. 选择 创建策略

AWS CLI
向函数的执行角色添加权限策略(CLI)
  1. 创建具有所需权限的 JSON 策略文档并将其保存在本地目录中。

  2. 使用 IAM put-role-policy CLI 命令向您函数的执行角色添加权限。在您保存 JSON 策略文档的目录中运行以下命令,并用您自己的值替换角色名称、策略名称和策略文档。

    aws iam put-role-policy \ --role-name my_lambda_role \ --policy-name LambdaS3DestinationPolicy \ --policy-document file://my_policy.json

SNS 和 SQS 示例调用记录

以下示例显示了 Lambda 在 Kafka 事件源调用失败时向 SNS 主题或 SQS 队列目标发送的内容。recordsInfo 下面的每个密钥都包含 Kafka 主题和分区,用连字符分隔。例如,对于密钥 "Topic-0"Topic 是 Kafka 主题,0 是分区。对于每个主题和分区,可以使用偏移量和时间戳数据来查找原始调用记录。

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

S3 目标示例调用记录

对于 S3 目标,Lambda 会将整个调用记录以及元数据发送到目标。以下示例显示了 Lambda 因调用 Kafka 事件源失败而向 S3 存储桶目标发送消息。除了针对 SQS 和 SNS 目标的上一示例中的所有字段外,payload 字段还包含作为转义 JSON 字符串的原始调用记录。

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
提示

我们建议在目标存储桶上启用 S3 版本控制。