Lambda で Kinesis Data Streams イベントソースの破棄されたバッチレコードを保持する
Kinesis イベントソースマッピングのエラー処理は、エラーが関数の呼び出し前に発生するか、関数の呼び出し中に発生するかによって異なります。
-
呼び出し前: スロットリングまたはその他の問題によって Lambda イベントソースマッピングが関数を呼び出すことができない場合、レコードの有効期限が切れるか、イベントソースマッピングで設定された最大有効期間 (MaximumRecordAgeInSeconds) を超えるまで再試行します。
-
呼び出し中: 関数は呼び出されたがエラーが返された場合、Lambda はレコードの有効期限が切れるか、最大有効期間 (MaximumRecordAgeInSeconds) を超えるか、設定された再試行クォータ (MaximumRetryAttempts) に達するまで再試行します。関数エラーの場合、BisectBatchOnFunctionError を設定することもできます。これは、失敗したバッチを 2 つの小さなバッチに分割し、不良レコードを分離してタイムアウトを回避します。バッチを分割しても、再試行クォータは消費されません。
エラー処理の対策に失敗すると、Lambda はレコードを破棄し、ストリームからのバッチ処理を継続します。デフォルト設定では、不良レコードによって、影響を受けるシャードでの処理が最大 1 週間ブロックされる可能性があります。これを回避するには、関数のイベントソースマッピングを、適切な再試行回数と、ユースケースに適合する最大レコード経過時間で設定します。
失敗した呼び出しの送信先の設定
失敗したイベントソースマッピング呼び出しの記録を保持するには、関数のイベントソースマッピングに送信先を追加します。送信先に送られる各レコードは、失敗した呼び出しに関するメタデータを含む JSON ドキュメントです。Amazon S3 送信先の場合、Lambda はメタデータと共に呼び出しレコード全体を送信します。任意の Amazon SNS トピック、Amazon SQS キュー、または S3 バケットを送信先として設定できます。
Amazon S3 送信先を使用すると、Amazon S3 イベント通知機能を使用して、オブジェクトが送信先 S3 バケットにアップロードされたときに通知を受け取ることができます。また、S3 イベント通知を設定して、失敗したバッチに対して別の Lambda 関数を呼び出し、自動処理を実行することもできます。
実行ロールには、送信先に対するアクセス許可が必要です。
-
SQS 送信先の場合: sqs:SendMessage
-
SNS 送信先の場合: sns:Publish
-
S3 バケット送信先の場合: s3:PutObject および s3:ListBucket
S3 送信先に対して独自の KMS キーを使用した暗号化を有効にしている場合、関数の実行ロールには kms:GenerateDataKey を呼び出すためのアクセス許可も必要です。KMS キーと S3 バケットの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、kms:GenerateDataKey を許可するように実行ロールを信頼するように KMS キーを設定します。
障害発生時の送信先をコンソールを使用して設定するには、以下の手順に従います。
Lambda コンソールの [関数ページ]
を開きます。 -
関数を選択します。
-
[機能の概要 ] で、[送信先を追加 ] を選択します。
-
[ソース] には、[イベントソースマッピング呼び出し] を選択します。
-
[イベントソースマッピング] では、この関数用に設定されているイベントソースを選択します。
-
[条件] には [失敗時] を選択します。イベントソースマッピング呼び出しでは、これが唯一受け入れられる条件です。
-
[送信先タイプ] では、Lambda が呼び出しレコードを送信する送信先タイプを選択します。
-
[送信先] で、リソースを選択します。
-
[Save] を選択します。
AWS Command Line Interface (AWS CLI) を使用して障害発生時の送信先を設定することもできます。例えば、次の create-event-source-mappingMyFunction
に追加します。
aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
次の update-event-source-mapping
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
更新された設定は非同期に適用され、プロセスが完了するまで出力に反映されません。現在のステータスを表示するには、get-event-source-mapping
送信先を削除するには、destination-config
パラメータの引数として空の文字列を指定します。
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'
Amazon S3 送信先のセキュリティのベストプラクティス
関数の設定から送信先を削除せずに、送信先として設定された S3 バケットを削除すると、セキュリティリスクが発生する可能性があります。別のユーザーが送信先バケットの名前を知っている場合は、その AWS アカウントでバケットを再作成できます。失敗した呼び出しのレコードがそのバケットに送信され、関数からのデータが公開される可能性があります。
警告
関数からの呼び出しレコードを別の AWS アカウントの S3 バケットに送信できないようにするには、s3:PutObject
アクセス許可を自分アカウントのバケットに制限する条件を関数の実行ロールに追加します。
次の例は、関数の s3:PutObject
アクセス許可を自分のアカウントのバケットに制限する IAM ポリシーを示しています。このポリシーは、送信先として S3 バケットを使用するために必要な s3:ListBucket
アクセス許可も Lambda に付与します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount":
"111122223333"
} } } ] }
AWS Management Consoleまたは AWS CLI を使用して、関数の実行ロールにアクセス許可ポリシーを追加する場合は、次の手順を参照してください。
Amazon SNS および Amazon SQS の呼び出しレコードの例
以下の例は、Kinesis イベントソース呼び出しが失敗した場合に Lambda が SQS キューまたは SNS トピックに送信する内容を示しています。Lambda はこれらの送信先タイプにメタデータのみを送信するため、元のレコード全体を取得するには、streamArn
、shardId
、startSequenceNumber
、endSequenceNumber
の各フィールドを使用します。KinesisBatchInfo
プロパティに表示されるフィールドはすべて常に存在します。
{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }
この情報は、トラブルシューティングのためにストリームから影響を受けるレコードを取得する際に使用できます。実際のレコードは含まれていないので、有効期限が切れて失われる前に、このレコードを処理し、ストリームから取得する必要があります。
Amazon S3 呼び出しレコードの例
次の例は、Kinesis イベントソースの呼び出しが失敗した場合に Amazon S3 バケットに送信する内容を示しています。SQS と SNS の送信先に関する前例のすべてのフィールドに加えて、payload
フィールドには元の呼び出しレコードがエスケープされた JSON 文字列として含まれています。
{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" }, "payload": "<Whole Event>" // Only available in S3 }
呼び出しレコードを含む S3 オブジェクトでは、次の命名規則が使用されます。
aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>