Retain discarded batch records for a Kinesis Data Streams event source in Lambda

Error handling for Kinesis event source mappings depends on whether the error occurs before the function is invoked or during function invocation:

  • Before invocation: If a Lambda event source mapping is unable to invoke the function due to throttling or other issues, it retries until the records expire or exceed the maximum age configured on the event source mapping (MaximumRecordAgeInSeconds).

  • During invocation: If the function is invoked but returns an error, Lambda retries until the records expire, exceed the maximum age (MaximumRecordAgeInSeconds), or reach the configured retry quota (MaximumRetryAttempts). For function errors, you can also configure BisectBatchOnFunctionError, which splits a failed batch into two smaller batches, isolating bad records and avoiding timeouts. Splitting batches doesn't consume the retry quota.
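The effect of BisectBatchOnFunctionError can be illustrated with a small offline simulation. This is only a sketch of the splitting idea, not Lambda's actual implementation: the `isolate_failures` helper and the sample `handler` are hypothetical, and the sketch ignores retries of each sub-batch, record age, and timeouts.

```python
from typing import Callable, List, Sequence, Tuple


def isolate_failures(
    batch: Sequence[str],
    handler: Callable[[Sequence[str]], None],
) -> Tuple[List[str], int]:
    """Return (records that still fail on their own, number of handler calls)."""
    invocations = 0
    failed: List[str] = []
    pending = [list(batch)]  # batches waiting to be processed, in stream order

    while pending:
        current = pending.pop(0)
        invocations += 1
        try:
            handler(current)
        except Exception:
            if len(current) == 1:
                # Cannot split further: this record is the culprit.
                failed.extend(current)
            else:
                # Split the failed batch in two and process both halves next.
                mid = len(current) // 2
                pending[:0] = [current[:mid], current[mid:]]
    return failed, invocations


def handler(records: Sequence[str]) -> None:
    # Hypothetical function code that fails whenever the batch holds "bad".
    if "bad" in records:
        raise ValueError("cannot process record")


records = ["r1", "r2", "r3", "bad", "r5", "r6", "r7", "r8"]
failed, calls = isolate_failures(records, handler)
print(failed, calls)
```

In this model the single bad record is isolated after a handful of invocations, and every other record in the batch is processed successfully.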

If the error handling measures fail, Lambda discards the records and continues processing batches from the stream. With the default settings, this means that a bad record can block processing on the affected shard for up to one week. To avoid this, configure your function's event source mapping with a reasonable number of retries and a maximum record age that fits your use case.
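As a rough mental model of when a batch is given up on, the limits described above can be written as one function. This is a simplified sketch, not Lambda's internal logic: `discard_reason`, its parameter names, and the returned labels are hypothetical. It relies on -1 being the value that means "no limit" for both MaximumRecordAgeInSeconds and MaximumRetryAttempts.

```python
from typing import Optional

UNLIMITED = -1  # value that means "no limit" for both settings


def discard_reason(
    record_age_s: int,
    retries_done: int,
    retention_s: int,
    max_record_age_s: int = UNLIMITED,
    max_retry_attempts: int = UNLIMITED,
) -> Optional[str]:
    """Return why a failing batch would be dropped, or None to keep retrying."""
    if record_age_s >= retention_s:
        # The stream itself no longer holds the records.
        return "expired_from_stream"
    if max_record_age_s != UNLIMITED and record_age_s > max_record_age_s:
        return "max_record_age_exceeded"
    if max_retry_attempts != UNLIMITED and retries_done >= max_retry_attempts:
        return "retries_exhausted"
    return None


# With no limits configured, a two-hour-old batch is still being retried.
print(discard_reason(7200, 50, retention_s=86400))
# With a one-hour maximum age, the same batch would already be discarded.
print(discard_reason(7200, 2, retention_s=86400,
                     max_record_age_s=3600, max_retry_attempts=2))
```

With both settings left unlimited, only stream retention ends the retries, which is why a single bad record can hold up a shard for so long.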

Configuring destinations for failed invocations

To retain records of failed event source mapping invocations, add a destination to your function's event source mapping. Each record sent to the destination is a JSON document with metadata about the failed invocation. You can configure any Amazon SNS topic or Amazon SQS queue as a destination. Your execution role must have permissions for the destination: sqs:SendMessage for an SQS queue, or sns:Publish for an SNS topic.
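As a sketch of the permissions involved, the following builds a minimal IAM policy document for a given destination ARN. The `destination_policy` helper is hypothetical. It assumes the execution role needs sqs:SendMessage for a queue or sns:Publish for a topic; your role may need further statements depending on your setup (for example, encrypted destinations).

```python
import json

# Action the execution role needs for each supported destination service.
_ACTIONS = {"sqs": "sqs:SendMessage", "sns": "sns:Publish"}


def destination_policy(destination_arn: str) -> dict:
    """Build a minimal policy allowing Lambda to write to the destination."""
    parts = destination_arn.split(":")
    if len(parts) < 6 or parts[0] != "arn":
        raise ValueError(f"not an ARN: {destination_arn!r}")
    service = parts[2]
    if service not in _ACTIONS:
        raise ValueError(f"unsupported destination service: {service!r}")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": _ACTIONS[service],
                "Resource": destination_arn,
            }
        ],
    }


policy = destination_policy("arn:aws:sqs:us-east-1:123456789012:dest-queue")
print(json.dumps(policy, indent=2))
```

The resulting document could be attached to the execution role as an inline policy.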

To configure an on-failure destination using the console, follow these steps:

  1. Open the Functions page of the Lambda console.

  2. Choose a function.

  3. Under Function overview, choose Add destination.

  4. For Source, choose Event source mapping invocation.

  5. For Event source mapping, choose an event source that's configured for this function.

  6. For Condition, select On failure. For event source mapping invocations, this is the only accepted condition.

  7. For Destination type, choose the destination type that Lambda sends invocation records to.

  8. For Destination, choose a resource.

  9. Choose Save.

You can also configure an on-failure destination using the AWS Command Line Interface (AWS CLI). For example, the following create-event-source-mapping command adds an event source mapping with an SQS on-failure destination to MyFunction:

aws lambda create-event-source-mapping \
    --function-name "MyFunction" \
    --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
    --starting-position LATEST \
    --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

The following update-event-source-mapping command updates an event source mapping to send failed invocation records to an SNS destination after two retry attempts, or if the records are more than an hour old.

aws lambda update-event-source-mapping \
    --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
    --maximum-retry-attempts 2 \
    --maximum-record-age-in-seconds 3600 \
    --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
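The same update can be expressed with the AWS SDK for Python (boto3). The sketch below only builds the request parameters so that it runs without credentials. The `on_failure_update` helper is hypothetical; the keyword names are those of boto3's `update_event_source_mapping` call.

```python
from typing import Any, Dict


def on_failure_update(
    uuid: str,
    destination_arn: str,
    max_retry_attempts: int,
    max_record_age_s: int,
) -> Dict[str, Any]:
    """Build keyword arguments for update_event_source_mapping."""
    return {
        "UUID": uuid,
        "MaximumRetryAttempts": max_retry_attempts,
        "MaximumRecordAgeInSeconds": max_record_age_s,
        "DestinationConfig": {"OnFailure": {"Destination": destination_arn}},
    }


params = on_failure_update(
    uuid="f89f8514-cdd9-4602-9e1f-01a5b77d449b",
    destination_arn="arn:aws:sns:us-east-1:123456789012:dest-topic",
    max_retry_attempts=2,
    max_record_age_s=3600,
)
print(params)

# To apply the change (requires boto3 and AWS credentials):
#   import boto3
#   boto3.client("lambda").update_event_source_mapping(**params)
```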

Updated settings are applied asynchronously and aren't reflected in the output until the process completes. Use the get-event-source-mapping command to view the current status.

To remove a destination, supply an empty string as the Destination value in the destination-config parameter:

aws lambda update-event-source-mapping \
    --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
    --destination-config '{"OnFailure": {"Destination": ""}}'

The following example shows what Lambda sends to an SQS queue or SNS topic for a failed Kinesis event source invocation. Because Lambda sends only the metadata for these destination types, use the streamArn, shardId, startSequenceNumber, and endSequenceNumber fields to obtain the full original record. All of the fields shown in the KinesisBatchInfo property will always be present.

{
    "requestContext": {
        "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KinesisBatchInfo": {
        "shardId": "shardId-000000000001",
        "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722",
        "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z",
        "batchSize": 500,
        "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream"
    }
}

You can use this information to retrieve the affected records from the stream for troubleshooting. The actual records aren't included, so you must process this invocation record and retrieve the affected records from the stream before they expire and are lost.
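One way to retrieve the affected records is to read the shard from startSequenceNumber through endSequenceNumber. The sketch below uses the boto3-style Kinesis calls `get_shard_iterator` and `get_records`. The `fetch_failed_records` helper, its `max_calls` cap, and the `_FakeKinesis` stand-in (included only so the example runs offline) are assumptions for illustration.

```python
from typing import Any, Dict, List


def fetch_failed_records(kinesis: Any, failure: Dict[str, Any],
                         max_calls: int = 20) -> List[Dict[str, Any]]:
    """Read the records named by KinesisBatchInfo back from the stream."""
    info = failure["KinesisBatchInfo"]
    stream_name = info["streamArn"].rsplit("/", 1)[-1]
    end_seq = int(info["endSequenceNumber"])
    iterator = kinesis.get_shard_iterator(
        StreamName=stream_name,
        ShardId=info["shardId"],
        ShardIteratorType="AT_SEQUENCE_NUMBER",
        StartingSequenceNumber=info["startSequenceNumber"],
    )["ShardIterator"]

    found: List[Dict[str, Any]] = []
    for _ in range(max_calls):  # cap the loop; an open shard never "ends"
        if iterator is None:
            break
        page = kinesis.get_records(ShardIterator=iterator, Limit=info["batchSize"])
        for record in page["Records"]:
            seq = int(record["SequenceNumber"])
            if seq > end_seq:
                return found
            found.append(record)
            if seq == end_seq:
                return found
        iterator = page.get("NextShardIterator")
    return found


class _FakeKinesis:
    """Offline stand-in that serves records in small pages."""

    def __init__(self, sequence_numbers, page_size=2):
        self._records = [{"SequenceNumber": str(n), "Data": b"payload-%d" % n}
                         for n in sequence_numbers]
        self._page_size = page_size

    def get_shard_iterator(self, StreamName, ShardId, ShardIteratorType,
                           StartingSequenceNumber):
        start = int(StartingSequenceNumber)
        index = next(i for i, r in enumerate(self._records)
                     if int(r["SequenceNumber"]) >= start)
        return {"ShardIterator": str(index)}

    def get_records(self, ShardIterator, Limit):
        begin = int(ShardIterator)
        end = begin + min(self._page_size, Limit)
        nxt = str(end) if end < len(self._records) else None
        return {"Records": self._records[begin:end], "NextShardIterator": nxt}


failure = {"KinesisBatchInfo": {
    "shardId": "shardId-000000000001",
    "startSequenceNumber": "11",
    "endSequenceNumber": "13",
    "batchSize": 500,
    "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream",
}}
recovered = fetch_failed_records(_FakeKinesis(range(10, 16)), failure)
print([r["SequenceNumber"] for r in recovered])
```

Against a real stream, you would pass a Kinesis client created for the stream's Region (us-east-2 in the example record) instead of the stand-in.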