要保留失败的事件源映射调用的记录,请在函数的事件源映射中添加一个目标。发送到目标的每条记录都是一个 JSON 文档,其中包含有关失败调用的元数据。对于 Amazon S3 目标,Lambda 还会发送整个调用记录以及元数据。您可以将任何 Amazon SNS 主题、Amazon SQS 队列或 S3 存储桶配置为目标。
借助 Amazon S3 目标,您可以使用 Amazon S3 事件通知功能在对象上传到目标 S3 存储桶时接收通知。您还可以将 S3 事件通知配置为调用另一个 Lambda 函数来对失败的批次执行自动处理。
您的执行角色必须具有目标的权限:
-
对于 SQS 目标:sqs:SendMessage
-
对于 SNS 目标:sns:Publish
-
对于 S3 存储桶目标:s3:PutObject 和 s3:ListBucket
您必须在 Amazon MSK 集群 VPC 中为故障目标服务部署 VPC 端点。
此外,如果您在目标上配置了 KMS 密钥,则根据具体目标类型,Lambda 需要以下权限:
-
如果您已使用自己的 KMS 密钥为 S3 目标启用加密,则需要 kms:GenerateDataKey。如果 KMS 密钥和 S3 存储桶目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:GenerateDataKey。
-
如果您已使用自己的 KMS 密钥为 SQS 目标启用加密,则需要 kms:Decrypt 和 kms:GenerateDataKey。如果 KMS 密钥和 SQS 队列目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:Decrypt、kms:GenerateDataKey、kms:DescribeKey 和 kms:ReEncrypt。
-
如果您已使用自己的 KMS 密钥为 SNS 目标启用加密,则需要 kms:Decrypt 和 kms:GenerateDataKey。如果 KMS 密钥和 SNS 主题目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:Decrypt、kms:GenerateDataKey、kms:DescribeKey 和 kms:ReEncrypt。
为 Amazon MSK 事件源映射配置失败时的目标
要使用控制台配置失败时的目标,请执行以下步骤:
打开 Lamba 控制台的 Functions
(函数)页面。 -
选择函数。
-
在 Function overview (函数概览) 下,选择 Add destination (添加目标)。
-
对于源,请选择事件源映射调用。
-
对于事件源映射,请选择为此函数配置的事件源。
-
在条件中,选择失败时。对于事件源映射调用,这是唯一可接受的条件。
-
对于目标类型,请选择 Lambda 要发送调用记录的目标类型。
-
对于 Destination (目标),请选择一个资源。
-
选择保存。
您还可以使用 AWS CLI 配置失败时的目标。例如,以 create-event-source-mappingMyFunction
:
aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
以下 update-event-source-mappinguuid
关联的事件源:
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'
要移除目标,请提供一个空字符串作为 destination-config
参数的实际参数:
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'
Amazon S3 目标的安全最佳实践
如果删除配置为目标的 S3 存储桶而不将目标从函数配置中删除,则可能会造成安全风险。如果其他用户知道您的目标存储桶的名称,则他们可以在其 AWS 账户中重新创建存储桶。调用失败的记录将发送到存储桶,这可能会暴露您函数中的数据。
警告
为确保您的函数中的调用记录不会发送到另一个 AWS 账户中的 S3 存储桶,请向函数的执行角色添加条件,以将 s3:PutObject
权限限制为您账户中的存储桶。
以下示例显示了一个 IAM 策略,该策略将您函数的 s3:PutObject
权限限制为您账户中的存储桶。该策略还为 Lambda 提供了使用 S3 存储桶作为目标所需的 s3:ListBucket
权限。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "S3BucketResourceAccountWrite",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:ListBucket"
],
"Resource": "arn:aws:s3:::*/*",
"Condition": {
"StringEquals": {
"s3:ResourceAccount": "111122223333"
}
}
}
]
}
要使用 AWS Management Console或 AWS CLI 向函数的执行角色添加权限策略,请参阅以下程序中的说明:
向函数的执行角色添加权限策略(控制台)
打开 Lamba 控制台的函数页面
。 -
选择想要修改其执行角色的 Lambda 函数。
-
在配置选项卡中,选择权限。
-
在执行角色选项卡中,选择您的函数的角色名称,以打开该角色的 IAM 控制台页面。
-
通过执行以下操作,向角色添加权限策略:
-
在权限策略窗格中,选择添加权限,然后选择创建内联策略。
-
在策略编辑器中,选择 JSON。
-
将要添加的策略粘贴到编辑器中(替换现有 JSON),然后选择下一步。
-
在策略详细信息下,输入策略名称。
-
选择 创建策略。
-
SNS 和 SQS 示例调用记录
以下示例显示了 Lambda 在 Kafka 事件源调用失败时向 SNS 主题或 SQS 队列目标发送的内容。recordsInfo
下面的每个密钥都包含 Kafka 主题和分区,用连字符分隔。例如,对于密钥 "Topic-0"
,Topic
是 Kafka 主题,0
是分区。对于每个主题和分区,可以使用偏移量和时间戳数据来查找原始调用记录。
{
"requestContext": {
"requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
"functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction",
"condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded",
"approximateInvokeCount": 1
},
"responseContext": { // null if record is MaximumPayloadSizeExceeded
"statusCode": 200,
"executedVersion": "$LATEST",
"functionError": "Unhandled"
},
"version": "1.0",
"timestamp": "2019-11-14T00:38:06.021Z",
"KafkaBatchInfo": {
"batchSize": 500,
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
"bootstrapServers": "...",
"payloadSize": 2039086, // In bytes
"recordsInfo": {
"Topic-0": {
"firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
"lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
"firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
"lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
},
"Topic-1": {
"firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
"lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
"firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
"lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
}
}
}
}
S3 目标示例调用记录
对于 S3 目标,Lambda 会将整个调用记录以及元数据发送到目标。以下示例显示了 Lambda 因调用 Kafka 事件源失败而向 S3 存储桶目标发送消息。除了针对 SQS 和 SNS 目标的上一示例中的所有字段外,payload
字段还包含作为转义 JSON 字符串的原始调用记录。
{
"requestContext": {
"requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
"functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction",
"condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded",
"approximateInvokeCount": 1
},
"responseContext": { // null if record is MaximumPayloadSizeExceeded
"statusCode": 200,
"executedVersion": "$LATEST",
"functionError": "Unhandled"
},
"version": "1.0",
"timestamp": "2019-11-14T00:38:06.021Z",
"KafkaBatchInfo": {
"batchSize": 500,
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
"bootstrapServers": "...",
"payloadSize": 2039086, // In bytes
"recordsInfo": {
"Topic-0": {
"firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
"lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
"firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
"lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
},
"Topic-1": {
"firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
"lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
"firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
"lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
}
}
},
"payload": "<Whole Event>" // Only available in S3
}
提示
我们建议在目标存储桶上启用 S3 版本控制。