Lambda で Kinesis Data Streams イベントソースの破棄されたバッチレコードを保持する - AWS Lambda

Lambda で Kinesis Data Streams イベントソースの破棄されたバッチレコードを保持する

Kinesis イベントソースマッピングのエラー処理は、エラーが関数の呼び出し前に発生するか、関数の呼び出し中に発生するかによって異なります。

  • 呼び出し前: スロットリングまたはその他の問題によって Lambda イベントソースマッピングが関数を呼び出すことができない場合、レコードの有効期限が切れるか、イベントソースマッピングで設定された最大有効期間 (MaximumRecordAgeInSeconds) を超えるまで再試行します。

  • 呼び出し中: 関数は呼び出されたがエラーが返された場合、Lambda はレコードの有効期限が切れるか、最大有効期間 (MaximumRecordAgeInSeconds) を超えるか、設定された再試行クォータ (MaximumRetryAttempts) に達するまで再試行します。関数エラーの場合、BisectBatchOnFunctionError を設定することもできます。これは、失敗したバッチを 2 つの小さなバッチに分割し、不良レコードを分離してタイムアウトを回避します。バッチを分割しても、再試行クォータは消費されません。

エラー処理の対策に失敗すると、Lambda はレコードを破棄し、ストリームからのバッチ処理を継続します。デフォルト設定では、不良レコードによって、影響を受けるシャードでの処理が最大 1 週間ブロックされる可能性があります。これを回避するには、関数のイベントソースマッピングを、適切な再試行回数と、ユースケースに適合する最大レコード経過時間で設定します。

失敗した呼び出しの送信先の設定

失敗したイベントソースマッピング呼び出しの記録を保持するには、関数のイベントソースマッピングに送信先を追加します。送信先に送られる各レコードは、失敗した呼び出しに関するメタデータを含む JSON ドキュメントです。Amazon S3 送信先の場合、Lambda はメタデータと共に呼び出しレコード全体を送信します。任意の Amazon SNS トピック、Amazon SQS キュー、または S3 バケットを送信先として設定できます。

Amazon S3 送信先を使用すると、Amazon S3 イベント通知機能を使用して、オブジェクトが送信先 S3 バケットにアップロードされたときに通知を受け取ることができます。また、S3 イベント通知を設定して、失敗したバッチに対して別の Lambda 関数を呼び出し、自動処理を実行することもできます。

実行ロールには、送信先に対するアクセス許可が必要です。

S3 送信先に対して独自の KMS キーを使用した暗号化を有効にしている場合、関数の実行ロールには kms:GenerateDataKey を呼び出すためのアクセス許可も必要です。KMS キーと S3 バケットの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、kms:GenerateDataKey を許可するように実行ロールを信頼するように KMS キーを設定します。

障害発生時の送信先をコンソールを使用して設定するには、以下の手順に従います。

  1. Lambda コンソールの [関数ページ] を開きます。

  2. 関数を選択します。

  3. [機能の概要 ] で、[送信先を追加 ] を選択します。

  4. [ソース] には、[イベントソースマッピング呼び出し] を選択します。

  5. [イベントソースマッピング] では、この関数用に設定されているイベントソースを選択します。

  6. [条件] には [失敗時] を選択します。イベントソースマッピング呼び出しでは、これが唯一受け入れられる条件です。

  7. [送信先タイプ] では、Lambda が呼び出しレコードを送信する送信先タイプを選択します。

  8. [送信先] で、リソースを選択します。

  9. [Save] を選択します。

AWS Command Line Interface (AWS CLI) を使用して障害発生時の送信先を設定することもできます。例えば、次の create-event-source-mapping コマンドは、SQS を障害発生時の送信先として持つイベントソースマッピングを MyFunction に追加します。

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

次の update-event-source-mapping コマンドは、2 回の再試行後、またはレコードが 1 時間以上経過した場合に失敗した呼び出しレコードを SNS 送信先に送信するように、イベントソースマッピングを更新します。

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'

