Amazon MSK イベントソースの破棄されたバッチのキャプチャ
失敗したイベントソースマッピング呼び出しの記録を保持するには、関数のイベントソースマッピングに送信先を追加します。送信先に送られる各レコードは、失敗した呼び出しに関するメタデータを含む JSON ドキュメントです。Amazon S3 送信先の場合、Lambda はメタデータと共に呼び出しレコード全体を送信します。任意の Amazon SNS トピック、Amazon SQS キュー、または S3 バケットを送信先として設定できます。
Amazon S3 送信先を使用すると、Amazon S3 イベント通知機能を使用して、オブジェクトが送信先 S3 バケットにアップロードされたときに通知を受け取ることができます。また、S3 イベント通知を設定して、失敗したバッチに対して別の Lambda 関数を呼び出し、自動処理を実行することもできます。
実行ロールには、送信先に対するアクセス許可が必要です。
-
SQS 送信先の場合: sqs:SendMessage
-
SNS 送信先の場合: sns:Publish
-
S3 バケット送信先の場合: s3:PutObject および s3:ListBucket
障害発生時の送信先サービスの VPC エンドポイントを Amazon MSK クラスター VPC 内にデプロイする必要があります。
さらに、送信先に KMS キーを設定した場合、Lambda には送信先のタイプに応じて以下のアクセス許可が必要です。
-
S3 送信先に対して独自の KMS キーによる暗号化を有効にしている場合は、kms:GenerateDataKey が必要です。KMS キーと S3 バケットの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、kms:GenerateDataKey を許可するように実行ロールを信頼するように KMS キーを設定します。
-
SQS 送信先に対して独自の KMS キーによる暗号化を有効にしている場合は、kms:Decrypt および kms:GenerateDataKey が必要です。KMS キーと SQS キューの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、KMS キーが実行ロールを信頼し、kms:Decryp、kms:GenerateDataKey、kms:DescribeKey、および kms:ReEncrypt を許可するように設定します。
-
SNS 送信先に対して独自の KMS キーによる暗号化を有効にしている場合は、kms:Decrypt と kms:GenerateDataKey が必要です。KMS キーと SNS トピックの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、KMS キーが実行ロールを信頼し、kms:Decryp、kms:GenerateDataKey、kms:DescribeKey、および kms:ReEncrypt を許可するように設定します。
Amazon MSK イベントソースマッピングの障害発生時の送信先の設定
障害発生時の送信先をコンソールを使用して設定するには、以下の手順に従います。
Lambda コンソールの [関数ページ]
を開きます。 -
関数を選択します。
-
[機能の概要 ] で、[送信先を追加 ] を選択します。
-
[ソース] には、[イベントソースマッピング呼び出し] を選択します。
-
[イベントソースマッピング] では、この関数用に設定されているイベントソースを選択します。
-
[条件] には [失敗時] を選択します。イベントソースマッピング呼び出しでは、これが唯一受け入れられる条件です。
-
[送信先タイプ] では、Lambda が呼び出しレコードを送信する送信先タイプを選択します。
-
[送信先] で、リソースを選択します。
-
[Save] を選択します。
AWS CLI を使用して障害発生時の送信先を設定することもできます。例えば、次の create-event-source-mappingMyFunction
に追加します。
aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
以下の update-event-source-mappinguuid
に関連付けられたイベントソースに追加します。
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'
送信先を削除するには、destination-config
パラメータの引数として空の文字列を指定します。
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'
Amazon S3 送信先のセキュリティのベストプラクティス
関数の設定から送信先を削除せずに、送信先として設定された S3 バケットを削除すると、セキュリティリスクが発生する可能性があります。別のユーザーが送信先バケットの名前を知っている場合は、その AWS アカウントでバケットを再作成できます。失敗した呼び出しのレコードがそのバケットに送信され、関数からのデータが公開される可能性があります。
警告
関数からの呼び出しレコードを別の AWS アカウントの S3 バケットに送信できないようにするには、s3:PutObject
アクセス許可を自分アカウントのバケットに制限する条件を関数の実行ロールに追加します。
次の例は、関数の s3:PutObject
アクセス許可を自分のアカウントのバケットに制限する IAM ポリシーを示しています。このポリシーは、送信先として S3 バケットを使用するために必要な s3:ListBucket
アクセス許可も Lambda に付与します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount":
"111122223333"
} } } ] }
AWS Management Consoleまたは AWS CLI を使用して、関数の実行ロールにアクセス許可ポリシーを追加する場合は、次の手順を参照してください。
SNS および SQS の呼び出しレコードの例
以下の例は、Kafka イベントソース呼び出しが失敗した場合に Lambda が SNS トピックまたは SQS キューの送信先に送信する内容を示しています。recordsInfo
の各キーには、Kafka トピックとパーティションの両方がハイフンで区切られて含まれています。例えば、キー "Topic-0"
の場合、Topic
は Kafka トピック、0
はパーティションです。各トピックとパーティションについて、オフセットとタイムスタンプデータを使用して元の呼び出しレコードを検索できます。
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }
S3 送信先の呼び出しレコードの例
S3 の送信先の場合、Lambda は呼び出しレコード全体をメタデータと共に送信先に送信します。以下の例は、Kafka イベントソース呼び出しが失敗した場合に、Lambda が S3 バケットの送信先に送信することを示しています。SQS と SNS の送信先に関する前例のすべてのフィールドに加えて、payload
フィールドには元の呼び出しレコードがエスケープされた JSON 文字列として含まれています。
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
ヒント
送信先バケットで S3 バージョニングを有効にすることをお勧めします。