在 Lambda 中保留 Kinesis Data Streams 事件來源的捨棄批次記錄 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Lambda 中保留 Kinesis Data Streams 事件來源的捨棄批次記錄

Kinesis 事件來源對映的錯誤處理取決於在叫用函數之前還是在函數叫用期間發生錯誤:

如果錯誤處理措施失敗,Lambda 會捨棄相應記錄,並繼續處理串流中的批次。使用預設設定時,這表示不良的記錄可能會封鎖受影響碎片上的處理長達一週。若要避免此情況,在設定函數的事件來源映射時,請使用合理的重試次數和符合您使用案例的記錄最大保留期。

設定失敗呼叫的目的地

若要保留失敗的事件來源映射調用記錄,請將目標地新增到函數的事件來源映射中。每個傳送至目的地的記錄都是 JSON 文件,其中包含有關失敗叫用的中繼資料。您可以將任何 Amazon SNS 主題或 Amazon SQS 佇列設定為目的地。您的執行角色必須具有目標的權限:

  • 對於 SQS 目的地:sq s:SendMessage

  • 對於 SNS 目的地:SNS: 發佈

若要使用主控台設定失敗時的目的地,請依照下列步驟執行:

  1. 開啟 Lambda 主控台中的 函數頁面

  2. 選擇一個函數。

  3. 函數概觀 下,選擇 新增目的地

  4. 針對來源,請選擇事件來源映射調用

  5. 對於事件來源映射,請選擇針對此函數設定的事件來源。

  6. 對於條件,選取失敗時。對於事件來源映射調用,這是唯一可接受的條件。

  7. 對於目標類型,請選擇 Lambda 將調用記錄傳送至的目標類型。

  8. 對於 目的地,請選擇一個資源。

  9. 選擇 儲存

您也可以使用 AWS Command Line Interface (AWS CLI) 來設定故障時的目的地。例如,下列建立事件來源對映指令會將具有失敗時之 SQS 目的地的事件來源對應新增至:MyFunction

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

下列更新-事件來源映射命令會更新事件來源對應,以便在兩次重試嘗試後或記錄超過一個小時,將失敗的叫用記錄傳送至 SNS 目的地。

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'

系統會以非同步的方式套用更新的設定,在處理完成之前不會反映在輸出中。使用取得事件來源對映指令來檢視目前的狀態。

若要移除目的地,請提供空白字串作為 destination-config 參數的引數:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

下列範例顯示 Lambda 針對失敗的 Kinesis 事件來源調用而傳送至 SQS 佇列或 SNS 主題。由於 Lambda 只會傳送這些目標類型的中繼資料,因此請使用shardIdstartSequenceNumber、、和endSequenceNumber欄位來取得完整的原始記錄。streamArn

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

您可以使用此資訊來從串流擷取受影響的記錄,以進行疑難排解。實際的記錄不包含在內,因此您必須處理此記錄,並在因過期而遺失之前從資料串流中擷取它們。