

# セルフマネージド型の Apache Kafka で Lambda を使用する
<a name="with-kafka"></a>

このトピックでは、セルフマネージド型の Kafka クラスターで Lambda を使用する方法を説明します。AWS の用語集では、セルフマネージド型クラスターには、非 AWS のホストされた Kafka クラスターが含まれています。例えば、お使いの Kafka クラスターを、[Confluent Cloud](https://www.confluent.io/confluent-cloud/) または [Redpanda](https://www.redpanda.com/) などのクラウドプロバイダーでホストすることが可能です。

この章では、セルフマネージド Apache Kafka クラスターを Lambda 関数のイベントソースとして使用する方法について説明します。セルフマネージド Apache Kafka と Lambda を統合する一般的なプロセスには、次の手順が含まれます。

1. **[クラスターとネットワークのセットアップ](with-kafka-cluster-network.md)** – まず、Lambda がクラスターにアクセスできるように、適切なネットワーク設定でセルフマネージド Apache Kafka クラスターを設定します。

1. **[イベントソースマッピングのセットアップ](with-kafka-configure.md)** – 次に、Apache Kafka クラスターを関数に安全に接続するために Lambda が必要とする[イベントソースマッピング](invocation-eventsourcemapping.md)リソースを作成します。

1. **[関数とアクセス許可のセットアップ](with-kafka-permissions.md)** – 最後に、関数を正しくセットアップし、[実行ロール](lambda-intro-execution-role.md)に必要なアクセス許可を付与します。

イベントソースとしての Apache Kafka は、Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis を使用する場合と同様に動作します。Lambda は、イベントソースからの新しいメッセージを内部的にポーリングした後、ターゲットの Lambda 関数を同期的に呼び出します。Lambda はメッセージをバッチで読み込み、それらをイベントペイロードとして関数に提供します。最大バッチサイズは設定可能です (デフォルトでは 100 メッセージ)。詳細については、「[バッチ処理動作](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)」を参照してください。

セルフマネージド Apache Kafka イベントソースマッピングのスループットを最適化するには、プロビジョンドモードを設定します。プロビジョンドモードでは、イベントソースマッピングに割り当てられるイベントポーラーの最小数と最大数を定義できます。これにより、イベントソースマッピングが予期しないメッセージスパイクを処理する能力を向上させることができます。詳細については、「[プロビジョンドモード](kafka-scaling-modes.md#kafka-provisioned-mode)」を参照してください。

**警告**  
Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、レコードの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードを冪等にすることを強くお勧めします。詳細については、AWS ナレッジセンターの「[Lambda 関数を冪等にするにはどうすればよいですか?](https://repost.aws/knowledge-center/lambda-function-idempotent)」を参照してください。

Kafka ベースのイベントソースの場合、Lambda はバッチ処理ウィンドウやバッチサイズなどの制御パラメータの処理をサポートします。詳しくは、「[バッチ処理動作](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)」を参照してください。

セルフマネージド型 Kafka をイベントソースとして使用する方法の例については、AWS Compute Blog の [Using self-hosted Apache Kafka as an event source for AWS Lambda](https://aws.amazon.com/blogs/compute/using-self-hosted-apache-kafka-as-an-event-source-for-aws-lambda/) を参照してください。

**Topics**
+ [イベントの例](#smaa-sample-event)
+ [Lambda 用のセルフマネージド Apache Kafka クラスターとネットワークの設定](with-kafka-cluster-network.md)
+ [Lambda 実行ロールのアクセス許可の設定](with-kafka-permissions.md)
+ [Lambda 用のセルフマネージド Apache Kafka をイベントソースの設定](with-kafka-configure.md)

## イベントの例
<a name="smaa-sample-event"></a>

Lambda は、Lambda 関数を呼び出すとき、イベントパラメータ内のメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Kafka トピックと Kafka パーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。

```
{
   "eventSource": "SelfManagedKafka",
   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
   "records":{
      "mytopic-0":[
         {
            "topic":"mytopic",
            "partition":0,
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==",
            "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
            "headers":[
               {
                  "headerKey":[
                     104,
                     101,
                     97,
                     100,
                     101,
                     114,
                     86,
                     97,
                     108,
                     117,
                     101
                  ]
               }
            ]
         }
      ]
   }
}
```

# Lambda 用のセルフマネージド Apache Kafka クラスターとネットワークの設定
<a name="with-kafka-cluster-network"></a>

Lambda 関数をセルフマネージド Apache Kafka クラスターに接続するには、クラスターとクラスターが存在するネットワークを正しく設定する必要があります。このページでは、クラスターおよびネットワークを設定する方法について説明します。クラスターおよびネットワークが既に正しく設定されている場合、「[Lambda 用のセルフマネージド Apache Kafka をイベントソースの設定](with-kafka-configure.md)」を参照してイベントソースマッピングを設定します。

**Topics**
+ [セルフマネージド Apache Kafka クラスターのセットアップ](#kafka-cluster-setup)
+ [ネットワークセキュリティを設定する](#services-kafka-vpc-config)

## セルフマネージド Apache Kafka クラスターのセットアップ
<a name="kafka-cluster-setup"></a>

[Confluent Cloud](https://www.confluent.io/confluent-cloud/) や [Redpanda](https://www.redpanda.com/) などのクラウドプロバイダーでセルフマネージド Apache Kafka クラスターをホストすることも、独自のインフラストラクチャで実行することもできます。クラスターが正しく設定され、Lambda イベントソースマッピングが接続するネットワークからアクセス可能であることを確認します。

## ネットワークセキュリティを設定する
<a name="services-kafka-vpc-config"></a>

イベントソースマッピングを通じて Lambda にセルフマネージド Apache Kafka へのフルアクセスを許可するには、クラスターがパブリックエンドポイント (パブリック IP アドレス) を使用するか、クラスターを作成した Amazon VPC へのアクセスを提供する必要があります。

Lambda でセルフマネージド Apache Kafka を使用する場合は、関数に Amazon VPC 内のリソースへのアクセスを付与する [AWS PrivateLink VPC エンドポイント](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html)を作成します。

**注記**  
イベントポーラーにデフォルト (オンデマンド) モードを使用するイベントソースマッピングを持つ関数には、AWS PrivateLink VPC エンドポイントが必要です。イベントソースマッピングが[プロビジョンドモード](invocation-eventsourcemapping.md#invocation-eventsourcemapping-provisioned-mode)を使用している場合は、AWS PrivateLink VPC エンドポイントを設定する必要はありません。

エンドポイントを作成して、次のリソースへのアクセスを提供します。
+  Lambda — Lambda サービスプリンシパルのエンドポイントを作成します。
+  AWS STS — サービスプリンシパルがユーザーに代わってロールを引き受けるために、AWS STS のエンドポイントを作成します。
+  Secrets Manager — クラスターが Secrets Manager を使用して認証情報を保存する場合は、Secrets Manager のエンドポイントを作成します。

または、Amazon VPC の各パブリックサブネットに NAT ゲートウェイを設定します。詳細については、「[VPC に接続された Lambda 関数にインターネットアクセスを有効にする](configuration-vpc-internet.md)」を参照してください。

セルフマネージド Apache Kafka のイベントソースマッピングを作成すると、Lambda は Amazon VPC に設定されたサブネットおよびセキュリティグループに Elastic Network Interface (ENI) が既に存在するかどうかを確認します。Lambda が既存の ENI を検出した場合、再利用しようとします。それ以外の場合、Lambda は新しい ENI を作成し、イベントソースに接続して関数を呼び出します。

**注記**  
Lambda 関数は、Lambda サービスが所有する VPC 内で常に実行されます。関数の VPC 設定はイベントソースマッピングに影響しません。Lambda がイベントソースに接続する方法を判定するのは、イベントソースのネットワーク設定のみです。

クラスターを含む Amazon VPC のセキュリティグループを設定します。デフォルトでは、セルフマネージド Apache Kafka はポート: `9092` を使用します。
+ インバウンドルール – イベントソースに関連付けられたセキュリティグループに対してデフォルトのブローカーポート上のすべてのトラフィックを許可します。または、自己参照セキュリティグループルールを使用して、同じセキュリティグループ内のインスタンスからのアクセスを許可することもできます。
+ アウトバウンドルール – 関数が AWS サービスと通信する必要がある場合、外部送信先のポート `443` 上のすべてのトラフィックを許可します。または、自己参照セキュリティグループルールを使用して、他の AWS サービスと通信する必要がない場合は、ブローカーへのアクセスを制限することもできます。
+ Amazon VPC エンドポイントのインバウンドルール – Amazon VPC エンドポイントを使用している場合、Amazon VPC エンドポイントに関連付けられたセキュリティグループは、クラスターセキュリティグループからポート `443` でインバウンドトラフィックを許可する必要があります。

クラスターが認証を使用する場合、Secrets Manager エンドポイントのエンドポイントポリシーを制限することもできます。Secrets Manager API を呼び出す場合、Lambda は Lambda サービスプリンシパルではなく、関数ロールを使用します。

**Example VPC エンドポイントポリシー – Secrets Manager エンドポイント**  

```
{
      "Statement": [
          {
              "Action": "secretsmanager:GetSecretValue",
              "Effect": "Allow",
              "Principal": {
                  "AWS": [
                      "arn:aws::iam::123456789012:role/my-role"
                  ]
              },
              "Resource": "arn:aws::secretsmanager:us-west-2:123456789012:secret:my-secret"
          }
      ]
  }
```

Amazon VPC エンドポイントを使用する場合、AWS は API コールをルーティングし、エンドポイントの Elastic Network Interface (ENI) を使用して関数を呼び出します。Lambda サービスプリンシパルは、これらの ENI を使用するすべてのロールおよび関数に対して `lambda:InvokeFunction` を呼び出す必要があります。

デフォルトでは、Amazon VPC エンドポイントには、リソースへの広範なアクセスを許可するオープンな IAM ポリシーが適用されています。そのエンドポイントを使用して必要なアクションを実行するためのベストプラクティスは、これらのポリシーを制限することです。イベントソースマッピングが Lambda 関数を呼び出せるようにするには、VPC エンドポイントポリシーで、Lambda サービスプリンシパルが `sts:AssumeRole` および `lambda:InvokeFunction` を呼び出すことを許可する必要があります。組織内で発生する API コールのみを許可するように VPC エンドポイントポリシーを制限すると、イベントソースマッピングが正しく機能しなくなるため、これらのポリシーには `"Resource": "*"` が必要です。

次の VPC エンドポイントポリシーの例では、AWS STS および Lambda エンドポイントに Lambda サービスプリンシパルの必要なアクセスを付与する方法について示しています。

**Example VPC エンドポイントポリシー - AWS STS エンドポイント**  

```
{
      "Statement": [
          {
              "Action": "sts:AssumeRole",
              "Effect": "Allow",
              "Principal": {
                  "Service": [
                      "lambda.amazonaws.com"
                  ]
              },
              "Resource": "*"
          }
      ]
    }
```

**Example VPC エンドポイントポリシー – Lambda エンドポイント**  

```
{
      "Statement": [
          {
              "Action": "lambda:InvokeFunction",
              "Effect": "Allow",
              "Principal": {
                  "Service": [
                      "lambda.amazonaws.com"
                  ]
              },
              "Resource": "*"
          }
      ]
  }
```

# Lambda 実行ロールのアクセス許可の設定
<a name="with-kafka-permissions"></a>

[セルフマネージド Kafka クラスターへのアクセス](kafka-cluster-auth.md)に加えて、Lambda 関数にはさまざまな API アクションを実行するための許可が必要です。これらの許可は、関数の[実行ロール](lambda-intro-execution-role.md)に追加します。ユーザーが API アクションのいずれかにアクセスする必要がある場合は、AWS Identity and Access Management (IAM) ユーザーまたはロールのアイデンティティポリシーに必要な許可を追加します。

**Topics**
+ [Lambda 関数に必要なアクセス許可](#smaa-api-actions-required)
+ [Lambda 関数のオプションのアクセス許可](#smaa-api-actions-optional)
+ [実行ロールへのアクセス許可の追加](#smaa-permissions-add-policy)
+ [IAM ポリシーを使用したユーザーアクセスの許可](#smaa-permissions-add-users)

## Lambda 関数に必要なアクセス許可
<a name="smaa-api-actions-required"></a>

Amazon CloudWatch Logs のロググループでログを作成して保存するには、Lambda 関数の実行ロールに以下の許可が必要です。
+ [logs:CreateLogGroup](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html)
+ [logs:CreateLogStream](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogStream.html)
+ [logs:PutLogEvents](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html)

## Lambda 関数のオプションのアクセス許可
<a name="smaa-api-actions-optional"></a>

Lambda 関数には、以下を実行する許可も必要になる場合があります。
+ Secrets Manager シークレットを記述する。
+ AWS Key Management Service (AWS KMS) カスタマー管理のキーにアクセスする。
+ Amazon VPC にアクセスする。
+ 失敗した呼び出しのレコードを送信先に送信します。

### Secrets Manager と AWS KMS 許可
<a name="smaa-api-actions-secrets"></a>

Kafka ブローカーに設定しているアクセスコントロールのタイプに応じて、Lambda 関数には Secrets Manager シークレットにアクセスするための許可、または AWS KMS カスタマーマネージドキーを復号化するための許可が必要になる場合があります。それらのリソースにアクセスするには、関数の実行ロールに次のアクセス許可が必要です。
+ [secretsmanager:GetSecretValue](https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html)
+ [kms:Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html)

### VPC アクセス許可
<a name="smaa-api-actions-vpc"></a>

セルフマネージド Apache Kafka クラスターにアクセスできるのが VPC 内のユーザーのみである場合、Lambda 関数には Amazon VPC リソースにアクセスするための許可が必要です。これらのリソースには、VPC、サブネット、セキュリティグループ、ネットワークインターフェイスが含まれます。それらのリソースにアクセスするには、関数の実行ロールに次のアクセス許可が必要です。
+ [ec2:CreateNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateNetworkInterface.html)
+ [ec2:DescribeNetworkInterfaces](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeNetworkInterfaces.html)
+ [ ec2:DescribeVpcs](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeVpcs.html)
+ [ ec2:DeleteNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DeleteNetworkInterface.html)
+ [ ec2:DescribeSubnets](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSubnets.html)
+ [ ec2:DescribeSecurityGroups](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSecurityGroups.html)

## 実行ロールへのアクセス許可の追加
<a name="smaa-permissions-add-policy"></a>

セルフマネージド型 Apache Kafka クラスターが使用するその他の AWS サービスにアクセスするために、Lambda は、関数の[実行ロール](lambda-intro-execution-role.md)で定義されたアクセス許可ポリシーを使用します。

デフォルトでは、Lambda は、セルフマネージド型 Apache Kafka クラスターに対して、必須のまたはオプションのアクションを実行することはできません。こうしたアクションは、実行ロールの [IAM 信頼ポリシー](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_update-role-trust-policy.html)で作成および定義する必要があります。この例では、Lambda に Amazon VPC リソースへのアクセスを許可する、ポリシーの作成方法を紹介します。

------
#### [ JSON ]

****  

```
{
        "Version":"2012-10-17",		 	 	 
        "Statement":[
           {
              "Effect":"Allow",
              "Action":[
                 "ec2:CreateNetworkInterface",
                 "ec2:DescribeNetworkInterfaces",
                 "ec2:DescribeVpcs",
                 "ec2:DeleteNetworkInterface",
                 "ec2:DescribeSubnets",
                 "ec2:DescribeSecurityGroups"
              ],
              "Resource":"*"
           }
        ]
     }
```

------

## IAM ポリシーを使用したユーザーアクセスの許可
<a name="smaa-permissions-add-users"></a>

デフォルトでは、ユーザーおよびロールには[イベントソースの API オペレーション](invocation-eventsourcemapping.md#event-source-mapping-api)を実行するアクセス許可がありません。組織またはアカウント内のユーザーにアクセス権を付与するには、アイデンティティベースのポリシーを作成または更新します。詳細については、「*IAM ユーザーガイド*」の「[ポリシーを使用した AWS リソースへのアクセス制御](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_controlling.html)」を参照してください。

認証および承認エラーをトラブルシューティングするには、「[セルフマネージド Kafka イベントソースマッピングエラーのトラブルシューティング](with-kafka-troubleshoot.md)」を参照してください。

# Lambda 用のセルフマネージド Apache Kafka をイベントソースの設定
<a name="with-kafka-configure"></a>

セルフマネージド Apache Kafka クラスターを Lambda 関数のイベントソースとして使用するには、2 つのリソースを接続する[イベントソースマッピング](invocation-eventsourcemapping.md)を作成します。このページでは、セルフマネージド Apache Kafka 用にイベントソースマッピングを作成する方法について説明します。

このページの説明は、Kafka クラスターおよびそのクラスターが存在するネットワークを正しく設定済みであることを前提としています。クラスターまたはネットワークをセットアップする必要がある場合は、「[Lambda 用のセルフマネージド Apache Kafka クラスターとネットワークの設定](with-kafka-cluster-network.md)」を参照してください。

**Topics**
+ [イベントソースとしてセルフマネージド Apache Kafka クラスターを使用する](#kafka-esm-overview)
+ [Lambda でのクラスターの認証方法の設定](kafka-cluster-auth.md)
+ [セルフマネージド Apache Kafka イベントソース用に Lambda イベントソースマッピングを作成する](kafka-esm-create.md)
+ [Lambda の全セルフマネージド Apache Kafka イベントソース設定パラメータ](kafka-esm-parameters.md)

## イベントソースとしてセルフマネージド Apache Kafka クラスターを使用する
<a name="kafka-esm-overview"></a>

Apache Kafka クラスターまたは Amazon MSK クラスターを Lambda 関数のトリガーとして追加すると、クラスターは[イベントソース](invocation-eventsourcemapping.md)として使用されます。

Lambda は、ユーザーが指定した[開始位置](kafka-starting-positions.md)に基づいて、[CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) リクエストで `Topics` として指定した Kafka トピックからイベントデータを読み取ります。処理が成功すると、Kafka トピックは Kafka クラスターにコミットされます。

Lambda は、Kafka トピックの各パーティションのメッセージを順番に読み込みます。1 つの Lambda ペイロードに、複数のパーティションからのメッセージを含めることができます。利用可能なレコードが増えると、Lambda は [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) リクエストで指定した BatchSize 値に基づいて、関数がトピックに追いつくまでバッチ単位でレコードの処理を継続します。

Lambda は各バッチを処理した後、そのバッチ内のメッセージのオフセットをコミットします。関数がバッチ内のいずれかのメッセージに対してエラーを返すと、Lambda は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。すべての再試行が失敗したレコードを、障害発生時の送信先に送信して、後で処理することができます。

**注記**  
Lambda 関数の最大タイムアウト制限は通常 15 分ですが、Amazon MSK、自己管理型 Apache Kafka、Amazon DocumentDB、および ActiveMQ と RabbitMQ 向け Amazon MQ のイベントソースマッピングでは、最大タイムアウト制限が 14 分の関数のみがサポートされます。

# Lambda でのクラスターの認証方法の設定
<a name="kafka-cluster-auth"></a>

Lambda は、セルフマネージド型 Apache Kafka クラスターで認証するための方法をいくつかサポートしています。これらのサポートされる認証方法のいずれかを使用するように、Kafka クラスターを設定しておいてください。Kafka セキュリティの詳細については、Kafka ドキュメントの「[セキュリティ](http://kafka.apache.org/documentation.html#security)」セクションを参照してください。

## SASL/SCRAM 認証
<a name="smaa-auth-sasl"></a>

Lambda は、Transport Layer Security (TLS) 暗号化 (`SASL_SSL`) を使用した Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 認証をサポートしています。Lambda は、暗号化された認証情報を送信してクラスターで認証します。Lambda は plaintext の SASL/PLAIN (`SASL_PLAINTEXT`) をサポートしません。SASL/SCRAM 認証の詳細については、「[RFC 5802](https://tools.ietf.org/html/rfc5802)」を参照してください。

Lambda は SASL/PLAIN 認証もサポートします。このメカニズムはクリアテキスト認証情報を使用するので、この認証情報が保護されることを確実にするためにも、サーバーへの接続には TLS 暗号化を使用する必要があります。

SASL 認証の場合は、サインイン認証情報をシークレットとして AWS Secrets Manager に保存します。Secrets Manager の使用の詳細については、「*AWS Secrets Manager ユーザーガイド*」の「[AWS Secrets Manager シークレットの作成](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_secret.html)」を参照してください。

**重要**  
認証に Secrets Manager を使用するには、シークレットを Lambda 関数と同じ AWS リージョンに保存する必要があります。

## 相互 TLS 認証
<a name="smaa-auth-mtls"></a>

相互 TLS (mTLS) は、クライアントとサーバー間の双方向認証を提供します。クライアントは、サーバーによるクライアントの検証のためにサーバーに証明書を送信し、サーバーは、クライアントによるサーバーの検証のためにクライアントに証明書を送信します。

セルフマネージド Apache Kafka では、Lambda がクライアントとして機能します。Kafka ブローカーで Lambda を認証するように、クライアント証明書を (Secrets Manager のシークレットとして) 設定します。クライアント証明書は、サーバーのトラストストア内の CA によって署名される必要があります。

Kafka クラスターは、Lambda で Kafka ブローカーを認証するために Lambda にサーバー証明書を送信します。サーバー証明書は、パブリック CA 証明書またはプライベート CA/自己署名証明書にすることができます。パブリック CA 証明書は、Lambda トラストストア内の認証局 (CA) によって署名される必要があります。プライベート CA/自己署名証明書の場合は、サーバルート CA 証明書を (Secrets Manager のシークレットとして) 設定します。Lambda はルート証明書を使用して Kafka ブローカーを検証します。

mTLS の詳細については、「[Introducing mutual TLS authentication for Amazon MSK as an event source](https://aws.amazon.com/blogs/compute/introducing-mutual-tls-authentication-for-amazon-msk-as-an-event-source)」(イベントソースとしての Amazon MSK のための相互 TLS の紹介) を参照してください。

## クライアント証明書シークレットの設定
<a name="smaa-auth-secret"></a>

CLIENT\$1CERTICATE\$1TLS\$1AUTH シークレットは、証明書フィールドとプライベートキーフィールドを必要とします。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーは、どちらも PEM 形式である必要があります。

**注記**  
Lambda は、[PBES1](https://datatracker.ietf.org/doc/html/rfc2898/#section-6.1) (PBES2 ではありません) プライベートキー暗号化アルゴリズムをサポートします。

証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。

```
-----BEGIN CERTIFICATE-----  
            <certificate contents>
-----END CERTIFICATE-----
```

Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。

プライベートキーは、以下の構造を使用した [PKCS \$18](https://datatracker.ietf.org/doc/html/rfc5208) 形式にする必要があります。

```
-----BEGIN PRIVATE KEY-----  
             <private key contents>
-----END PRIVATE KEY-----
```

暗号化されたプライベートキーには、以下の構造を使用します。

```
-----BEGIN ENCRYPTED PRIVATE KEY-----  
              <private key contents>
-----END ENCRYPTED PRIVATE KEY-----
```

以下は、暗号化されたプライベートキーを使用する mTLS 認証のシークレットの内容を示す例です。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。

```
{"privateKeyPassword":"testpassword",
"certificate":"-----BEGIN CERTIFICATE-----
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
...
j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk
cmUuiAii9R0=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
...
rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
-----END CERTIFICATE-----",
"privateKey":"-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp
...
QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
-----END ENCRYPTED PRIVATE KEY-----"
}
```

## サーバルート CA 証明書シークレットの設定
<a name="smaa-auth-ca-cert"></a>

このシークレットは、Kafka ブローカーがプライベート CA によって署名された証明書で TLS 暗号化を使用する場合に作成します。TLS 暗号化は、VPC、SASL/SCRAM、SASL/PLAIN、または mTLS 認証に使用できます。

サーバールート CA 証明書シークレットには、PEM 形式の Kafka ブローカーのルート CA 証明書が含まれるフィールドが必要です。以下は、このシークレットの構造を示す例です。

```
{"certificate":"-----BEGIN CERTIFICATE-----
MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx
EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT
HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs
ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG...
-----END CERTIFICATE-----"
}
```

# セルフマネージド Apache Kafka イベントソース用に Lambda イベントソースマッピングを作成する
<a name="kafka-esm-create"></a>

イベントソースマッピングの作成には、Lambda コンソール、[AWS Command Line Interface (CLI)](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)、[AWS SDK](https://aws.amazon.com/getting-started/tools-sdks/) のいずれかを使用できます。

次のコンソールの手順で、Lambda 関数のトリガーとしてセルフマネージド Apache Kafka クラスターを追加します。内部でイベントソースマッピングリソースが作成されます。

## 前提条件
<a name="kafka-esm-prereqs"></a>
+ セルフマネージド型 Apache Kafka クラスター。Lambda は、Apache Kafka バージョン 0.10.1.0 以降をサポートしています。
+ セルフマネージド Kafka クラスターが使用する AWS リソースにアクセスするための許可を持つ[実行ロール](lambda-intro-execution-role.md)。

## セルフマネージド型 Kafka クラスターを追加する (コンソール)
<a name="kafka-esm-console"></a>

セルフマネージド型 Apache Kafka クラスターと Kafka トピックを Lambda 関数のトリガーとして追加するには、次の手順を実行します。

**Apache Kafka トリガーを Lambda 関数に追加するには (コンソール)**

1. Lambda コンソールの [[Functions] (関数) ページ](https://console.aws.amazon.com/lambda/home#/functions)を開きます。

1. Lambda 関数の名前を選択します。

1. [**機能の概要**] で、[**トリガーを追加**] を選択します。

1. [**Trigger configuration**] (トリガー設定) で次の操作を実行します。

   1. **Apache Kafka** のトリガータイプを選択します。

   1. [**Bootstrap servers**] (ブートストラップサーバー) に、クラスター内の Kafka ブローカーのホストおよびポートのペアアドレスを入力し、[**Add**] (追加) を選択します。クラスター内の各 Kafka ブローカーで上記を繰り返します。

   1. [**Topic name**] (トピック名) に、クラスター内のレコードの保存に使用する Kafka トピックの名前を入力します。

   1. プロビジョンドモードを設定する場合は、**[最小イベントポーラー]** の値、**[最大イベントポーラー]** の値、PollerGroupName のオプションの値を入力して、同じイベントソース VPC 内の複数の ESM のグループ化を指定します。

   1. (オプション) [**Batch size**] (バッチサイズ) に、単一のバッチで取得できるメッセージの最大数を入力します。

   1. **バッチウィンドウ** では、Lambda が関数を呼び出すまで費やすレコード収集の最大時間 (秒) を入力します。

   1. (オプション) **コンシューマーグループ ID** で、参加する Kafka コンシューマーグループの ID を入力します。

   1. (オプション) **[開始位置]** で、**[最新]** を選択して最新のレコードからストリームの読み取りを開始するか、**[水平トリム]** を選択して使用可能な最も以前のレコードから開始するか、または **[タイムスタンプ時点]** を選択して読み取りを開始するタイムスタンプを指定します。

   1. (オプション) **[VPC]** で、Kafka クラスターに Amazon VPC を選択します。次に、**[VPC subnets]** (VPC サブネット) と **[VPC security groups]** (VPC セキュリティグループ) を選択します。

      VPC 内のユーザーのみがブローカーにアクセスする場合、この設定は必須です。

      

   1. (オプション) **[Authentication]** (認証) で **[Add]** (追加) をクリックしてから、以下を実行します。

      1. クラスター内の Kafka ブローカーのアクセスまたは認証プロトコルを選択します。
         + Kafka ブローカーが SASL/PLAIN 認証を使用する場合は、**[BASIC\$1AUTH]** を選択します。
         + ブローカーが SASL/SCRAM 認証を使用する場合は、**[SASL\$1SCRAM]** プロトコルのいずれかを選択します。
         + mTLS 認証を設定している場合は、**[CLIENT\$1CERTIFICATE\$1TLS\$1AUTH]** プロトコルを選択します。

      1. SASL/SCRAM または mTLS 認証の場合は、Kafka クラスターの認証情報が含まれる Secrets Manager シークレットキーを選択します。

   1. (オプション) Kafka ブローカーがプライベート CA によって署名された証明書を使用する場合、**[Encryption]** (暗号化) には Kafka ブローカーが TLS 暗号化に使用するルート CA 証明書が含まれる Secrets Manager シークレットを選択します。

      この設定は、SASL/SCRAM または SASL/PLAIN の TLS 暗号化、および mTLS 認証に適用されます。

   1. テスト用に無効状態のトリガーを作成する (推奨) には、[**Enable trigger**] (トリガーを有効にする) を解除します。または、トリガーをすぐに有効にするには、[**Enable trigger**] (トリガーを有効にする) を選択します。

1. トリガーを追加するには、[**Add**] (追加) を選択します。

## セルフマネージド型 Kafka クラスターを追加する (AWS CLI)
<a name="kafka-esm-cli"></a>

Lambda 関数のセルフマネージド型 Apache Kafka トリガーを作成および表示するには、次の AWS CLI コマンドの例を使用します。

### SASL/SCRAM を使用する
<a name="kafka-esm-cli-create"></a>

Kafka ユーザーがインターネット経由で Kafka ブローカーにアクセスする場合は、SASL/SCRAM 認証用に作成した Secrets Manager シークレットを指定します。次の例では、[create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) AWS CLI コマンドを使用して、`my-kafka-function` という名前の Lambda 関数を `AWSKafkaTopic` という名前の Kafka トピックにマッピングします。

```
aws lambda create-event-source-mapping \ 
  --topics AWSKafkaTopic \
  --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \
  --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
```

### VPC の使用
<a name="kafka-esm-cli-create-vpc"></a>

Kafka ブローカーにアクセスするのが VPC 内の Kafka ユーザーのみである場合、VPC、サブネット、および VPC セキュリティグループを指定する必要があります。次の例では、[create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) AWS CLI コマンドを使用して、`my-kafka-function` という名前の Lambda 関数を `AWSKafkaTopic` という名前の Kafka トピックにマッピングします。

```
aws lambda create-event-source-mapping \ 
  --topics AWSKafkaTopic \
  --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \
  --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
```

### AWS CLI を使用したステータスの表示
<a name="kafka-esm-cli-view"></a>

次の例では、[get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html) AWS CLI コマンドを使用して、作成したイベントソースマッピングのステータスを記述します。

```
aws lambda get-event-source-mapping
              --uuid dh38738e-992b-343a-1077-3478934hjkfd7
```

# Lambda の全セルフマネージド Apache Kafka イベントソース設定パラメータ
<a name="kafka-esm-parameters"></a>

すべての Lambda イベントソースタイプは、同じ [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) および [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) API オペレーションを共有しています。ただし、次のテーブルで示されるように、セルフマネージド Apache Kafka に適用されるのは一部のパラメータのみです。


| [Parameter] (パラメータ) | 必須 | デフォルト | メモ | 
| --- | --- | --- | --- | 
|  BatchSize  |  N  |  100  |  最大: 10,000  | 
|  DestinationConfig  |  いいえ  |  該当なし  |  [Amazon MSK とセルフマネージド Apache Kafka イベントソースの破棄されたバッチのキャプチャ](kafka-on-failure.md)  | 
|  有効  |  いいえ  |  正  |  | 
|  FilterCriteria  |  いいえ  |  該当なし  |  [Lambda が関数に送信するイベントを制御する](invocation-eventfiltering.md)  | 
|  FunctionName  |  はい  |  該当なし  |    | 
|  KMSKeyArn  |  いいえ  |  該当なし  |  [フィルター条件の暗号化](invocation-eventfiltering.md#filter-criteria-encryption)  | 
|  MaximumBatchingWindowInSeconds  |  いいえ  |  500 ミリ秒  |  [バッチ処理動作](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)  | 
|  ProvisionedPollersConfig  |  いいえ  |  `MinimumPollers`: 指定しない場合のデフォルト値は 1 `MaximumPollers`: 指定しない場合のデフォルト値は 200 `PollerGroupName`: 該当なし  |  [プロビジョニングモード](kafka-scaling-modes.md#kafka-provisioned-mode)  | 
|  SelfManagedEventSource  |  はい  | 該当なし |  Kafka ブローカー一覧 作成時にのみ設定可能  | 
|  SelfManagedKafkaEventSourceConfig  |  いいえ  |  ConsumerGroupID フィールドを含み、デフォルトでは一意の値になっています。  |  作成時にのみ設定可能  | 
|  SourceAccessConfigurations  |  N  |  認証情報なし  |  クラスターの VPC 情報または認証情報   SASL\$1PLAIN は、BASIC\$1AUTH に設定  | 
|  StartingPosition  |  はい  |  該当なし  |  AT\$1TIMESTAMP、TRIM\$1HORIZON、または LATEST 作成時にのみ設定可能  | 
|  StartingPositionTimestamp  |  いいえ  |  該当なし  |  StartingPosition が AT\$1TIMESTAMP に設定されている場合にのみ必要  | 
|  タグ  |  いいえ  |  該当なし  |  [イベントソースマッピングでのタグの使用](tags-esm.md)  | 
|  トピック  |  はい  |  該当なし  |  トピック名 作成時にのみ設定可能  | 

**注記**  
`PollerGroupName` を指定すると、同じ Amazon VPC 内の複数の ESM イベントポーラーユニット (EPU) 容量を共有できます。このオプションを使用して、ESM プロビジョンドモードコストを最適化できます。ESM グループ化の要件:  
ESM は同じ Amazon VPC 内にある必要があります
ポーラーグループあたり最大 100 ESM
グループ内のすべての ESM の合計最大ポーラー数は 2,000 を超えることはできません
`PollerGroupName` を更新して ESM を別のグループに移動するか、`PollerGroupName` を空の文字列 ("") に設定して ESM をグループから削除できます。