本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
為自我管理的 Apache Kafka 事件來源擷取捨棄的批次
若要保留失敗的事件來源映射調用記錄,請將目標地新增到函數的事件來源映射中。每個傳送至目的地的記錄都是含有關失敗叫用之中繼資料的JSON文件。您可以將任何 Amazon SNS 主題、Amazon SQS 佇列或 S3 儲存貯體設定為目的地。您的執行角色必須具有目標的權限:
-
對於SQS目的地:sqs:SendMessage
-
對於SNS目的地:SNS: 發佈
-
對於 S3 儲存貯體目的地:s3:PutObject和 s3:ListBuckets
您必須在 Apache Kafka 叢集中為發生故障的目的地服務部署VPC端點。VPC
此外,如果您在目的地上設定KMS金鑰,Lambda 需要下列權限,視目的地類型而定:
-
如果您已使用自己的 S3 目的地金KMS鑰啟用加密,GenerateDataKey則需要 kms:。如果金KMS鑰和 S3 儲存貯體目的地與 Lambda 函數和執行角色位於不同的帳戶中,請設定金KMS鑰以信任執行角色以允許 KMS:GenerateDataKey
-
如果您已使用自己的SQS目的地金KMS鑰啟用加密功能,則需要 KMS: Decryp t 和 kms: GenerateDataKey。如果金KMS鑰和SQS佇列目的地與 Lambda 函數和執行角色位於不同的帳戶中,請將金KMS鑰設定為信任執行角色,以允許 KMS: 解密、公里:GenerateDataKey、公里:DescribeKey和公里:。ReEncrypt
-
如果您已使用自己的SNS目的地金KMS鑰啟用加密功能,則需要 KMS: Decryp t 和 kms: GenerateDataKey。如果金KMS鑰和SNS主題目的地與 Lambda 函數和執行角色位於不同的帳戶中,請將金KMS鑰設定為信任執行角色,以允許 KMS: 解密、公里:GenerateDataKey、公里:DescribeKey和公里:。ReEncrypt
為自我管理的 Apache Kafka 事件來源對應設定故障時的目的地
若要使用主控台設定失敗時的目的地,請依照下列步驟執行:
開啟 Lambda 主控台中的 函數頁面
。 -
選擇一個函數。
-
在 函數概觀 下,選擇 新增目的地。
-
針對來源,請選擇事件來源映射調用。
-
對於事件來源映射,請選擇針對此函數設定的事件來源。
-
對於條件,選取失敗時。對於事件來源映射調用,這是唯一可接受的條件。
-
對於目標類型,請選擇 Lambda 將調用記錄傳送至的目標類型。
-
對於 目的地,請選擇一個資源。
-
選擇 Save (儲存)。
您也可以使用設定失敗時的目的 AWS CLI地。例如,下列create-event-source-mappingMyFunction
:
aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
下列update-event-source-mappinguuid
相關聯的事件來源:
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'
若要移除目的地,請提供空白字串作為 destination-config
參數的引數:
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'
SNS和SQS示例調用記錄
下列範例顯示 Lambda 針對失敗的 Kafka 事件來源叫用傳送至SNS主題或SQS佇列目的地的項目。recordsInfo
之下的每個索引鍵都包含 Kafka 主題和分割區,以連字號分隔。例如,對於金鑰 "Topic-0"
,Topic
是 Kafka 主題,並且 0
是分區。對於每個主題和分區,您可以使用偏移和時間戳記資料來查找原始調用記錄。
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }
S3 目的地範例叫用記錄
對於 S3 的目的地,Lambda 會將整個調用記錄與中繼資料一起傳送至目的地。下列範例顯示 Lambda 針對失敗的 Kafka 事件來源調用而傳送至 S3 儲存貯體目的地。除了上一個範例 in and 目的地中的所有欄位以SQS及SNS目的地內的payload
欄位以轉逸JSON字串的形式包含原始叫用記錄。
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
提示
建議您在目的地儲存貯體上啟用 S3 版本控制。