

# Amazon MSK で Lambda を使用する
<a name="with-msk"></a>

[Amazon Managed Streaming for Apache Kafka (Amazon MSK)](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html) は、Apache Kafka を使用してストリーミングデータを処理するアプリケーションの構築および実行に使用できるフルマネージド型サービスです。Amazon MSK は、Kafka クラスターのセットアップ、スケーリング、管理を簡素化します。また、Amazon MSK を使用すると、AWS Identity and Access Management (IAM) を使って複数のアベイラビリティーゾーンやセキュリティ向けにより簡単にアプリケーションを設定することができます。

この章では、Amazon MSK クラスターを Lambda 関数のイベントソースとして使用する方法について説明します。Amazon MSK と Lambda を統合する一般的なプロセスには、次の手順が含まれます。

1. **[クラスターとネットワークのセットアップ](with-msk-cluster-network.md)** – まず、[Amazon MSK クラスター](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html)をセットアップします。これには、Lambda がクラスターにアクセスするための正しいネットワーク設定が含まれます。

1. **[イベントソースマッピングのセットアップ](with-msk-configure.md)** – 次に、Amazon MSK クラスターを関数に安全に接続するために Lambda が必要とする[イベントソースマッピング](invocation-eventsourcemapping.md)リソースを作成します。

1. **[関数とアクセス許可のセットアップ](with-msk-permissions.md)** – 最後に、関数を正しくセットアップし、[実行ロール](lambda-intro-execution-role.md)に必要なアクセス許可を付与します。

**注記**  
Amazon MSK イベントソースマッピングを Lambda または Amazon MSK コンソールから直接作成および管理できるようになりました。どちらのコンソールにも、より合理化された設定プロセスに必要な Lambda 実行ロールのアクセス許可の設定を自動的に処理するオプションがあります。