更新された設定は非同期に適用され、プロセスが完了するまで出力に反映されません。現在のステータスを表示するには、get-event-source-mapping コマンドを使用します。

送信先を削除するには、destination-config パラメータの引数として空の文字列を指定します。

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Amazon S3 送信先のセキュリティのベストプラクティス

関数の設定から送信先を削除せずに、送信先として設定された S3 バケットを削除すると、セキュリティリスクが発生する可能性があります。別のユーザーが送信先バケットの名前を知っている場合は、その AWS アカウントでバケットを再作成できます。失敗した呼び出しのレコードがそのバケットに送信され、関数からのデータが公開される可能性があります。

警告

関数からの呼び出しレコードを別の AWS アカウントの S3 バケットに送信できないようにするには、s3:PutObject アクセス許可を自分アカウントのバケットに制限する条件を関数の実行ロールに追加します。

次の例は、関数の s3:PutObject アクセス許可を自分のアカウントのバケットに制限する IAM ポリシーを示しています。このポリシーは、送信先として S3 バケットを使用するために必要な s3:ListBucket アクセス許可も Lambda に付与します。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount": "111122223333" } } } ] }

AWS Management Consoleまたは AWS CLI を使用して、関数の実行ロールにアクセス許可ポリシーを追加する場合は、次の手順を参照してください。

Console
関数の実行ロールにアクセス許可ポリシーを追加するには (コンソール)
  1. Lambda コンソールの [関数ページ] を開きます。

  2. 実行ロールを変更する Lambda 関数を選択します。

  3. [構成] タブで、[アクセス許可] を選択します。

  4. [実行ロール] タブで、関数の [ロール名] を選択して、ロールの IAM コンソールページを開きます。

  5. 次の手順を実行してアクセス許可ポリシーをロールに追加します。

    1. [アクセス許可ポリシー] ペインで、[アクセス許可の追加][インラインポリシーを作成] を選択します。

    2. ポリシーエディタで、[JSON] を選択します。

    3. 追加するポリシーをエディタに貼り付け (既存の JSON を置き換える)、[次へ] を選択します。

    4. [ポリシーの詳細][ポリシー名] を入力します。

    5. [Create policy] を選択します。

AWS CLI
関数の実行ロールにアクセス許可ポリシーを追加するには (CLI)
  1. 必要なアクセス許可を持つ JSON ポリシードキュメントを作成し、ローカルディレクトリに保存します。

  2. IAM put-role-policy CLI コマンドを使用して、関数の実行ロールにアクセス許可を追加します。JSON ポリシードキュメントを保存したディレクトリから次のコマンドを実行して、ロール名、ポリシー名、ポリシードキュメントを独自の値に置き換えます。

    aws iam put-role-policy \ --role-name my_lambda_role \ --policy-name LambdaS3DestinationPolicy \ --policy-document file://my_policy.json

Amazon SNS および Amazon SQS の呼び出しレコードの例

以下の例は、Kinesis イベントソース呼び出しが失敗した場合に Lambda が SQS キューまたは SNS トピックに送信する内容を示しています。Lambda はこれらの送信先タイプにメタデータのみを送信するため、元のレコード全体を取得するには、streamArnshardIdstartSequenceNumberendSequenceNumber の各フィールドを使用します。KinesisBatchInfo プロパティに表示されるフィールドはすべて常に存在します。

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

この情報は、トラブルシューティングのためにストリームから影響を受けるレコードを取得する際に使用できます。実際のレコードは含まれていないので、有効期限が切れて失われる前に、このレコードを処理し、ストリームから取得する必要があります。

Amazon S3 呼び出しレコードの例

次の例は、Kinesis イベントソースの呼び出しが失敗した場合に Amazon S3 バケットに送信する内容を示しています。SQS と SNS の送信先に関する前例のすべてのフィールドに加えて、payload フィールドには元の呼び出しレコードがエスケープされた JSON 文字列として含まれています。

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" }, "payload": "<Whole Event>" // Only available in S3 }

呼び出しレコードを含む S3 オブジェクトでは、次の命名規則が使用されます。

aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>