

# Amazon MSK とセルフマネージド Apache Kafka イベントソースの破棄されたバッチのキャプチャ
<a name="kafka-on-failure"></a>

失敗したイベントソースマッピング呼び出しの記録を保持するには、関数のイベントソースマッピングに送信先を追加します。送信先に送られる各レコードは、失敗した呼び出しに関するメタデータを含む JSON ドキュメントです。Amazon S3 送信先の場合、Lambda はメタデータと共に呼び出しレコード全体を送信します。任意の Amazon SNS トピック、Amazon SQS キュー、Amazon S3 バケット、または Kafka を送信先として設定できます。

Amazon S3 送信先を使用すると、[Amazon S3 イベント通知](https://docs.aws.amazon.com/)機能を使用して、オブジェクトが送信先 S3 バケットにアップロードされたときに通知を受け取ることができます。また、S3 イベント通知を設定して、失敗したバッチに対して別の Lambda 関数を呼び出し、自動処理を実行することもできます。

実行ロールには、送信先に対するアクセス許可が必要です。
+ **SQS 送信先の場合:** [sqs:SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)
+ **SNS 送信先の場合:** [sns:Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html)
+ **S3 送信先の場合:** [s3:PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) および [s3:ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/ListObjectsV2.html)
+ **Kafka 送信先の場合:** [kafka-cluster:WriteData](https://docs.aws.amazon.com/msk/latest/developerguide/kafka-actions.html)

Kafka トピックは、Kafka イベントソースマッピングの障害発生時の送信先として設定できます。再試行回数を超えた後、またはレコードが最大経過時間を超えた後に Lambda がレコードを処理できなくなると、Lambda は失敗したレコードを指定された Kafka トピックに送信して、後で処理します。「[Kafka トピックを障害発生時の送信先として使用する](kafka-on-failure-destination.md)」を参照してください。

障害発生時の送信先サービスの VPC エンドポイントを Kafka クラスター VPC 内にデプロイする必要があります。

さらに、送信先に KMS キーを設定した場合、Lambda には送信先のタイプに応じて以下のアクセス許可が必要です。
+ S3 送信先に対して独自の KMS キーによる暗号化を有効にしている場合は、[kms:GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html) が必要です。KMS キーと S3 バケットの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、kms:GenerateDataKey を許可するように実行ロールを信頼するように KMS キーを設定します。
+ SQS 送信先に対して独自の KMS キーによる暗号化を有効にしている場合は、[kms:Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) および [kms:GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html) が必要です。KMS キーと SQS キューの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、KMS キーが実行ロールを信頼し、kms:Decryp、kms:GenerateDataKey、[kms:DescribeKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_DescribeKey.html)、および [kms:ReEncrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_ReEncrypt.html) を許可するように設定します。
+ SNS 送信先に対して独自の KMS キーによる暗号化を有効にしている場合は、[kms:Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) と [kms:GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html) が必要です。KMS キーと SNS トピックの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、KMS キーが実行ロールを信頼し、kms:Decryp、kms:GenerateDataKey、[kms:DescribeKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_DescribeKey.html)、および [kms:ReEncrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_ReEncrypt.html) を許可するように設定します。

## Kafka イベントソースマッピングの障害発生時の送信先の設定
<a name="kafka-onfailure-destination"></a>

障害発生時の送信先をコンソールを使用して設定するには、以下の手順に従います。

1. Lambda コンソールの [[関数ページ]](https://console.aws.amazon.com/lambda/home#/functions) を開きます。

1. 関数を選択します。

1. [**機能の概要 **] で、[**送信先を追加 **] を選択します。

1. **[ソース]** には、**[イベントソースマッピング呼び出し]** を選択します。

1. **[イベントソースマッピング]** では、この関数用に設定されているイベントソースを選択します。

1. **[条件]** には **[失敗時]** を選択します。イベントソースマッピング呼び出しでは、これが唯一受け入れられる条件です。

1. **[送信先タイプ]** では、Lambda が呼び出しレコードを送信する送信先タイプを選択します。

1. [**送信先**] で、リソースを選択します。

1. **[保存]** を選択します。

AWS CLI を使用して障害発生時の送信先を設定することもできます。例えば、次の [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) コマンドは、SQS を障害発生時の送信先として持つイベントソースマッピングを `MyFunction` に追加します。

```
aws lambda create-event-source-mapping \
--function-name "MyFunction" \
--event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
```

以下の [update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html) コマンドは、S3 の障害発生時の送信先を、入力 `uuid` に関連付けられたイベントソースに追加します。

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'
```

送信先を削除するには、`destination-config` パラメータの引数として空の文字列を指定します。

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": ""}}'
```

### Amazon S3 送信先のセキュリティのベストプラクティス
<a name="kafka-s3-destination-security"></a>

関数の設定から送信先を削除せずに、送信先として設定された S3 バケットを削除すると、セキュリティリスクが発生する可能性があります。別のユーザーが送信先バケットの名前を知っている場合は、その AWS アカウントでバケットを再作成できます。失敗した呼び出しのレコードがそのバケットに送信され、関数からのデータが公開される可能性があります。

**警告**  
関数からの呼び出しレコードを別の AWS アカウントの S3 バケットに送信できないようにするには、`s3:PutObject` アクセス許可を自分アカウントのバケットに制限する条件を関数の実行ロールに追加します。

次の例は、関数の `s3:PutObject` アクセス許可を自分のアカウントのバケットに制限する IAM ポリシーを示しています。このポリシーは、送信先として S3 バケットを使用するために必要な `s3:ListBucket` アクセス許可も Lambda に付与します。

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3BucketResourceAccountWrite",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*/*",
                "arn:aws:s3:::*"
            ],
            "Condition": {
                "StringEquals": {
                    "s3:ResourceAccount": {{"111122223333"}}
                }
            }
        }
    ]
}
```

AWS マネジメントコンソールまたは AWS CLI を使用して、関数の実行ロールにアクセス許可ポリシーを追加する場合は、次の手順を参照してください。

------
#### [ Console ]

**関数の実行ロールにアクセス許可ポリシーを追加するには (コンソール)**

1. Lambda コンソールの [[関数]](https://console.aws.amazon.com/lambda/home#/functions) ページを開きます。

1. 実行ロールを変更する Lambda 関数を選択します。

1. **[構成]** タブで、**[アクセス許可]** を選択します。

1. **[実行ロール]** タブで、関数の **[ロール名]** を選択して、ロールの IAM コンソールページを開きます。

1. 次の手順を実行してアクセス許可ポリシーをロールに追加します。

   1. **[アクセス許可ポリシー]** ペインで、**[アクセス許可の追加]**、**[インラインポリシーを作成]** を選択します。

   1. **ポリシーエディタ**で、**[JSON]** を選択します。

   1. 追加するポリシーをエディタに貼り付け (既存の JSON を置き換える)、**[次へ]** を選択します。

   1. **[ポリシーの詳細]** で **[ポリシー名]** を入力します。

   1. [**Create policy**] (ポリシーの作成) を選択します。

------
#### [ AWS CLI ]

**関数の実行ロールにアクセス許可ポリシーを追加するには (CLI)**

1. 必要なアクセス許可を持つ JSON ポリシードキュメントを作成し、ローカルディレクトリに保存します。

1. IAM `put-role-policy` CLI コマンドを使用して、関数の実行ロールにアクセス許可を追加します。JSON ポリシードキュメントを保存したディレクトリから次のコマンドを実行して、ロール名、ポリシー名、ポリシードキュメントを独自の値に置き換えます。

   ```
   aws iam put-role-policy \
   --role-name {{my_lambda_role}} \
   --policy-name LambdaS3DestinationPolicy \
   --policy-document file://{{my_policy.json}}
   ```

------

### SNS および SQS の呼び出しレコードの例
<a name="kafka-sns-sqs-destinations"></a>

以下の例は、Kafka イベントソース呼び出しが失敗した場合に Lambda が SNS トピックまたは SQS キューの送信先に送信する内容を示しています。`recordsInfo` の各キーには、Kafka トピックとパーティションの両方がハイフンで区切られて含まれています。例えば、キー `"Topic-0"` の場合、`Topic` は Kafka トピック、`0` はパーティションです。各トピックとパーティションについて、オフセットとタイムスタンプデータを使用して元の呼び出しレコードを検索できます。

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded",
        "approximateInvokeCount": 1
    },
    "responseContext": { // null if record is MaximumPayloadSizeExceeded
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KafkaBatchInfo": {
        "batchSize": 500,
        "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
        "bootstrapServers": "...",
        "payloadSize": 2039086, // In bytes
        "recordsInfo": {
            "Topic-0": {
                "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
                "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
                "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
                "lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
            },
            "Topic-1": {
                "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
                "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
                "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
                "lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
            }
        }
    }
}
```

### S3 送信先の呼び出しレコードの例
<a name="kafka-s3-destinations"></a>

S3 の送信先の場合、Lambda は呼び出しレコード全体をメタデータと共に送信先に送信します。以下の例は、Kafka イベントソース呼び出しが失敗した場合に、Lambda が S3 バケットの送信先に送信することを示しています。SQS と SNS の送信先に関する前例のすべてのフィールドに加えて、`payload` フィールドには元の呼び出しレコードがエスケープされた JSON 文字列として含まれています。

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded",
        "approximateInvokeCount": 1
    },
    "responseContext": { // null if record is MaximumPayloadSizeExceeded
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KafkaBatchInfo": {
        "batchSize": 500,
        "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
        "bootstrapServers": "...",
        "payloadSize": 2039086, // In bytes
        "recordsInfo": {
            "Topic-0": {
                "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
                "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
                "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
                "lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
            },
            "Topic-1": {
                "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
                "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
                "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
                "lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
            }
        }
    },
    "payload": "<Whole Event>" // Only available in S3
}
```

**ヒント**  
送信先バケットで S3 バージョニングを有効にすることをお勧めします。