Amazon MSK クラスターとの Lambda 統合を設定する方法の例については、「[チュートリアル: Amazon MSK イベントソースマッピングを使用して Lambda 関数を呼び出す](services-msk-tutorial.md)」、「AWS コンピューティングブログ」の「[Amazon MSK を AWS Lambda のイベントソースとして使用する](https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/)」、および「Amazon MSK Labs」の「[Amazon MSK Lambda 統合](https://amazonmsk-labs.workshop.aws/en/msklambda.html)」を参照してください。

**Topics**
+ [イベントの例](#msk-sample-event)
+ [Amazon MSK クラスターおよび Amazon VPC ネットワークの Lambda 向け設定](with-msk-cluster-network.md)
+ [Amazon MSK イベントソースマッピングの Lambda アクセス許可の設定](with-msk-permissions.md)
+ [Lambda での Amazon MSK イベントソースの設定](with-msk-configure.md)
+ [チュートリアル: Amazon MSK イベントソースマッピングを使用して Lambda 関数を呼び出す](services-msk-tutorial.md)

## イベントの例
<a name="msk-sample-event"></a>

Lambda は、関数を呼び出すとき、イベントパラメータ内のメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Amazon MSK トピックとパーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。

```
{
   "eventSource":"aws:kafka",
   "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
   "records":{
      "mytopic-0":[
         {
            "topic":"mytopic",
            "partition":0,
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==",
            "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
            "headers":[
               {
                  "headerKey":[
                     104,
                     101,
                     97,
                     100,
                     101,
                     114,
                     86,
                     97,
                     108,
                     117,
                     101
                  ]
               }
            ]
         }
      ]
   }
}
```

# Amazon MSK クラスターおよび Amazon VPC ネットワークの Lambda 向け設定
<a name="with-msk-cluster-network"></a>

AWS Lambda 関数を Amazon MSK クラスターに接続するには、クラスターおよびクラスターが存在する [Amazon 仮想プライベートクラウド (VPC)](https://docs.aws.amazon.com/vpc/latest/userguide/what-is-amazon-vpc.html) を正しく設定する必要があります。このページでは、クラスターおよび VPC を設定する方法について説明します。クラスターおよび VPC が既に正しく設定されている場合、「[Lambda での Amazon MSK イベントソースの設定](with-msk-configure.md)」を参照してイベントソースマッピングを設定します。

**Topics**
+ [Lambda と MSK の統合におけるネットワーク設定要件の概要](#msk-network-requirements)
+ [MSK イベントソース向け NAT ゲートウェイの設定](#msk-nat-gateway)
+ [MSK イベントソースの AWS PrivateLink エンドポイントの設定](#msk-vpc-privatelink)

## Lambda と MSK の統合におけるネットワーク設定要件の概要
<a name="msk-network-requirements"></a>

Lambda と MSK の統合に必要なネットワーク設定は、アプリケーションのネットワークアーキテクチャによって異なります。この統合には、Amazon MSK クラスター、Lambda 関数、Lambda イベントソースマッピングの 3 つの主要リソースが関与しています。これらのリソースはそれぞれ異なる VPC に存在します。
+ Amazon MSK クラスターは通常、管理する VPC のプライベートサブネットに存在します。
+ Lambda 関数は、Lambda が所有する AWS マネージド VPC に存在します。
+ Lambda イベントソースマッピングは、関数を含む VPC とは別に、Lambda が所有する別の AWS マネージド VPC に存在します。

[イベントソースマッピング](invocation-eventsourcemapping.md)は、MSK クラスターと Lambda 関数間の中間リソースです。イベントソースマッピングは 2 つのプライマリジョブ機能を果たします。まず、MSK クラスターに新しいメッセージをポーリングします。次に、これらのメッセージを使用して Lambda 関数を呼び出します。これらの 3 つのリソースはそれぞれ異なる VPC に存在するため、ポーリングおよび呼び出しのオペレーションの両方にクロス VPC ネットワークコールが必要となります。

次の図で示すように、イベントソースマッピングのネットワーク設定要件は、[プロビジョニングモード](invocation-eventsourcemapping.md#invocation-eventsourcemapping-provisioned-mode)とオンデマンドモードのいずれを使用するかによって異なります。

![\[\]](http://docs.aws.amazon.com/ja_jp/lambda/latest/dg/images/MSK-esm-network-overview.png)


Lambda イベントソースマッピングが MSK クラスターに新しいメッセージをポーリングする方法は、どちらのモードでも同じです。イベントソースマッピングと MSK クラスター間の接続を確立するため、Lambda はプライベートサブネットに[ハイパープレーン ENI](configuration-vpc.md#configuration-vpc-enis) (または、利用可能な場合は既存の ENI を再利用) を作成し、セキュリティで保護された接続を確立します。この図にあるように、ハイパープレーン ENI は Lambda 関数ではなく、MSK クラスターのサブネットおよびセキュリティグループ設定を使用します。

クラスターからのメッセージのポーリング後に Lambda が関数を呼び出す方法はモードごとに異なります。
+ プロビジョニングモードでは、Lambda はイベントソースマッピング VPC と関数 VPC 間の接続を自動的に処理します。したがって、関数を正常に呼び出すための追加のネットワークコンポーネントは不要です。
+ オンデマンドモードでは、Lambda イベントソースマッピングは、カスタマーマネージド型 VPC を経由するパスを介して関数を呼び出します。そのため、VPC のパブリックサブネットに [NAT ゲートウェイ](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)を設定するか、あるいは Lambda、[AWS Security Token Service (STS)](https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html)、および (必要に応じて) [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) へのアクセスを提供する VPC のプライベートサブネットに [AWS PrivateLink](https://docs.aws.amazon.com/vpc/latest/privatelink/what-is-privatelink.html) エンドポイントを設定する必要があります。これらのオプションのどちらかを正しく設定することで、関数の呼び出しに必要な VPC と Lambda マネージドランタイム VPC 間の接続が可能になります。

NAT ゲートウェイは、プライベートサブネットのリソースがパブリックインターネットにアクセスできるようにします。この設定を使用した場合、トラフィックは Lambda 関数の呼び出し前にインターネットを経由します。一方、AWS PrivateLink エンドポイントはパブリックインターネットを経由しないので、プライベートサブネットから AWS サービスやその他のプライベート VPC リソースに安全に接続できます。これらのリソースの設定方法の詳細については、「[MSK イベントソース向け NAT ゲートウェイの設定](#msk-nat-gateway)」または「[MSK イベントソースの AWS PrivateLink エンドポイントの設定](#msk-vpc-privatelink)」を参照してください。

ここまでの説明は、MSK クラスターが VPC 内のプライベートサブネットに存在することを前提としており、これがより一般的なケースです。ただし、MSK クラスターが VPC 内のパブリックサブネットにあっても安全な接続を有効にするには、AWS PrivateLink エンドポイントを設定する必要があります。次の表は、MSK クラスターおよび Lambda イベントソースマッピングの設定方法に応じたネットワーク設定の要件をまとめたものです。


| MSK クラスターの場所 (カスタマーマネージド型 VPC 内) | Lambda イベントソースマッピングのスケーリングモード | 必要なネットワーク設定 | 
| --- | --- | --- | 
|  プライベートサブネット  |  オンデマンドモード  |  Lambda、AWS STS、および (必要に応じて) Secrets Manager へのアクセスを有効にする NAT ゲートウェイ (VPC のパブリックサブネット内) または AWS PrivateLink エンドポイント (VPC のプライベートサブネット内)。  | 
|  [Public subnet] (パブリックサブネット)  |  オンデマンドモード  |  Lambda、AWS STS、および (必要に応じて) Secrets Manager へのアクセスを有効にする AWS PrivateLink エンドポイント (VPC のパブリックサブネット内)。  | 
|  プライベートサブネット  |  プロビジョンドモード  |  なし  | 
|  [Public subnet] (パブリックサブネット)  |  プロビジョンドモード  |  なし  | 

さらに、MSK クラスターに関連付けられたセキュリティグループは、正しいポート経由でのトラフィックを許可する必要があります。次のセキュリティグループのルールが設定されていることを確認してください。
+ **インバウンドルール** – デフォルトのブローカーポートですべてのトラフィックを許可します。MSK が使用するポートは、クラスターの認証タイプによって異なります。IAM 認証の場合は `9098`、SASL/SCRAM の場合は `9096`、TLS の場合は `9094` を使用します。または、自己参照セキュリティグループルールを使用して、同じセキュリティグループ内のインスタンスからのアクセスを許可することもできます。
+ **アウトバウンドルール** – 関数が他の AWS サービスと通信する必要がある場合、外部の送信先の場合はポート `443` ですべてのトラフィックを許可します。逆に他の AWS サービスと通信する必要がない場合は、自己参照セキュリティグループのルールを使用してブローカーへのアクセスを制限することもできます。
+ **Amazon VPC エンドポイントのインバウンドルール** – Amazon VPC エンドポイントを使用している場合、そのエンドポイントに関連付けられたセキュリティグループは、クラスターのセキュリティグループからポート `443` でインバウンドトラフィックを許可する必要があります。

## MSK イベントソース向け NAT ゲートウェイの設定
<a name="msk-nat-gateway"></a>

イベントソースマッピングがクラスターからメッセージをポーリングできるように NAT ゲートウェイを設定し、VPC を経由するパスを介して関数を呼び出すことができます。この操作は、イベントソースマッピングがオンデマンドモードを使用しており、かつクラスターが VPC のプライベートサブネット内に存在する場合にのみ必要となります。クラスターが VPC のパブリックサブネットに存在するか、またはイベントソースマッピングがプロビジョニングモードを使用している場合、NAT ゲートウェイを設定する必要はありません。

[NAT ゲートウェイ](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)は、プライベートサブネットのリソースがパブリックインターネットにアクセスできるようにします。Lambda へのプライベート接続が必要な場合は、「[MSK イベントソースの AWS PrivateLink エンドポイントの設定](#msk-vpc-privatelink)」を参照してください。

NAT ゲートウェイを設定したら、適切なルートテーブルを設定する必要があります。これにより、プライベートサブネットからのトラフィックを NAT ゲートウェイ経由でパブリックインターネットにルーティングできるようになります。

![\[\]](http://docs.aws.amazon.com/ja_jp/lambda/latest/dg/images/MSK-NAT-Gateway.png)


次の手順では、コンソールを使用して NAT ゲートウェイを設定する方法について説明します。必要に応じて、各アベイラビリティーゾーン (AZ) にこの手順を繰り返します。

**NAT ゲートウェイと適切なルーティングを設定する方法 (コンソール)**

1. 次の点に注意して、「[NAT ゲートウェイを作成する](https://docs.aws.amazon.com/vpc/latest/userguide/nat-gateway-working-with.html)」の手順を実行してください。
   + NAT ゲートウェイは常にパブリックサブネットに存在する必要があります。[パブリック接続](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)で NAT ゲートウェイを作成します。
   + MSK クラスターが複数の AZ 間でレプリケートされている場合、それぞれの AZ に 1 つの NAT ゲートウェイを作成します。つまり、各 AZ に、クラスターを含むプライベートサブネット 1 つと NAT ゲートウェイを含むパブリックサブネット 1 つが VPC に必要です。例えば 3 つの AZ でセットアップする場合、3 つのプライベートサブネット、3 つのパブリックサブネット、3 つの NAT ゲートウェイが必要になります。

1. NAT ゲートウェイを作成したら、[Amazon VPC コンソール](https://console.aws.amazon.com/vpc/)を開き、左側のメニューで **[ルートテーブル]** を選択します。

1. [**ルートテーブルの作成**] を選択します。

1. このルートテーブルは、MSK クラスターを含む VPC に関連付けます。必要に応じて、ルートテーブルの名前を入力します。

1. [**ルートテーブルの作成**] を選択します。

1. 先ほど作成したルートテーブルを選択します。

1. **[サブネットの関連付け]** タブで、**[サブネットの関連付けの編集]** を選択します。
   + ルートテーブルを、MSK クラスターを含むプライベートサブネットに関連付けます。

1. [**Export routes**] (ルートのエクスポート) を選択します。

1. **[ルートの追加]** を選択します。

   1. [**Destination**] で、[`0.0.0.0/0`] を選択します。

   1. **[ターゲット]** に **[NAT ゲートウェイ]** を選択します。

   1. 検索ボックスで、手順 1 で作成した NAT ゲートウェイを選択します。これは、MSK クラスターを含むプライベートサブネット (手順 6 でこのルートテーブルに関連付けられたプライベートサブネット) と同じ AZ 内の NAT ゲートウェイである必要があります。

1. **[Save changes]** (変更の保存) をクリックします。

## MSK イベントソースの AWS PrivateLink エンドポイントの設定
<a name="msk-vpc-privatelink"></a>

クラスターからメッセージをポーリングし、VPC を経由するパスを介して関数を呼び出す AWS PrivateLink エンドポイントを設定します。これらのエンドポイントにより、MSK クラスターが次の内容にアクセスできるようにします。
+ Lambda サービス
+ [AWS Security Token Service (STS)](https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html)
+ 必要に応じて、[AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) サービス。これは、クラスター認証に必要なシークレットが Secrets Manager に保存されている場合に必要になります。

PrivateLink エンドポイントの設定は、イベントソースマッピングがオンデマンドモードを使用している場合にのみ必要です。イベントソースマッピングがプロビジョニングモードを使用している場合、Lambda がユーザーに代わって必要な接続を確立します。

PrivateLink エンドポイントを使用すると、AWS PrivateLink を経由して AWS サービスへの安全なプライベートアクセスが可能になります。MSK クラスターにパブリックインターネットへのアクセスを許可するように NAT ゲートウェイを設定するには、「[MSK イベントソース向け NAT ゲートウェイの設定](#msk-nat-gateway)」を参照してください。

VPC エンドポイントを設定すると、MSK クラスターに対して Lambda、STS、および (必要に応じて) Secrets Manager への直接かつプライベートのアクセス権が付与されます。

![\[\]](http://docs.aws.amazon.com/ja_jp/lambda/latest/dg/images/MSK-PrivateLink-Endpoints.png)


次の手順では、コンソールを使用して PrivateLink エンドポイントを設定する方法について説明します。必要に応じて、各エンドポイント (Lambda、STS、Secrets Manager) にこの手順を繰り返します。

**VPC PrivateLink エンドポイントを設定する方法 (コンソール)**

1. [Amazon VPC コンソール](https://console.aws.amazon.com/vpc/)を開き、左側のメニューで **[エンドポイント]** を選択します。

1. **エンドポイントの作成** を選択します。

1. 必要に応じて、エンドポイントの名前を入力します。

1. **[タイプ]** には **[AWS サービス]** を選択します。

1. **[サービス]** で、サービスの名前を入力します。例えば、Lambda に接続するエンドポイントを作成するには、検索ボックスに「`lambda`」と入力します。

1. 結果には、現在のリージョンのサービスエンドポイントが表示されます。例えば、米国東部 (バージニア北部) リージョンでは、「`com.amazonaws.us-east-2.lambda`」が表示されます。このサービスを選択します。

1. **[ネットワーク設定]** で、MSK クラスターを含む VPC を選択します。

1. **[サブネット]** で、MSK クラスターが存在する AZ を選択します。
   + 各 AZ の **[サブネット ID]** で、MSK クラスターを含むプライベートサブネットを選択します。

1. **[セキュリティグループ]** で、MSK クラスターに関連付けられたセキュリティグループを選択します。

1. **エンドポイントの作成** を選択します。

デフォルトでは、Amazon VPC エンドポイントには、リソースへの広範なアクセスを許可するオープンな IAM ポリシーが適用されています。そのエンドポイントを使用して必要なアクションを実行するためのベストプラクティスは、これらのポリシーを制限することです。例えば、Secrets Manager エンドポイントの場合、関数の実行ロールのみがシークレットにアクセスできるようにポリシーを変更できます。

**Example VPC エンドポイントポリシー - Secrets Manager エンドポイント**  

```
{
    "Statement": [
        {
            "Action": "secretsmanager:GetSecretValue",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws::iam::123456789012:role/my-role"
                ]
            },
            "Resource": "arn:aws::secretsmanager:us-west-2:123456789012:secret:my-secret"
        }
    ]
}
```

AWS STS および Lambda エンドポイントの場合、呼び出し元プリンシパルを Lambda サービスプリンシパルのみに制限できます。ただし、これらのポリシーには必ず `"Resource": "*"` を使用してください。

**Example VPC エンドポイントポリシー - AWS STS エンドポイント**  

```
{
    "Statement": [
        {
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "lambda.amazonaws.com"
                ]
            },
            "Resource": "*"
        }
    ]
}
```

**Example VPC エンドポイントポリシー – Lambda エンドポイント**  

```
{
    "Statement": [
        {
            "Action": "lambda:InvokeFunction",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "lambda.amazonaws.com"
                ]
            },
            "Resource": "*"
        }
    ]
}
```

# Amazon MSK イベントソースマッピングの Lambda アクセス許可の設定
<a name="with-msk-permissions"></a>

Amazon MSK クラスターにアクセスするには、関数とイベントソースマッピングはさまざまな Amazon MSK API アクションを実行するアクセス許可を必要とします。これらのアクセス許可を、関数の[実行ロール](lambda-intro-execution-role.md)に追加します。アクセスを必要とするユーザーがいる場合、そのユーザーまたはロールのアイデンティティポリシーに必要なアクセス許可を追加します。

[AWSLambdaMSKExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaMSKExecutionRole.html) マネージドポリシーには、Amazon MSK Lambda イベントソースマッピングに必要な最小限のアクセス許可が含まれています。アクセス許可プロセスを簡素化するには、次の操作を行います。
+ [AWSLambdaMSKExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaMSKExecutionRole.html) マネージドポリシーを実行ロールにアタッチします。
+ Lambda コンソールでアクセス許可を生成できるようにします。[コンソールで Amazon MSK イベントソースマッピングを作成する](msk-esm-create.md#msk-console)と、Lambda は実行ロールを評価し、アクセス許可がない場合に警告します。**[アクセス許可の生成]** を選択して、実行ロールを自動的に更新します。これは、実行ロールポリシーを手動で作成または変更した場合、またはポリシーが複数のロールにアタッチされている場合、機能しません。[障害発生時の送信先](kafka-on-failure.md)や [AWS Glue スキーマレジストリ](services-consume-kafka-events.md)などの高度な機能を使用する場合、実行ロールに追加のアクセス許可が必要になる場合があります。

**Topics**
+ [必要なアクセス許可](#msk-required-permissions)
+ [オプションのアクセス許可](#msk-optional-permissions)

## 必要なアクセス許可
<a name="msk-required-permissions"></a>

Lambda 関数の実行ロールには、Amazon MSK イベントソースマッピングに必要な以下のアクセス許可が必要です。これらのアクセス許可は、[AWSLambdaMSKExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaMSKExecutionRole.html) マネージドポリシーに含まれています。

### CloudWatch Logs のアクセス許可
<a name="msk-basic-permissions"></a>

次のアクセス許可により、Lambda は Amazon CloudWatch Logs にログを作成して保存できます。
+ [logs:CreateLogGroup](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html)
+ [logs:CreateLogStream](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogStream.html)
+ [logs:PutLogEvents](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html)

### MSK クラスターのアクセス許可
<a name="msk-cluster-permissions"></a>

次のアクセス許可により、Lambda はユーザーに代わって Amazon MSK クラスターにアクセスできます。
+ [kafka:DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html)
+ [kafka:DescribeClusterV2](https://docs.aws.amazon.com/MSK/2.0/APIReference/v2-clusters-clusterarn.html)
+ [kafka:GetBootstrapBrokers](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-bootstrap-brokers.html)

[kafka:DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html) の代わりに [kafka:DescribeClusterV2](https://docs.aws.amazon.com/MSK/2.0/APIReference/v2-clusters-clusterarn.html) を使用することをお勧めします。v2 アクセス許可は、プロビジョニングされた Amazon MSK クラスターとサーバーレス Amazon MSK クラスターの両方で機能します。ポリシーでは、これらのアクセス許可のいずれかのみが必要です。

### VPC アクセス許可
<a name="msk-vpc-permissions"></a>

次のアクセス許可により、Lambda は Amazon MSK クラスターに接続するときにネットワークインターフェイスを作成および管理できます。
+ [ec2:CreateNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateNetworkInterface.html)
+ [ec2:DescribeNetworkInterfaces](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeNetworkInterfaces.html)
+ [ ec2:DescribeVpcs](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeVpcs.html)
+ [ ec2:DeleteNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DeleteNetworkInterface.html)
+ [ ec2:DescribeSubnets](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSubnets.html)
+ [ ec2:DescribeSecurityGroups](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSecurityGroups.html)

## オプションのアクセス許可
<a name="msk-optional-permissions"></a>

 Lambda 関数には、以下を実行する許可も必要になる場合があります。
+ クロスアカウント Amazon MSK クラスターにアクセスする。クロスアカウントイベントソースマッピングの場合は、実行ロールに [kafka:DescribeVpcConnection](https://docs.aws.amazon.com/msk/1.0/apireference/vpc-connection-arn.html) が必要です。クロスアカウントイベントソースマッピングを作成する IAM プリンシパルには、[kafka:ListVpcConnections](https://docs.aws.amazon.com/msk/1.0/apireference/vpc-connections.html) が必要です。
+ [SASL/SCRAM 認証](msk-cluster-auth.md#msk-sasl-scram)を使用している場合で、SCRAM シークレットにアクセスする場合。これにより、関数はユーザー名とパスワードを使用して Kafka に接続できます。
+ SASL/SCRAM または [mTLS 認証](msk-cluster-auth.md#msk-mtls)を使用している場合で、Secrets Manager のシークレットを記述する場合。これにより、関数は安全な接続に必要な認証情報または証明書を取得できます。
+ AWS Secrets Manager シークレットが AWS KMS カスタマーマネージドキーで暗号化されている場合で、AWS KMS カスタマーマネージドキーにアクセスする場合。
+ 認証付きのスキーマレジストリを使用している場合で、スキーマレジストリシークレットにアクセスする場合。
  + AWS Glue Glue スキーマレジストリの場合、関数には `glue:GetRegistry` と `glue:GetSchemaVersion` のアクセス許可が必要です。これにより、関数は AWS Glue に保存されているメッセージ形式のルールを検索して使用できます。
  + `BASIC_AUTH` または `CLIENT_CERTIFICATE_TLS_AUTH` を使用した [Confluent スキーマレジストリ](https://docs.confluent.io/platform/current/schema-registry/security/index.html)の場合: 関数には、認証情報を含むシークレットに対する `secretsmanager:GetSecretValue` アクセス許可が必要です。これにより、関数は Confluent スキーマレジストリにアクセスするために必要なユーザー名/パスワードまたは証明書を取得できます。
  + プライベート CA 証明書の場合: 関数には、証明書を含むシークレットに対する secretsmanager:GetSecretValue アクセス許可が必要です。これにより、関数はカスタム証明書を使用するスキーマレジストリの ID を検証できます。
+ イベントソースマッピングに IAM 認証を使用している場合で、Kafka クラスターコンシューマーグループにアクセスし、トピックからメッセージをポーリングする場合。

 この対象となるのは以下のアクセス許可です。
+ [kafka:ListScramSecrets](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-scram-secrets.html) - Kafka 認証用の SCRAM シークレットのリストを許可します
+ [secretsmanager:GetSecretValue](https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html) - Secrets Manager からのシークレットの取得を有効にします
+ [kms:Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) - AWS KMS を使用して暗号化されたデータの復号を許可します
+ [glue:GetRegistry](https://docs.aws.amazon.com/glue/latest/webapi/API_GetRegistry.html) - AWS Glue スキーマレジストリへのアクセスを許可します
+ [glue:GetSchemaVersion](https://docs.aws.amazon.com/glue/latest/webapi/API_GetSchemaVersion.html) - AWS Glue スキーマレジストリから特定のスキーマバージョンを取得できるようにします
+ [kafka-cluster:Connect](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) – クラスターに接続して認証するためのアクセス許可を付与
+ [kafka-cluster:AlterGroup](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) – Apache Kafka の READ GROUP ACL に相当する、クラスター上のグループに参加させるためのアクセス許可を付与
+ [kafka-cluster:DescribeGroup](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) – Apache Kafka の DESCRIBE GROUP ACL に相当する、クラスター上のグループを記述するためのアクセス許可を付与
+ [kafka-cluster:DescribeTopic](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) – Apache Kafka の DESCRIBE TOPIC ACL に相当する、クラスター上のトピックを記述するためのアクセス許可を付与
+ [kafka-cluster:ReadData](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) – Apache Kafka の READ TOPIC ACL に相当する、クラスター上のトピックからデータを読み取るためのアクセス許可を付与

 さらに、失敗した呼び出しのレコードを障害発生時の送信先に送信する場合、送信先タイプに応じて次のアクセス許可が必要です。
+ Amazon SQS 送信先の場合: [sqs:SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html) - Amazon SQS キューへのメッセージの送信を許可します
+ Amazon SNS 送信先の場合: [sns:Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html) - Amazon SNS トピックへのメッセージの発行を許可します
+ Amazon S3 バケット送信先の場合: [s3:PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) および [s3:ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListBucket.html) - Amazon S3 バケット内のオブジェクトの書き込みと一覧表示を有効にします

認証および承認エラーをトラブルシューティングするには、「[セルフマネージド Kafka イベントソースマッピングエラーのトラブルシューティング](with-kafka-troubleshoot.md)」を参照してください。

# Lambda での Amazon MSK イベントソースの設定
<a name="with-msk-configure"></a>

Amazon MSK クラスターを Lambda 関数のイベントソースとして使用するには、2 つのリソースを接続する[イベントソースマッピング](invocation-eventsourcemapping.md)を作成します。このページでは、Amazon MSK 用にイベントソースマッピングを作成する方法について説明します。

このページの説明は、MSK クラスターおよびそのクラスターが存在する [Amazon 仮想プライベートクラウド (VPC)](https://docs.aws.amazon.com/vpc/latest/userguide/what-is-amazon-vpc.html) を正しく設定済みであることを前提としています。クラスターまたは VPC をセットアップする必要がある場合は、「[Amazon MSK クラスターおよび Amazon VPC ネットワークの Lambda 向け設定](with-msk-cluster-network.md)」を参照してください。エラー処理の再試行動作を設定するには、「[Kafka イベントソースのエラー処理コントロールの設定](kafka-retry-configurations.md)」を参照してください。

**Topics**
+ [イベントソースとしての Amazon MSK クラスターの使用](#msk-esm-overview)
+ [Lambda での Amazon MSK クラスターの認証方法の設定](msk-cluster-auth.md)
+ [Amazon MSK イベントソース向け Lambda イベントソースマッピング作成](msk-esm-create.md)
+ [Lambda でのクロスアカウントのイベントソースマッピングの作成](msk-cross-account.md)
+ [Lambda のすべての Amazon MSK イベントソースの設定パラメータ](msk-esm-parameters.md)

## イベントソースとしての Amazon MSK クラスターの使用
<a name="msk-esm-overview"></a>

Apache Kafka クラスターまたは Amazon MSK クラスターを Lambda 関数のトリガーとして追加すると、クラスターは[イベントソース](invocation-eventsourcemapping.md)として使用されます。

Lambda は、ユーザーが指定した[開始位置](kafka-starting-positions.md)に基づいて、[CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) リクエストで `Topics` として指定した Kafka トピックからイベントデータを読み取ります。処理が成功すると、Kafka トピックは Kafka クラスターにコミットされます。

Lambda は、Kafka トピックの各パーティションのメッセージを順番に読み込みます。1 つの Lambda ペイロードに、複数のパーティションからのメッセージを含めることができます。利用可能なレコードが増えると、Lambda は [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) リクエストで指定した BatchSize 値に基づいて、関数がトピックに追いつくまでバッチ単位でレコードの処理を継続します。

Lambda は各バッチを処理した後、そのバッチ内のメッセージのオフセットをコミットします。関数がバッチ内のいずれかのメッセージに対してエラーを返すと、Lambda は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。すべての再試行が失敗したレコードを、障害発生時の送信先に送信して、後で処理することができます。

**注記**  
Lambda 関数の最大タイムアウト制限は通常 15 分ですが、Amazon MSK、自己管理型 Apache Kafka、Amazon DocumentDB、および ActiveMQ と RabbitMQ 向け Amazon MQ のイベントソースマッピングでは、最大タイムアウト制限が 14 分の関数のみがサポートされます。

# Lambda での Amazon MSK クラスターの認証方法の設定
<a name="msk-cluster-auth"></a>

Lambda は、Amazon MSK クラスターへのアクセス、レコードの取得、その他のタスクの実行を行うための許可を必要とします。Amazon MSK は、MSK クラスターに対する認証方法をいくつかサポートしています。

**Topics**
+ [非認証アクセス](#msk-unauthenticated)
+ [SASL/SCRAM 認証](#msk-sasl-scram)
+ [相互 TLS 認証](#msk-mtls)
+ [IAM 認証](#msk-iam-auth)
+ [Lambda でのブートストラップブローカーの選択方法](#msk-bootstrap-brokers)

## 非認証アクセス
<a name="msk-unauthenticated"></a>

インターネット経由でクラスターにアクセスするクライアントがない場合は、非認証アクセスを使用できます。

## SASL/SCRAM 認証
<a name="msk-sasl-scram"></a>

Lambda は、SHA-512 ハッシュ関数および Transport Layer Security (TLS) 暗号化を使用する [Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM)](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password-tutorial.html) 認証をサポートしています。Lambda がクラスターに接続できるようにするには、認証情報 (ユーザー名およびパスワード) を Secrets Manager のシークレットに保存し、イベントソースマッピングの設定時にこのシークレットを参照します。

Secrets Manager の使用に関する詳細については、「*Amazon Managed Streaming for Apache Kafka デベロッパーガイド*」の「[Secrets Manager を使用したサインイン認証](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html)」を参照してください。

**注記**  
Amazon MSK は SASL/PLAIN 認証をサポートしません。

## 相互 TLS 認証
<a name="msk-mtls"></a>

相互 TLS (mTLS) は、クライアントとサーバー間に双方向認証を提供します。サーバーがクライアントを認証できるよう、クライアントはサーバーに証明書を送信します。また、クライアントがサーバーを認証できるよう、サーバーもクライアントに証明書を送信します。

Amazon MSK と Lambda の統合の場合、MSK クラスターはサーバーとして機能し、Lambda はクライアントとして機能します。
+ Lambda が MSK クラスターを認証するためには、Secrets Manager でクライアント証明書をシークレットとして設定し、イベントソースマッピング設定でこの証明書を参照します。クライアント証明書は、 サーバーのトラストストア内の認証局 (CA) によって署名される必要があります。
+ MSK クラスターは Lambda にもサーバー証明書を送信します。サーバー証明書は、AWS トラストストア内の認証局 (CA) によって署名される必要があります。

Amazon MSK は自己署名サーバー証明書をサポートしていません。Amazon MSK のすべてのブローカーは、デフォルトで Lambda が信頼する [Amazon Trust Services CA](https://www.amazontrust.com/repository/) によって署名された[パブリック証明書](https://docs.aws.amazon.com/msk/latest/developerguide/msk-encryption.html)を使用します。

### mTLS シークレットの設定
<a name="mtls-auth-secret"></a>

CLIENT\$1CERTICATE\$1TLS\$1AUTH シークレットは、証明書フィールドとプライベートキーフィールドを必要とします。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーは、どちらも PEM 形式である必要があります。

**注記**  
Lambda は、[PBES1](https://datatracker.ietf.org/doc/html/rfc2898/#section-6.1) (PBES2 ではありません) プライベートキー暗号化アルゴリズムをサポートします。

証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。

```
-----BEGIN CERTIFICATE-----  
        <certificate contents>
-----END CERTIFICATE-----
```

Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。

プライベートキーは、以下の構造を使用した [PKCS \$18](https://datatracker.ietf.org/doc/html/rfc5208) 形式にする必要があります。

```
-----BEGIN PRIVATE KEY-----  
         <private key contents>
-----END PRIVATE KEY-----
```

暗号化されたプライベートキーには、以下の構造を使用します。

```
-----BEGIN ENCRYPTED PRIVATE KEY-----  
          <private key contents>
-----END ENCRYPTED PRIVATE KEY-----
```

以下は、暗号化されたプライベートキーを使用する mTLS 認証のシークレットの内容を示す例です。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。

```
{
 "privateKeyPassword": "testpassword",
 "certificate": "-----BEGIN CERTIFICATE-----
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
...
j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk
cmUuiAii9R0=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
...
rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
-----END CERTIFICATE-----",
 "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp
...
QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
-----END ENCRYPTED PRIVATE KEY-----"
}
```

Amazon MSK 用の mTLS およびクライアント証明書の生成手順に関する詳細については、「*Amazon Managed Streaming for Apache Kafka デベロッパーガイド*」の「[Amazon MSK の相互 TLS クライアント認証](https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html)」を参照してください。

## IAM 認証
<a name="msk-iam-auth"></a>

AWS Identity and Access Management (IAM) を使用して、MSK クラスターに接続するクライアントのアイデンティを認証することができます。IAM 認証を使用すると、Lambda は関数の[実行ロール](lambda-intro-execution-role.md)のアクセス許可に依存してクラスターへの接続、レコードの取得、その他の必要なアクションを実行します。必要なアクセス許可を含むサンプルポリシーについては、「*Amazon Managed Streaming for Apache Kafka デベロッパーガイド*」の「[IAM ロールの認可ポリシーを作成する](https://docs.aws.amazon.com/msk/latest/developerguide/create-iam-access-control-policies.html)」を参照してください。

MSK クラスターで IAM 認証がアクティブで、シークレットを指定しない場合、Lambda はデフォルトで自動的に IAM 認証を使用します。

Amazon MSK での IAM 認証の詳細については、「[IAM アクセスコントロール](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html)」を参照してください。

## Lambda でのブートストラップブローカーの選択方法
<a name="msk-bootstrap-brokers"></a>

Lambda は、クラスターで使用可能な認証方法、および認証用のシークレットが提供されているかどうかに基づき、[ブートストラップブローカー](https://docs.aws.amazon.com/msk/latest/developerguide/msk-get-bootstrap-brokers.html)を選択します。mTLS または SASL/SCRAM のシークレットを指定すると、Lambda は自動的にその認証方法を選択します。シークレットを指定しない場合、Lambda は、クラスターでアクティブ化されている中で、最も強力な認証方法を選択します。以下は、Lambda によるブローカー選択の優先度を、最も強力な認証から弱い認証の順に示したものです。
+ mTLS (mTLS 用のシークレットを提供)
+ SASL/SCRAM (SASL/SCRAM 用のシークレットを提供)
+ SASL IAM (シークレットが提供されておらず、IAM 認証がアクティブ)
+ 非認証の TLS (シークレットが提供されておらず、IAM 認証も非アクティブ)
+ プレーンテキスト (シークレットが提供されておらず、IAM 認証と非認証 TLS の両方が非アクティブ)

**注記**  
Lambda から最も安全なブローカータイプへの接続ができない場合でも、Lambda は別の (安全性の低い) ブローカータイプへの接続を試行しません。安全性の低いブローカータイプを Lambda に選択させたい場合は、クラスターが使用している、より強力な認証方法をすべて無効にします。

# Amazon MSK イベントソース向け Lambda イベントソースマッピング作成
<a name="msk-esm-create"></a>

イベントソースマッピングの作成には、Lambda コンソール、[AWS Command Line Interface (CLI)](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)、[AWS SDK](https://aws.amazon.com/getting-started/tools-sdks/) のいずれかを使用できます。

**注記**  
イベントソースマッピングを作成すると、Lambda は MSK クラスターを含むプライベートサブネットに[ハイパープレーン ENI](configuration-vpc.md#configuration-vpc-enis) を作成し、これにより Lambda からの安全な接続を確立します。このハイパープレーン ENI は Lambda 関数ではなく、MSK クラスターのサブネットおよびセキュリティグループ設定を使用します。

次のコンソールの手順で、Lambda 関数のトリガーとして Amazon MSK クラスターを追加します。内部でイベントソースマッピングリソースが作成されます。

**Amazon MSK トリガーを Lambda 関数 (コンソール) に追加するには**

1. Lambda コンソールの [[関数]](https://console.aws.amazon.com/lambda/home#/functions) ページを開きます。

1. Amazon MSK トリガーを追加する Lambda 関数の名前を選択します。

1. **[関数の概要]** で **[トリガーを追加]** をクリックします。

1. **[トリガー設定]** で **[MSK]** を選択します。

1. Kafka クラスターの詳細を指定するには、次の手順を実行してください。

   1. [**MSK cluster**] (MSK クラスター) で、クラスターを選択します。

   1. **[トピック名]**に、メッセージを使用する Kafka トピックの名前を入力します。

   1. **[コンシューマーグループ ID]** に、参加する Kafka コンシューマーグループの ID を入力します (該当する場合)。詳細については、「[Lambda でのカスタマイズ可能なコンシューマーグループ ID](kafka-consumer-group-id.md)」を参照してください。

1. **[クラスター認証]** で必要な設定を行います。クラスター認証の詳細については、「[Lambda での Amazon MSK クラスターの認証方法の設定](msk-cluster-auth.md)」を参照してください。
   + 接続の確立時に Lambda で MSK クラスターに対する認証を実行させたい場合、**[認証の使用]** をオンに切り替えます。認証の使用をお勧めします。
   + 認証を使用する場合、**[認証方法]** で使用する認証方法を選択します。
   + 認証を使用する場合、**[Secrets Manager キー]** で、クラスターへのアクセスに必要な認証情報を含む Secrets Manager キーを選択します。

1. **[イベントポーラー設定]** で、必要な設定を行います。
   + **[トリガーを有効にする]** を選択し、作成直後にトリガーが有効化されるようにします。
   + イベントソースマッピングに **[プロビジョニングモードの設定]** を行うかどうかを選択します。詳細については、「[Lambda での Apache Kafka イベントポーラーのスケーリングモード](kafka-scaling-modes.md)」を参照してください。
     + プロビジョンドモードを設定する場合は、**[最小イベントポーラー]** の値、**[最大イベントポーラー]** の値、PollerGroupName のオプションの値を入力して、同じイベントソース VPC 内の複数の ESM のグループ化を指定します。
   + **[開始位置]** で、Lambda がストリームからの読み取りを開始する方法を選択します。詳細については、「[Lambda での Apache Kafka ポーリングとストリームの開始位置](kafka-starting-positions.md)」を参照してください。

1. **[バッチ処理]** で必要な設定を行います。バッチ処理の詳細については、「[バッチ処理動作](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)」を参照してください。

   1. [**Batch size**] (バッチサイズ) で、単一バッチで取得されるメッセージの最大数を設定します。

   1. **[バッチウィンドウ]** に、Lambda が関数呼び出し前にレコード収集に費やすことのできる最大時間 (秒) を入力します。

1. **[フィルタリング]**で必要な設定を行います。のフィルタリングについての詳細は「[Amazon MSK およびセルフマネージド Apache Kafka イベントソースからのイベントのフィルタリング](kafka-filtering.md)」を参照してください。
   + **[フィルター条件]** にフィルター条件定義を追加して、イベントを処理するかどうかを決定します。

1. **[障害処理]** で必要な設定を行います。障害処理の詳細については、「[Amazon MSK とセルフマネージド Apache Kafka イベントソースの破棄されたバッチのキャプチャ](kafka-on-failure.md)」を参照してください。
   + **[障害発生時の宛先]** には、障害発生時の送信先の ARN を指定します。

1. **[タグ]** には、このイベントソースマッピングに関連付けるタグを入力します。

1. トリガーを追加するには、[**Add**] (追加) を選択します。

[create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) コマンドと一緒に AWS CLI を使用してイベントソースマッピングを作成することもできます。次の例では、`LATEST` メッセージを開始点として Lambda 関数の `my-msk-function` を `AWSKafkaTopic` トピックにマッピングするイベントソースマッピングを作成します。このコマンドは [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) オブジェクトも使用して、クラスターに接続するときに [SASL/SCRAM](msk-cluster-auth.md#msk-sasl-scram) 認証を使用するように Lambda に指示します。

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
  --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
```

クラスターが [mTLS 認証](msk-cluster-auth.md#msk-mtls)を使用する場合、`CLIENT_CERTIFICATE_TLS_AUTH` および Secrets Manager のキー ARN を指定する [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) オブジェクトを含める必要があります。これは次のコマンドで示されます。

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
  --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
```

クラスターが [IAM 認証](msk-cluster-auth.md#msk-iam-auth)を使用する場合、[SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) オブジェクトは不要です。これは次のコマンドで示されます。

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
```

# Lambda でのクロスアカウントのイベントソースマッピングの作成
<a name="msk-cross-account"></a>

[マルチ VPC プライベート接続](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html)を使用して、Lambda 関数を別の AWS アカウントのプロビジョニングされた MSK クラスターに接続できます。マルチ VPC 接続は AWS PrivateLink を使用して、すべてのトラフィックを AWS ネットワーク内に保持します。

**注記**  
サーバーレス MSK クラスターにはクロスアカウントイベントソースマッピングを作成できません。

クロスアカウントイベントソースマッピングを作成するには、まず [MSK クラスターのマルチ VPC 接続を設定する](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-turn-on)必要があります。イベントソースマッピングを作成するときは、以下の例に示すように、クラスター ARN の代わりにマネージド VPC 接続 ARN を使用します。[CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) オペレーションは、MSK クラスターが使用する認証タイプによっても異なります。

**Example — IAM 認証を使用するクラスターのクロスアカウントイベントソースマッピングを作成します。**  
クラスターが [IAM ロールベースの認証](msk-cluster-auth.md#msk-iam-auth)を使用する場合、[SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) オブジェクトは必要ありません。例:  

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
```

**Example — SASL/SCRAM 認証を使用するクラスターのクロスアカウントイベントソースマッピングを作成します。**  
クラスターが [SASL/SCRAM 認証](msk-cluster-auth.md#msk-sasl-scram)を使用する場合は、`SASL_SCRAM_512_AUTH` および Secrets Manager のシークレット ARN を指定する [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) オブジェクトを含める必要があります。  
SASL/SCRAM 認証でクロスアカウントの Amazon MSK イベントソースマッピングにシークレットを使用する方法は 2 つあります。  
+ Lambda 関数アカウントにシークレットを作成し、クラスターシークレットと同期します。2 つのシークレットを同期させる[ローテーションを作成](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html)します。このオプションでは、関数アカウントからシークレットを制御できます。
+ MSK クラスターに関連付けられているシークレットを使用してください。このシークレットは、Lambda 関数アカウントへのクロスアカウントアクセスを許可する必要があります。詳細については、「[別のアカウントのユーザーの AWS Secrets Manager シークレットに対するアクセス許可](https://docs.aws.amazon.com/secretsmanager/latest/userguide/auth-and-access_examples_cross.html)」を参照してください。

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
```

**Example — mTLS 認証を使用するクラスターのクロスアカウントイベントソースマッピングを作成します。**  
クラスターが [mTLS 認証](msk-cluster-auth.md#msk-mtls)を使用する場合は、`CLIENT_CERTIFICATE_TLS_AUTH` および Secrets Manager のシークレット ARN を指定する [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) オブジェクトを含める必要があります。シークレットは、クラスターアカウントまたは Lambda 関数アカウントに保存できます。  

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
```

# Lambda のすべての Amazon MSK イベントソースの設定パラメータ
<a name="msk-esm-parameters"></a>

すべての Lambda イベントソースタイプは、同じ [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) および [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) API オペレーションを共有しています。ただし、次のテーブルで示されるように、Amazon MSK に適用されるのは一部のパラメータのみです。


| [Parameter] (パラメータ) | 必須 | デフォルト | 注意事項 | 
| --- | --- | --- | --- | 
|  AmazonManagedKafkaEventSourceConfig  |  N  |  ConsumerGroupID フィールドを含み、デフォルトでは一意の値になっています。  |  作成時にのみ設定可能  | 
|  BatchSize  |  N  |  100  |  最大: 10,000  | 
|  DestinationConfig  |  N  |  該当なし  |  [Amazon MSK とセルフマネージド Apache Kafka イベントソースの破棄されたバッチのキャプチャ](kafka-on-failure.md)  | 
|  有効  |  N  |  正  |    | 
|  BisectBatchOnFunctionError  |  N  |  誤  |  [Kafka イベントソースのエラー処理コントロールの設定](kafka-retry-configurations.md)  | 
|  FunctionResponseTypes  |  N  |  該当なし  |  [Kafka イベントソースのエラー処理コントロールの設定](kafka-retry-configurations.md)  | 
|  MaximumRecordAgeInSeconds  |  N  |  -1 (無制限)  |  [Kafka イベントソースのエラー処理コントロールの設定](kafka-retry-configurations.md)  | 
|  MaximumRetryAttempts  |  N  |  -1 (無制限)  |  [Kafka イベントソースのエラー処理コントロールの設定](kafka-retry-configurations.md)  | 
|  EventSourceArn  |  Y  | 該当なし |  作成時にのみ設定可能  | 
|  FilterCriteria  |  N  |  該当なし  |  [Lambda が関数に送信するイベントを制御する](invocation-eventfiltering.md)  | 
|  FunctionName  |  Y  |  該当なし  |    | 
|  KMSKeyArn  |  N  |  該当なし  |  [フィルター条件の暗号化](invocation-eventfiltering.md#filter-criteria-encryption)  | 
|  MaximumBatchingWindowInSeconds  |  N  |  500 ミリ秒  |  [バッチ処理動作](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)  | 
|  ProvisionedPollersConfig  |  N  |  `MinimumPollers`: 指定しない場合のデフォルト値は 1 `MaximumPollers`: 指定しない場合のデフォルト値は 200 `PollerGroupName`: 該当なし  |  [プロビジョニングモード](kafka-scaling-modes.md#kafka-provisioned-mode)  | 
|  SourceAccessConfigurations  |  N  |  認証情報なし  |  イベントソース用の、SASL/SCRAM あるいは CLIENT\$1CERTIFICATE\$1TLS\$1AUTH (MutualTLS) の認証情報  | 
|  StartingPosition  |  Y  | 該当なし |  AT\$1TIMESTAMP、TRIM\$1HORIZON、または LATEST 作成時にのみ設定可能  | 
|  StartingPositionTimestamp  |  N  |  該当なし  |  StartingPosition が AT\$1TIMESTAMP に設定されている場合にのみ必要  | 
|  タグ  |  N  |  該当なし  |  [イベントソースマッピングでのタグの使用](tags-esm.md)  | 
|  トピック  |  Y  | 該当なし |  カフカのトピック名 作成時にのみ設定可能  | 

**注記**  
`PollerGroupName` を指定すると、同じ Amazon VPC 内の複数の ESM イベントポーラーユニット (EPU) 容量を共有できます。このオプションを使用して、ESM プロビジョンドモードコストを最適化できます。ESM グループ化の要件:  
ESM は同じ Amazon VPC 内にある必要があります
ポーラーグループあたり最大 100 ESM
グループ内のすべての ESM の合計最大ポーラー数は 2,000 を超えることはできません
`PollerGroupName` を更新して ESM を別のグループに移動するか、`PollerGroupName` を空の文字列 ("") に設定して ESM をグループから削除できます。

# チュートリアル: Amazon MSK イベントソースマッピングを使用して Lambda 関数を呼び出す
<a name="services-msk-tutorial"></a>

本チュートリアルでは、次の手順を実行します。
+ 既存の Amazon MSK クラスターと同じ AWS アカウントに Lambda 関数を作成します。
+ Amazon MSK と通信するように Lambda のネットワークと認証を設定します。
+ Lambda Amazon MSK イベントソースマッピングを設定します。これにより、イベントがトピックに出現したときに Lambda 関数が実行されます。

これらのステップを完了したら、イベントが Amazon MSK に送信されたときに、独自のカスタム Lambda コードを使用してそれらのイベントを自動的に処理するための Lambda 関数を設定できるようになります。

 **この機能で何ができますか?**

**ソリューションの例: MSK イベントソースマッピングを使用して、ライブスコアを顧客に配信します。**

次のシナリオを考えてみましょう。あなたの会社は、顧客がスポーツの試合などのライブイベントに関する情報を表示できるウェブアプリケーションをホストしています。試合の情報更新は、Amazon MSK の Kafka トピックを通じてチームに提供されます。MSK トピックから取得した更新情報を使用して、開発中のアプリケーション内で顧客にライブイベントの更新ビューを提供するソリューションを設計する必要があります。次の設計アプローチを決定しました。クライアントアプリケーションは、AWS でホストされているサーバーレスバックエンドと通信します。クライアントは、Amazon API Gateway WebSocket API を使用して WebSocket セッション経由で接続します。

このソリューションでは、MSK イベントを読み取り、いくつかのカスタムロジックを実行してアプリケーションレイヤーのイベントを準備し、その情報を API Gateway API に転送するコンポーネントが必要です。このコンポーネントは、Lambda 関数でカスタムロジックを指定し、それを AWS Lambda Amazon MSK イベントソースマッピングで呼び出すことで、AWS Lambda を使用して実装できます。

Amazon API Gateway WebSocket API を使用したソリューションの実装の詳細については、API Gateway ドキュメントの「[WebSocket API のチュートリアル](https://docs.aws.amazon.com/apigateway/latest/developerguide/websocket-api-chat-app.html)」を参照してください。

## 前提条件
<a name="w2aad101c23c15c35c19"></a>

以下の事前設定されたリソースを持つ AWS アカウント。

**これらの前提条件を満たすには、Amazon MSK ドキュメントの「[Getting started using Amazon MSK](https://docs.aws.amazon.com//msk/latest/developerguide/getting-started.html)」を参照してください。**
+ Amazon MSK クラスター 「*Amazon MSK の使用を開始する*」の「[Amazon MSK クラスターを作成する](https://docs.aws.amazon.com//msk/latest/developerguide/create-cluster.html)」を参照してください。
+ 以下の設定を行います。
  + クラスターのセキュリティ設定で **[IAM ロールベースの認証]** が **[有効]** になっていることを確認します。これにより、必要な Amazon MSK リソースにのみアクセスするように Lambda 関数を制限することで、セキュリティが向上します。これは、新しい Amazon MSK クラスターではデフォルトで有効になっています。
  + クラスターネットワーク設定で **[パブリックアクセス]** がオフになっていることを確認します。Amazon MSK クラスターのインターネットへのアクセスを制限すると、データを処理する仲介者の数が制限されることでセキュリティが向上します。これは、新しい Amazon MSK クラスターではデフォルトで有効になっています。
+ このソリューションに使用する Amazon MSK クラスターの Kafka トピック。「*Amazon MSK の使用を開始する*」の「[トピックの作成](https://docs.aws.amazon.com//msk/latest/developerguide/create-topic.html)」を参照してください。
+ Kafka クラスターから情報を取得し、テスト用に Kafka イベントをトピックに送信するように設定された Kafka 管理ホスト。例えば、Kafka 管理 CLI と Amazon MSK IAM ライブラリがインストールされた Amazon EC2 インスタンスなどです。「*Amazon MSK の使用を開始する*」の「[クライアントマシンを作成する](https://docs.aws.amazon.com//msk/latest/developerguide/create-client-machine.html)」を参照してください。

これらのリソースを設定したら、AWS アカウントから次の情報を収集して、続行する準備ができていることを確認します。
+ Amazon MSK クラスターの名前。この情報は、Amazon MSK コンソールで確認できます。
+ クラスター UUID。Amazon MSK クラスターの ARN の一部であり、Amazon MSK コンソールで確認できます。この情報を確認するには、Amazon MSK ドキュメントの「[クラスターの一覧表示](https://docs.aws.amazon.com/msk/latest/developerguide/msk-list-clusters.html)」の手順に従います。
+ Amazon MSK クラスターに関連付けられているセキュリティグループ。この情報は、Amazon MSK コンソールで確認できます。次のステップでは、これらを *clusterSecurityGroups* と呼びます。
+ Amazon MSK クラスターを含む Amazon VPC の ID。この情報を見つけるには、Amazon MSK コンソールで Amazon MSK クラスターに関連付けられたサブネットを特定し、Amazon VPC コンソールでそのサブネットに関連付けられた Amazon VPC を特定します。
+ ソリューションで使用される Kafka トピックの名前。この情報を確認するには、Kafka 管理ホストから Kafka `topics` CLI を使用して Amazon MSK クラスターを呼び出します。トピック CLI の詳細については、Kafka ドキュメントの「[Adding and removing topics](https://kafka.apache.org/documentation/#basic_ops_add_topic)」を参照してください。
+ Kafka トピックのコンシューマーグループの名前。Lambda 関数での使用に適しています。このグループは Lambda によって自動的に作成できるため、Kafka CLI で作成する必要はありません。コンシューマーグループを管理する必要がある場合、コンシューマーグループ CLI の詳細については、Kafka ドキュメントの「[Managing Consumer Groups](https://kafka.apache.org/documentation/#basic_ops_consumer_group)」を参照してください。

AWS アカウントに次のアクセス許可が必要です。
+ Lambda 関数を作成および管理するためのアクセス許可。
+ IAM ポリシーを作成し、それを Lambda 関数に関連付けるためのアクセス許可。
+ Amazon MSK クラスターをホストする Amazon VPC で Amazon VPC エンドポイントを作成し、ネットワーク設定を変更するためのアクセス許可。

### AWS Command Line Interface のインストール
<a name="install_aws_cli"></a>

AWS Command Line Interface をまだインストールしていない場合は、「[最新バージョンの AWS CLI のインストールまたは更新](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)」にある手順に従ってインストールしてください。

このチュートリアルでは、コマンドを実行するためのコマンドラインターミナルまたはシェルが必要です。Linux および macOS では、任意のシェルとパッケージマネージャーを使用してください。

**注記**  
Windows では、Lambda でよく使用される一部の Bash CLI コマンド (`zip` など) が、オペレーティングシステムの組み込みターミナルでサポートされていません。Ubuntu および Bash の Windows 統合バージョンを取得するには、[Windows Subsystem for Linux をインストール](https://docs.microsoft.com/en-us/windows/wsl/install-win10)します。

## Amazon MSK と通信するように Lambda のネットワーク接続を設定する
<a name="w2aad101c23c15c35c21"></a>

 AWS PrivateLink を使用して Lambda と Amazon MSK を接続します。これを行うには、Amazon VPC コンソールでインターフェイス Amazon VPC エンドポイントを作成します。ネットワーク設定の詳細については、「[Amazon MSK クラスターおよび Amazon VPC ネットワークの Lambda 向け設定](with-msk-cluster-network.md)」を参照してください。

Amazon MSK イベントソースマッピングが Lambda 関数に代わって実行される際には、Lambda 関数の実行ロールを引き受けます。この IAM ロールは、マッピングに対して、IAM で保護されたリソース (Amazon MSK クラスターなど) へのアクセスを許可します。コンポーネントは実行ロールを共有しますが、次の図に示すように、Amazon MSK マッピングと Lambda 関数には、それぞれのタスクに対して個別の接続要件があります。

![\[\]](http://docs.aws.amazon.com/ja_jp/lambda/latest/dg/images/msk_tut_network.png)


イベントソースマッピングは、Amazon MSK クラスターセキュリティグループに属します。このネットワークステップでは、Amazon MSK クラスター VPC から Amazon VPC エンドポイントを作成して、イベントソースマッピングを Lambda および STS サービスに接続します。Amazon MSK クラスターセキュリティグループからのトラフィックを受け入れるように、これらのエンドポイントを保護します。次に、Amazon MSK クラスターのセキュリティグループを調整して、イベントソースマッピングが Amazon MSK クラスターと通信できるようにします。

 AWS マネジメントコンソールを使用して、次の手順を設定できます。

**Lambda と Amazon MSK を接続するようにインターフェイス Amazon VPC エンドポイントを設定するには**

1. インターフェイス Amazon VPC エンドポイントのセキュリティグループ *endpointSecurityGroup* を作成します。これにより、*clusterSecurityGroups* からのポート 443 でのインバウンド TCP トラフィックが許可されます。Amazon EC2 ドキュメントの「[セキュリティグループの作成](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#creating-security-group)」の手順に従って、セキュリティグループを作成します。次に、Amazon EC2 ドキュメントの「[セキュリティグループへのルールの追加](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#adding-security-group-rule)」の手順に従って、適切なルールを追加します。

   **次の情報を使用してセキュリティグループを作成します。**

   インバウンドルールを追加する際に、*clusterSecurityGroups* 内のセキュリティグループごとにルールを作成します。各ルールは、次のように作成します。
   + **[タイプ]** で **[HTTPS]** を選択します。
   + **[ソース]** で *clusterSecurityGroups* のいずれかを選択します。

1.  Lambda サービスを Amazon MSK クラスターを含む Amazon VPC に接続するエンドポイントを作成します。「[インターフェイスエンドポイントの作成](https://docs.aws.amazon.com//vpc/latest/privatelink/create-interface-endpoint.html)」の手順に従います。

   **次の情報を使用してインターフェイスエンドポイントを作成します。**
   + **[サービス名]** で `com.amazonaws.regionName.lambda` を選択します。ここで、*regionName* が Lambda 関数をホストします。
   + **[VPC]** で、Amazon MSK クラスターを含む Amazon VPC を選択します。
   + **[セキュリティグループ]** で、前に作成した *endpointSecurityGroup* を選択します。
   + **[サブネット]** で、Amazon MSK クラスターをホストするサブネットを選択します。
   + **[ポリシー]** で、次のポリシードキュメントを指定します。これにより、Lambda サービスプリンシパルが `lambda:InvokeFunction` アクションに使用するためにエンドポイントを保護します。

     ```
     {
         "Statement": [
             {
                 "Action": "lambda:InvokeFunction",
                 "Effect": "Allow",
                 "Principal": {
                     "Service": [
                         "lambda.amazonaws.com"
                     ]
                 },
                 "Resource": "*"
             }
         ]
     }
     ```
   + **[DNS 名を有効化]** が設定されたままであることを確認します。

1.  AWS STS サービスを Amazon MSK クラスターを含む Amazon VPC に接続するエンドポイントを作成します。「[インターフェイスエンドポイントの作成](https://docs.aws.amazon.com//vpc/latest/privatelink/create-interface-endpoint.html)」の手順に従います。

   **次の情報を使用してインターフェイスエンドポイントを作成します。**
   + **[サービス名]** で AWS STS を選択します。
   + **[VPC]** で、Amazon MSK クラスターを含む Amazon VPC を選択します。
   + **[セキュリティグループ]** で *endpointSecurityGroup* を選択します。
   + **[サブネット]** で、Amazon MSK クラスターをホストするサブネットを選択します。
   + **[ポリシー]** で、次のポリシードキュメントを指定します。これにより、Lambda サービスプリンシパルが `sts:AssumeRole` アクションに使用するためにエンドポイントを保護します。

     ```
     {
         "Statement": [
             {
                 "Action": "sts:AssumeRole",
                 "Effect": "Allow",
                 "Principal": {
                     "Service": [
                         "lambda.amazonaws.com"
                     ]
                 },
                 "Resource": "*"
             }
         ]
     }
     ```
   + **[DNS 名を有効化]** が設定されたままであることを確認します。

1. Amazon MSK クラスターに関連付けられている各セキュリティグループに対して、つまり *clusterSecurityGroups* で、次のことを許可します。
   + すべての *clusterSecurityGroups* に対して、ポート 9098 でのすべてのインバウンドおよびアウトバウンド TCP トラフィックを許可します。これには、clusterSecurityGroups 内でのトラフィックも含まれます。
   + ポート 443 でのすべてのアウトバウンド TCP トラフィックを許可します。

   このトラフィックの一部は、デフォルトのセキュリティグループルールで許可されているため、クラスターが単一のセキュリティグループにアタッチされており、そのグループにデフォルトのルールがある場合は、追加のルールは必要ありません。セキュリティグループルールを調整するには、Amazon EC2 ドキュメントの「[セキュリティグループへのルールの追加](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#adding-security-group-rule)」の手順に従います。

   **次の情報を使用して、セキュリティグループにルールを追加します。**
   + ポート 9098 のインバウンドルールまたはアウトバウンドルールごとに、以下を指定します。
     + [**タイプ**] で、[**カスタム TCP**] を選択します。
     + **[ポート範囲]** で 9098 を指定します。
     + **[ソース]** で *clusterSecurityGroups* のいずれかを指定します。
   + ポート 443 のインバウンドルールごとに、**[タイプ]** で **[HTTPS]** を選択します。

## Lambda が Amazon MSK トピックから読み取るための IAM ロールを作成する
<a name="w2aad101c23c15c35c23"></a>

Lambda が Amazon MSK トピックから読み取るための認証要件を特定し、ポリシーで定義します。ロール *lambdaAuthRole* を作成します。このロールは、Lambda がこれらのアクセス許可を使用することを承認します。`kafka-cluster` IAM アクションを使用して Amazon MSK クラスターに対するアクションを承認します。次に、Lambda が Amazon MSK クラスターを検出して接続するために必要な Amazon MSK の `kafka` アクションと Amazon EC2 アクションを実行すること、および Lambda が実行した内容をログに記録できるように CloudWatch アクションを実行することを承認します。

**Lambda が Amazon MSK から読み取るための認証要件を記述するには**

1. IAM ポリシードキュメント (JSON ドキュメント) である *clusterAuthPolicy* を作成します。これにより、Lambda は Kafka コンシューマーグループを使用して、Amazon MSK クラスター内の Kafka トピックからデータを読み取ることができるようになります。Lambda では、読み取り時に Kafka コンシューマーグループを設定する必要があります。

   前提条件に合わせて次のテンプレートを変更します。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "kafka-cluster:Connect",
                   "kafka-cluster:DescribeGroup",
                   "kafka-cluster:AlterGroup",
                   "kafka-cluster:DescribeTopic",
                   "kafka-cluster:ReadData",
                   "kafka-cluster:DescribeClusterDynamicConfiguration"
               ],
               "Resource": [
                   "arn:aws:kafka:us-east-1:111122223333:cluster/mskClusterName/cluster-uuid",
                   "arn:aws:kafka:us-east-1:111122223333:topic/mskClusterName/cluster-uuid/mskTopicName",
                   "arn:aws:kafka:us-east-1:111122223333:group/mskClusterName/cluster-uuid/mskGroupName"
               ]
           }
       ]
   }
   ```

------

   詳細については、「[Amazon MSK イベントソースマッピングの Lambda アクセス許可の設定](with-msk-permissions.md)」を参照してください。ポリシーは、次のように作成します。
   + *us-east-1* と *111122223333* を Amazon MSK クラスターの AWS リージョン と AWS アカウント に置き換えます。
   + *mskClusterName* には、Amazon MSK クラスターの名前を指定します。
   + *cluster-uuid* には、Amazon MSK クラスターの ARN 内の UUID を指定します。
   + *mskTopicName* には、Kafka トピックの名前を指定します。
   + *mskGroupName* には、Kafka コンシューマーグループの名前を指定します。

1. Lambda が Amazon MSK クラスターを検出して接続し、それらのイベントをログに記録するために必要となる、Amazon MSK、Amazon EC2、および CloudWatch のアクセス許可を特定します。

   `AWSLambdaMSKExecutionRole` マネージドポリシーは、必要なアクセス許可を許容的に定義します。これは、次の手順で使用します。

   本番環境で `AWSLambdaMSKExecutionRole` を評価し、最小特権の原則に基づいて実行ロールポリシーを制限します。その後、ロールに対して、このマネージドポリシーを置き換えるポリシーを作成します。

IAM ポリシー言語の詳細については、[IAM ドキュメント](https://docs.aws.amazon.com//iam/)を参照してください。

ポリシードキュメントを作成したので、IAM ポリシーを作成してロールにアタッチすることができます。これを行うには、コンソールを使用して次の手順を実行します。

**ポリシードキュメントから IAM ポリシーを作成するには**

1. AWS マネジメントコンソールにサインインして、IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます

1. 左側のナビゲーションペインで、**[ポリシー]** を選択します。

1. [**Create policy**] (ポリシーの作成) を選択します。

1. [**ポリシーエディタ**] セクションで、[**JSON**] オプションを選択します。

1. *clusterAuthPolicy* を貼り付けます。

1. ポリシーにアクセス権限を追加し終えたら、[**次へ**] を選択します。

1. [**確認と作成**] ページで、作成するポリシーの [**ポリシー名**] と [**説明**] (オプション) を入力します。**このポリシーで定義されているアクセス許可** を確認して、ポリシーによって付与されたアクセス許可を確認します。

1. **ポリシーを作成** をクリックして、新しいポリシーを保存します。

詳細については、IAM ドキュメントの「[IAM ポリシーの作成](https://docs.aws.amazon.com//IAM/latest/UserGuide/access_policies_create.html)」を参照してください。

適切な IAM ポリシーを作成したので、ロールを作成してそれらのポリシーをアタッチします。これを行うには、コンソールを使用して次の手順を実行します。

**IAM コンソールで実行ロールを作成するには**

1. IAM コンソールの [[Roles (ロール)] ページ](https://console.aws.amazon.com/iam/home#/roles)を開きます。

1. [**ロールの作成**] を選択してください。

1. **[信頼されたエンティティタイプ]** から、**[AWS サービス]** を選択します。

1. [**ユースケース**] で、**Lambda** を選択します。

1. [**次へ**] を選択します。

1. 次のポリシーを指定します。
   + *clusterAuthPolicy*
   + `AWSLambdaMSKExecutionRole`

1. [**次へ**] を選択します。

1. **[ロール名]** に *lambdaAuthRole* と入力し、**[ロールの作成]** を選択します。

詳細については、「[実行ロールを使用した Lambda 関数のアクセス許可の定義](lambda-intro-execution-role.md)」を参照してください。

## Amazon MSK トピックから読み取る Lambda 関数を作成する
<a name="w2aad101c23c15c35c25"></a>

IAM ロールを使用するように設定された Lambda 関数を作成します。コンソールを使用して Lambda 関数を作成できます。

**認証設定を使用して Lambda 関数を作成するには**

1.  Lambda コンソールを開き、ヘッダーから **[関数の作成]** を選択します。

1. **[一から作成]** を選択します。

1. **[関数名]** で、任意の適切な名前を指定します。

1. **[ランタイム]** で、**[最新のサポート対象]** バージョンの `Node.js` を選択すると、このチュートリアルで提供されるコードを使用できます。

1. **[デフォルトの実行ロールの変更]** を選択します。

1. **[既存のロールを使用]** を選択します。

1. **[既存のロール]** で、*lambdaAuthRole* を選択します。

本番環境では、通常、Lambda 関数が Amazon MSK のイベントを適切に処理できるようにするために、実行ロールにさらにポリシーを追加する必要があります。ロールにポリシーを追加する方法の詳細については、IAM ドキュメントの「[ID アクセス許可の追加または削除](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html#add-policies-console)」を参照してください。

## Lambda 関数へのイベントソースマッピングを作成する
<a name="w2aad101c23c15c35c27"></a>

Amazon MSK イベントソースマッピングは、該当する Amazon MSK イベントが発生したときに Lambda を呼び出すために必要な情報を Lambda サービスに提供します。コンソールを使用して Amazon MSK マッピングを作成できます。Lambda トリガーを作成すると、イベントソースマッピングは自動的に設定されます。

**Lambda トリガー (およびイベントソースマッピング) を作成するには**

1. Lambda 関数の概要ページに移動します。

1. 関数の概要セクションで、左下の **[トリガーの追加]** を選択します。

1. **[ソースの選択]** ドロップダウンで、**[Amazon MSK]** を選択します。

1. **[認証]** を設定しないでください。

1. **[MSK クラスター]** で、クラスターの名前を選択します。

1. **[バッチサイズ]** で、1 を入力します。これは、この機能のテストを容易にするためのステップであり、本番環境で理想的な値ではありません。

1. **[トピック名]** で、Kafka トピックの名前を指定します。

1. **[コンシューマーグループ ID]** で、Kafka コンシューマーグループの ID を指定します。

## ストリーミングデータを読み取るために Lambda 関数を更新する
<a name="w2aad101c23c15c35c29"></a>

 Lambda は、イベントメソッドパラメータを使用して Kafka イベントに関する情報を提供します。Amazon MSK イベントの構造の例については、「[イベントの例](with-msk.md#msk-sample-event)」を参照してください。Lambda によって転送された Amazon MSK イベントを解釈する方法を理解したら、これらのイベントによって提供される情報を使用するように Lambda 関数コードを変更できます。

 テスト目的で Lambda Amazon MSK イベントの内容をログに記録するには、Lambda 関数に次のコードを指定します。

------
#### [ .NET ]

**SDK for .NET**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
.NET を使用した Lambda での Amazon MSK イベントの消費。  

```
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KafkaEvents;


// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace MSKLambda;

public class Function
{
    
    
    /// <param name="input">The event for the Lambda function handler to process.</param>
    /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param>
    /// <returns></returns>
    public void FunctionHandler(KafkaEvent evnt, ILambdaContext context)
    {

        foreach (var record in evnt.Records)
        {
            Console.WriteLine("Key:" + record.Key); 
            foreach (var eventRecord in record.Value)
            {
                var valueBytes = eventRecord.Value.ToArray();    
                var valueText = Encoding.UTF8.GetString(valueBytes);
                
                Console.WriteLine("Message:" + valueText);
            }
        }
    }
    

}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Go を使用した Lambda での Amazon MSK イベントの消費。  

```
package main

import (
	"encoding/base64"
	"fmt"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(event events.KafkaEvent) {
	for key, records := range event.Records {
		fmt.Println("Key:", key)

		for _, record := range records {
			fmt.Println("Record:", record)

			decodedValue, _ := base64.StdEncoding.DecodeString(record.Value)
			message := string(decodedValue)
			fmt.Println("Message:", message)
		}
	}
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK for Java 2.x**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Java を使用した Lambda での Amazon MSK イベントの消費。  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord;

import java.util.Base64;
import java.util.Map;

public class Example implements RequestHandler<KafkaEvent, Void> {

    @Override
    public Void handleRequest(KafkaEvent event, Context context) {
        for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) {
            String key = entry.getKey();
            System.out.println("Key: " + key);

            for (KafkaEventRecord record : entry.getValue()) {
                System.out.println("Record: " + record);

                byte[] value = Base64.getDecoder().decode(record.getValue());
                String message = new String(value);
                System.out.println("Message: " + message);
            }
        }

        return null;
    }
}
```

------
#### [ JavaScript ]

**SDK for JavaScript (v3)**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
JavaScript を使用した Lambda での Amazon MSK イベントの消費。  

```
exports.handler = async (event) => {
    // Iterate through keys
    for (let key in event.records) {
      console.log('Key: ', key)
      // Iterate through records
      event.records[key].map((record) => {
        console.log('Record: ', record)
        // Decode base64
        const msg = Buffer.from(record.value, 'base64').toString()
        console.log('Message:', msg)
      }) 
    }
}
```
TypeScript を使用した Lambda での Amazon MSK イベントの消費。  

```
import { MSKEvent, Context } from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "msk-handler-sample",
});

export const handler = async (
  event: MSKEvent,
  context: Context
): Promise<void> => {
  for (const [topic, topicRecords] of Object.entries(event.records)) {
    logger.info(`Processing key: ${topic}`);

    // Process each record in the partition
    for (const record of topicRecords) {
      try {
        // Decode the message value from base64
        const decodedMessage = Buffer.from(record.value, 'base64').toString();

        logger.info({
          message: decodedMessage
        });
      }
      catch (error) {
        logger.error('Error processing event', { error });
        throw error;
      }
    };
  }
}
```

------
#### [ PHP ]

**SDK for PHP**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
PHP を使用した Lambda での Amazon MSK イベントの消費。  

```
<?php
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kafka\KafkaEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): void
    {
        $kafkaEvent = new KafkaEvent($event);
        $this->logger->info("Processing records");
        $records = $kafkaEvent->getRecords();

        foreach ($records as $record) {
            try {
                $key = $record->getKey();
                $this->logger->info("Key: $key");

                $values = $record->getValue();
                $this->logger->info(json_encode($values));

                foreach ($values as $value) {
                    $this->logger->info("Value: $value");
                }
                
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK for Python (Boto3)**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Python を使用した Lambda での Amazon MSK イベントの消費。  

```
import base64

def lambda_handler(event, context):
    # Iterate through keys
    for key in event['records']:
        print('Key:', key)
        # Iterate through records
        for record in event['records'][key]:
            print('Record:', record)
            # Decode base64
            msg = base64.b64decode(record['value']).decode('utf-8')
            print('Message:', msg)
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Ruby を使用した Lambda での Amazon MSK イベントの消費。  

```
require 'base64'

def lambda_handler(event:, context:)
  # Iterate through keys
  event['records'].each do |key, records|
    puts "Key: #{key}"

    # Iterate through records
    records.each do |record|
      puts "Record: #{record}"

      # Decode base64
      msg = Base64.decode64(record['value'])
      puts "Message: #{msg}"
    end
  end
end
```

------
#### [ Rust ]

**SDK for Rust**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Rust を使用した Lambda での Amazon MSK イベントの消費。  

```
use aws_lambda_events::event::kafka::KafkaEvent;
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use base64::prelude::*;
use serde_json::{Value};
use tracing::{info};

/// Pre-Requisites:
/// 1. Install Cargo Lambda - see https://www.cargo-lambda.info/guide/getting-started.html
/// 2. Add packages tracing, tracing-subscriber, serde_json, base64
///
/// This is the main body for the function.
/// Write your code inside it.
/// There are some code example in the following URLs:
/// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples
/// - https://github.com/aws-samples/serverless-rust-demo/

async fn function_handler(event: LambdaEvent<KafkaEvent>) -> Result<Value, Error> {

    let payload = event.payload.records;

    for (_name, records) in payload.iter() {

        for record in records {

         let record_text = record.value.as_ref().ok_or("Value is None")?;
         info!("Record: {}", &record_text);

         // perform Base64 decoding
         let record_bytes = BASE64_STANDARD.decode(record_text)?;
         let message = std::str::from_utf8(&record_bytes)?;
         
         info!("Message: {}", message);
        }

    }

    Ok(().into())
}

#[tokio::main]
async fn main() -> Result<(), Error> {

    // required to enable CloudWatch error logging by the runtime
    tracing::init_default_subscriber();
    info!("Setup CW subscriber!");

    run(service_fn(function_handler)).await
}
```

------

コンソールを使用して Lambda に関数コードを指定できます。

**コンソールのコードエディタを使用して関数コードを更新するには**

1. Lambda コンソールの「[関数ページ](https://console.aws.amazon.com/lambda/home#/functions)」を開き、関数を選択します。

1. **[コード]** タブを選択します。

1. **[コードソース]** ペインでソースコードファイルを選択し、統合コードエディタで編集します。

1. **[DEPLOY]** セクションで、**[デプロイ]** を選択して関数のコードを更新します。  
![\[\]](http://docs.aws.amazon.com/ja_jp/lambda/latest/dg/images/getting-started-tutorial/deploy-console.png)

## Lambda 関数をテストして、Amazon MSK トピックに接続されていることを確認します。
<a name="w2aad101c23c15c35c31"></a>

CloudWatch イベントログを調べることで、Lambda がイベントソースによって呼び出されているかどうかを確認できるようになりました。

**Lambda 関数が呼び出されているかどうかを確認するには**

1. Kafka 管理ホストを使用し、`kafka-console-producer` CLI を使用して Kafka イベントを生成します。詳細については、Kafka ドキュメントの「[Write some events into the topic](https://kafka.apache.org/documentation/#quickstart_send)」を参照してください。前のステップで定義したイベントソースマッピングのバッチサイズで定義されたバッチを埋めるのに十分なイベントを送信してください。そうしないと、Lambda は追加の情報が呼び出されるまで待機します。

1. 関数が実行されると、Lambda はその結果を CloudWatch に書き込みます。コンソールで、Lambda 関数の詳細ページに移動します。

1. [**Configuration (設定)**] タブを選択します。

1. サイドバーから、**[モニタリングおよび運用ツール]** を選択します。

1. **[ロギング設定]** で **[CloudWatch ロググループ]** を特定します。ロググループは `/aws/lambda` で始まります。ロググループへのリンクを選択します。

1. CloudWatch コンソールの **[ログイベント]** で、Lambda がログストリームに送信したログイベントがないかを調べます。次の図のように、Kafka イベントからのメッセージを含むログイベントがあるかどうかを確認します。存在する場合は、Lambda イベントソースマッピングを使用して Lambda 関数を Amazon MSK に正常に接続できています。  
![\[\]](http://docs.aws.amazon.com/ja_jp/lambda/latest/dg/images/msk_tut_log.png)