

# チュートリアル: Amazon MSK イベントソースマッピングを使用して Lambda 関数を呼び出す
<a name="services-msk-tutorial"></a>

本チュートリアルでは、次の手順を実行します。
+ 既存の Amazon MSK クラスターと同じ AWS アカウントに Lambda 関数を作成します。
+ Amazon MSK と通信するように Lambda のネットワークと認証を設定します。
+ Lambda Amazon MSK イベントソースマッピングを設定します。これにより、イベントがトピックに出現したときに Lambda 関数が実行されます。

これらのステップを完了したら、イベントが Amazon MSK に送信されたときに、独自のカスタム Lambda コードを使用してそれらのイベントを自動的に処理するための Lambda 関数を設定できるようになります。

 **この機能で何ができますか?**

**ソリューションの例: MSK イベントソースマッピングを使用して、ライブスコアを顧客に配信します。**

次のシナリオを考えてみましょう。あなたの会社は、顧客がスポーツの試合などのライブイベントに関する情報を表示できるウェブアプリケーションをホストしています。試合の情報更新は、Amazon MSK の Kafka トピックを通じてチームに提供されます。MSK トピックから取得した更新情報を使用して、開発中のアプリケーション内で顧客にライブイベントの更新ビューを提供するソリューションを設計する必要があります。次の設計アプローチを決定しました。クライアントアプリケーションは、AWS でホストされているサーバーレスバックエンドと通信します。クライアントは、Amazon API Gateway WebSocket API を使用して WebSocket セッション経由で接続します。

このソリューションでは、MSK イベントを読み取り、いくつかのカスタムロジックを実行してアプリケーションレイヤーのイベントを準備し、その情報を API Gateway API に転送するコンポーネントが必要です。このコンポーネントは、Lambda 関数でカスタムロジックを指定し、それを AWS Lambda Amazon MSK イベントソースマッピングで呼び出すことで、AWS Lambda を使用して実装できます。

Amazon API Gateway WebSocket API を使用したソリューションの実装の詳細については、API Gateway ドキュメントの「[WebSocket API のチュートリアル](https://docs.aws.amazon.com/apigateway/latest/developerguide/websocket-api-chat-app.html)」を参照してください。

## 前提条件
<a name="w2aad101c23c15c35c19"></a>

以下の事前設定されたリソースを持つ AWS アカウント。

**これらの前提条件を満たすには、Amazon MSK ドキュメントの「[Getting started using Amazon MSK](https://docs.aws.amazon.com//msk/latest/developerguide/getting-started.html)」を参照してください。**
+ Amazon MSK クラスター 「*Amazon MSK の使用を開始する*」の「[Amazon MSK クラスターを作成する](https://docs.aws.amazon.com//msk/latest/developerguide/create-cluster.html)」を参照してください。
+ 以下の設定を行います。
  + クラスターのセキュリティ設定で **[IAM ロールベースの認証]** が **[有効]** になっていることを確認します。これにより、必要な Amazon MSK リソースにのみアクセスするように Lambda 関数を制限することで、セキュリティが向上します。これは、新しい Amazon MSK クラスターではデフォルトで有効になっています。
  + クラスターネットワーク設定で **[パブリックアクセス]** がオフになっていることを確認します。Amazon MSK クラスターのインターネットへのアクセスを制限すると、データを処理する仲介者の数が制限されることでセキュリティが向上します。これは、新しい Amazon MSK クラスターではデフォルトで有効になっています。
+ このソリューションに使用する Amazon MSK クラスターの Kafka トピック。「*Amazon MSK の使用を開始する*」の「[トピックの作成](https://docs.aws.amazon.com//msk/latest/developerguide/create-topic.html)」を参照してください。
+ Kafka クラスターから情報を取得し、テスト用に Kafka イベントをトピックに送信するように設定された Kafka 管理ホスト。例えば、Kafka 管理 CLI と Amazon MSK IAM ライブラリがインストールされた Amazon EC2 インスタンスなどです。「*Amazon MSK の使用を開始する*」の「[クライアントマシンを作成する](https://docs.aws.amazon.com//msk/latest/developerguide/create-client-machine.html)」を参照してください。

これらのリソースを設定したら、AWS アカウントから次の情報を収集して、続行する準備ができていることを確認します。
+ Amazon MSK クラスターの名前。この情報は、Amazon MSK コンソールで確認できます。
+ クラスター UUID。Amazon MSK クラスターの ARN の一部であり、Amazon MSK コンソールで確認できます。この情報を確認するには、Amazon MSK ドキュメントの「[クラスターの一覧表示](https://docs.aws.amazon.com/msk/latest/developerguide/msk-list-clusters.html)」の手順に従います。
+ Amazon MSK クラスターに関連付けられているセキュリティグループ。この情報は、Amazon MSK コンソールで確認できます。次のステップでは、これらを *clusterSecurityGroups* と呼びます。
+ Amazon MSK クラスターを含む Amazon VPC の ID。この情報を見つけるには、Amazon MSK コンソールで Amazon MSK クラスターに関連付けられたサブネットを特定し、Amazon VPC コンソールでそのサブネットに関連付けられた Amazon VPC を特定します。
+ ソリューションで使用される Kafka トピックの名前。この情報を確認するには、Kafka 管理ホストから Kafka `topics` CLI を使用して Amazon MSK クラスターを呼び出します。トピック CLI の詳細については、Kafka ドキュメントの「[Adding and removing topics](https://kafka.apache.org/documentation/#basic_ops_add_topic)」を参照してください。
+ Kafka トピックのコンシューマーグループの名前。Lambda 関数での使用に適しています。このグループは Lambda によって自動的に作成できるため、Kafka CLI で作成する必要はありません。コンシューマーグループを管理する必要がある場合、コンシューマーグループ CLI の詳細については、Kafka ドキュメントの「[Managing Consumer Groups](https://kafka.apache.org/documentation/#basic_ops_consumer_group)」を参照してください。

AWS アカウントに次のアクセス許可が必要です。
+ Lambda 関数を作成および管理するためのアクセス許可。
+ IAM ポリシーを作成し、それを Lambda 関数に関連付けるためのアクセス許可。
+ Amazon MSK クラスターをホストする Amazon VPC で Amazon VPC エンドポイントを作成し、ネットワーク設定を変更するためのアクセス許可。

### AWS Command Line Interface のインストール
<a name="install_aws_cli"></a>

AWS Command Line Interface をまだインストールしていない場合は、「[最新バージョンの AWS CLI のインストールまたは更新](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)」にある手順に従ってインストールしてください。

このチュートリアルでは、コマンドを実行するためのコマンドラインターミナルまたはシェルが必要です。Linux および macOS では、任意のシェルとパッケージマネージャーを使用してください。

**注記**  
Windows では、Lambda でよく使用される一部の Bash CLI コマンド (`zip` など) が、オペレーティングシステムの組み込みターミナルでサポートされていません。Ubuntu および Bash の Windows 統合バージョンを取得するには、[Windows Subsystem for Linux をインストール](https://docs.microsoft.com/en-us/windows/wsl/install-win10)します。

## Amazon MSK と通信するように Lambda のネットワーク接続を設定する
<a name="w2aad101c23c15c35c21"></a>

 AWS PrivateLink を使用して Lambda と Amazon MSK を接続します。これを行うには、Amazon VPC コンソールでインターフェイス Amazon VPC エンドポイントを作成します。ネットワーク設定の詳細については、「[Amazon MSK クラスターおよび Amazon VPC ネットワークの Lambda 向け設定](with-msk-cluster-network.md)」を参照してください。

Amazon MSK イベントソースマッピングが Lambda 関数に代わって実行される際には、Lambda 関数の実行ロールを引き受けます。この IAM ロールは、マッピングに対して、IAM で保護されたリソース (Amazon MSK クラスターなど) へのアクセスを許可します。コンポーネントは実行ロールを共有しますが、次の図に示すように、Amazon MSK マッピングと Lambda 関数には、それぞれのタスクに対して個別の接続要件があります。

![\[\]](http://docs.aws.amazon.com/ja_jp/lambda/latest/dg/images/msk_tut_network.png)


イベントソースマッピングは、Amazon MSK クラスターセキュリティグループに属します。このネットワークステップでは、Amazon MSK クラスター VPC から Amazon VPC エンドポイントを作成して、イベントソースマッピングを Lambda および STS サービスに接続します。Amazon MSK クラスターセキュリティグループからのトラフィックを受け入れるように、これらのエンドポイントを保護します。次に、Amazon MSK クラスターのセキュリティグループを調整して、イベントソースマッピングが Amazon MSK クラスターと通信できるようにします。

 AWS マネジメントコンソールを使用して、次の手順を設定できます。

**Lambda と Amazon MSK を接続するようにインターフェイス Amazon VPC エンドポイントを設定するには**

1. インターフェイス Amazon VPC エンドポイントのセキュリティグループ *endpointSecurityGroup* を作成します。これにより、*clusterSecurityGroups* からのポート 443 でのインバウンド TCP トラフィックが許可されます。Amazon EC2 ドキュメントの「[セキュリティグループの作成](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#creating-security-group)」の手順に従って、セキュリティグループを作成します。次に、Amazon EC2 ドキュメントの「[セキュリティグループへのルールの追加](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#adding-security-group-rule)」の手順に従って、適切なルールを追加します。

   **次の情報を使用してセキュリティグループを作成します。**

   インバウンドルールを追加する際に、*clusterSecurityGroups* 内のセキュリティグループごとにルールを作成します。各ルールは、次のように作成します。
   + **[タイプ]** で **[HTTPS]** を選択します。
   + **[ソース]** で *clusterSecurityGroups* のいずれかを選択します。

1.  Lambda サービスを Amazon MSK クラスターを含む Amazon VPC に接続するエンドポイントを作成します。「[インターフェイスエンドポイントの作成](https://docs.aws.amazon.com//vpc/latest/privatelink/create-interface-endpoint.html)」の手順に従います。

   **次の情報を使用してインターフェイスエンドポイントを作成します。**
   + **[サービス名]** で `com.amazonaws.regionName.lambda` を選択します。ここで、*regionName* が Lambda 関数をホストします。
   + **[VPC]** で、Amazon MSK クラスターを含む Amazon VPC を選択します。
   + **[セキュリティグループ]** で、前に作成した *endpointSecurityGroup* を選択します。
   + **[サブネット]** で、Amazon MSK クラスターをホストするサブネットを選択します。
   + **[ポリシー]** で、次のポリシードキュメントを指定します。これにより、Lambda サービスプリンシパルが `lambda:InvokeFunction` アクションに使用するためにエンドポイントを保護します。

     ```
     {
         "Statement": [
             {
                 "Action": "lambda:InvokeFunction",
                 "Effect": "Allow",
                 "Principal": {
                     "Service": [
                         "lambda.amazonaws.com"
                     ]
                 },
                 "Resource": "*"
             }
         ]
     }
     ```
   + **[DNS 名を有効化]** が設定されたままであることを確認します。

1.  AWS STS サービスを Amazon MSK クラスターを含む Amazon VPC に接続するエンドポイントを作成します。「[インターフェイスエンドポイントの作成](https://docs.aws.amazon.com//vpc/latest/privatelink/create-interface-endpoint.html)」の手順に従います。

   **次の情報を使用してインターフェイスエンドポイントを作成します。**
   + **[サービス名]** で AWS STS を選択します。
   + **[VPC]** で、Amazon MSK クラスターを含む Amazon VPC を選択します。
   + **[セキュリティグループ]** で *endpointSecurityGroup* を選択します。
   + **[サブネット]** で、Amazon MSK クラスターをホストするサブネットを選択します。
   + **[ポリシー]** で、次のポリシードキュメントを指定します。これにより、Lambda サービスプリンシパルが `sts:AssumeRole` アクションに使用するためにエンドポイントを保護します。

     ```
     {
         "Statement": [
             {
                 "Action": "sts:AssumeRole",
                 "Effect": "Allow",
                 "Principal": {
                     "Service": [
                         "lambda.amazonaws.com"
                     ]
                 },
                 "Resource": "*"
             }
         ]
     }
     ```
   + **[DNS 名を有効化]** が設定されたままであることを確認します。

1. Amazon MSK クラスターに関連付けられている各セキュリティグループに対して、つまり *clusterSecurityGroups* で、次のことを許可します。
   + すべての *clusterSecurityGroups* に対して、ポート 9098 でのすべてのインバウンドおよびアウトバウンド TCP トラフィックを許可します。これには、clusterSecurityGroups 内でのトラフィックも含まれます。
   + ポート 443 でのすべてのアウトバウンド TCP トラフィックを許可します。

   このトラフィックの一部は、デフォルトのセキュリティグループルールで許可されているため、クラスターが単一のセキュリティグループにアタッチされており、そのグループにデフォルトのルールがある場合は、追加のルールは必要ありません。セキュリティグループルールを調整するには、Amazon EC2 ドキュメントの「[セキュリティグループへのルールの追加](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#adding-security-group-rule)」の手順に従います。

   **次の情報を使用して、セキュリティグループにルールを追加します。**
   + ポート 9098 のインバウンドルールまたはアウトバウンドルールごとに、以下を指定します。
     + [**タイプ**] で、[**カスタム TCP**] を選択します。
     + **[ポート範囲]** で 9098 を指定します。
     + **[ソース]** で *clusterSecurityGroups* のいずれかを指定します。
   + ポート 443 のインバウンドルールごとに、**[タイプ]** で **[HTTPS]** を選択します。

## Lambda が Amazon MSK トピックから読み取るための IAM ロールを作成する
<a name="w2aad101c23c15c35c23"></a>

Lambda が Amazon MSK トピックから読み取るための認証要件を特定し、ポリシーで定義します。ロール *lambdaAuthRole* を作成します。このロールは、Lambda がこれらのアクセス許可を使用することを承認します。`kafka-cluster` IAM アクションを使用して Amazon MSK クラスターに対するアクションを承認します。次に、Lambda が Amazon MSK クラスターを検出して接続するために必要な Amazon MSK の `kafka` アクションと Amazon EC2 アクションを実行すること、および Lambda が実行した内容をログに記録できるように CloudWatch アクションを実行することを承認します。

**Lambda が Amazon MSK から読み取るための認証要件を記述するには**

1. IAM ポリシードキュメント (JSON ドキュメント) である *clusterAuthPolicy* を作成します。これにより、Lambda は Kafka コンシューマーグループを使用して、Amazon MSK クラスター内の Kafka トピックからデータを読み取ることができるようになります。Lambda では、読み取り時に Kafka コンシューマーグループを設定する必要があります。

   前提条件に合わせて次のテンプレートを変更します。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "kafka-cluster:Connect",
                   "kafka-cluster:DescribeGroup",
                   "kafka-cluster:AlterGroup",
                   "kafka-cluster:DescribeTopic",
                   "kafka-cluster:ReadData",
                   "kafka-cluster:DescribeClusterDynamicConfiguration"
               ],
               "Resource": [
                   "arn:aws:kafka:us-east-1:111122223333:cluster/mskClusterName/cluster-uuid",
                   "arn:aws:kafka:us-east-1:111122223333:topic/mskClusterName/cluster-uuid/mskTopicName",
                   "arn:aws:kafka:us-east-1:111122223333:group/mskClusterName/cluster-uuid/mskGroupName"
               ]
           }
       ]
   }
   ```

------

   詳細については、「[Amazon MSK イベントソースマッピングの Lambda アクセス許可の設定](with-msk-permissions.md)」を参照してください。ポリシーは、次のように作成します。
   + *us-east-1* と *111122223333* を Amazon MSK クラスターの AWS リージョン と AWS アカウント に置き換えます。
   + *mskClusterName* には、Amazon MSK クラスターの名前を指定します。
   + *cluster-uuid* には、Amazon MSK クラスターの ARN 内の UUID を指定します。
   + *mskTopicName* には、Kafka トピックの名前を指定します。
   + *mskGroupName* には、Kafka コンシューマーグループの名前を指定します。

1. Lambda が Amazon MSK クラスターを検出して接続し、それらのイベントをログに記録するために必要となる、Amazon MSK、Amazon EC2、および CloudWatch のアクセス許可を特定します。

   `AWSLambdaMSKExecutionRole` マネージドポリシーは、必要なアクセス許可を許容的に定義します。これは、次の手順で使用します。

   本番環境で `AWSLambdaMSKExecutionRole` を評価し、最小特権の原則に基づいて実行ロールポリシーを制限します。その後、ロールに対して、このマネージドポリシーを置き換えるポリシーを作成します。

IAM ポリシー言語の詳細については、[IAM ドキュメント](https://docs.aws.amazon.com//iam/)を参照してください。

ポリシードキュメントを作成したので、IAM ポリシーを作成してロールにアタッチすることができます。これを行うには、コンソールを使用して次の手順を実行します。

**ポリシードキュメントから IAM ポリシーを作成するには**

1. AWS マネジメントコンソールにサインインして、IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます

1. 左側のナビゲーションペインで、**[ポリシー]** を選択します。

1. [**Create policy**] (ポリシーの作成) を選択します。

1. [**ポリシーエディタ**] セクションで、[**JSON**] オプションを選択します。

1. *clusterAuthPolicy* を貼り付けます。

1. ポリシーにアクセス権限を追加し終えたら、[**次へ**] を選択します。

1. [**確認と作成**] ページで、作成するポリシーの [**ポリシー名**] と [**説明**] (オプション) を入力します。**このポリシーで定義されているアクセス許可** を確認して、ポリシーによって付与されたアクセス許可を確認します。

1. **ポリシーを作成** をクリックして、新しいポリシーを保存します。

詳細については、IAM ドキュメントの「[IAM ポリシーの作成](https://docs.aws.amazon.com//IAM/latest/UserGuide/access_policies_create.html)」を参照してください。

適切な IAM ポリシーを作成したので、ロールを作成してそれらのポリシーをアタッチします。これを行うには、コンソールを使用して次の手順を実行します。

**IAM コンソールで実行ロールを作成するには**

1. IAM コンソールの [[Roles (ロール)] ページ](https://console.aws.amazon.com/iam/home#/roles)を開きます。

1. [**ロールの作成**] を選択してください。

1. **[信頼されたエンティティタイプ]** から、**[AWS サービス]** を選択します。

1. [**ユースケース**] で、**Lambda** を選択します。

1. [**次へ**] を選択します。

1. 次のポリシーを指定します。
   + *clusterAuthPolicy*
   + `AWSLambdaMSKExecutionRole`

1. [**次へ**] を選択します。

1. **[ロール名]** に *lambdaAuthRole* と入力し、**[ロールの作成]** を選択します。

詳細については、「[実行ロールを使用した Lambda 関数のアクセス許可の定義](lambda-intro-execution-role.md)」を参照してください。

## Amazon MSK トピックから読み取る Lambda 関数を作成する
<a name="w2aad101c23c15c35c25"></a>

IAM ロールを使用するように設定された Lambda 関数を作成します。コンソールを使用して Lambda 関数を作成できます。

**認証設定を使用して Lambda 関数を作成するには**

1.  Lambda コンソールを開き、ヘッダーから **[関数の作成]** を選択します。

1. **[一から作成]** を選択します。

1. **[関数名]** で、任意の適切な名前を指定します。

1. **[ランタイム]** で、**[最新のサポート対象]** バージョンの `Node.js` を選択すると、このチュートリアルで提供されるコードを使用できます。

1. **[デフォルトの実行ロールの変更]** を選択します。

1. **[既存のロールを使用]** を選択します。

1. **[既存のロール]** で、*lambdaAuthRole* を選択します。

本番環境では、通常、Lambda 関数が Amazon MSK のイベントを適切に処理できるようにするために、実行ロールにさらにポリシーを追加する必要があります。ロールにポリシーを追加する方法の詳細については、IAM ドキュメントの「[ID アクセス許可の追加または削除](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html#add-policies-console)」を参照してください。

## Lambda 関数へのイベントソースマッピングを作成する
<a name="w2aad101c23c15c35c27"></a>

Amazon MSK イベントソースマッピングは、該当する Amazon MSK イベントが発生したときに Lambda を呼び出すために必要な情報を Lambda サービスに提供します。コンソールを使用して Amazon MSK マッピングを作成できます。Lambda トリガーを作成すると、イベントソースマッピングは自動的に設定されます。

**Lambda トリガー (およびイベントソースマッピング) を作成するには**

1. Lambda 関数の概要ページに移動します。

1. 関数の概要セクションで、左下の **[トリガーの追加]** を選択します。

1. **[ソースの選択]** ドロップダウンで、**[Amazon MSK]** を選択します。

1. **[認証]** を設定しないでください。

1. **[MSK クラスター]** で、クラスターの名前を選択します。

1. **[バッチサイズ]** で、1 を入力します。これは、この機能のテストを容易にするためのステップであり、本番環境で理想的な値ではありません。

1. **[トピック名]** で、Kafka トピックの名前を指定します。

1. **[コンシューマーグループ ID]** で、Kafka コンシューマーグループの ID を指定します。

## ストリーミングデータを読み取るために Lambda 関数を更新する
<a name="w2aad101c23c15c35c29"></a>

 Lambda は、イベントメソッドパラメータを使用して Kafka イベントに関する情報を提供します。Amazon MSK イベントの構造の例については、「[イベントの例](with-msk.md#msk-sample-event)」を参照してください。Lambda によって転送された Amazon MSK イベントを解釈する方法を理解したら、これらのイベントによって提供される情報を使用するように Lambda 関数コードを変更できます。

 テスト目的で Lambda Amazon MSK イベントの内容をログに記録するには、Lambda 関数に次のコードを指定します。

------
#### [ .NET ]

**SDK for .NET**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
.NET を使用した Lambda での Amazon MSK イベントの消費。  

```
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KafkaEvents;


// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace MSKLambda;

public class Function
{
    
    
    /// <param name="input">The event for the Lambda function handler to process.</param>
    /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param>
    /// <returns></returns>
    public void FunctionHandler(KafkaEvent evnt, ILambdaContext context)
    {

        foreach (var record in evnt.Records)
        {
            Console.WriteLine("Key:" + record.Key); 
            foreach (var eventRecord in record.Value)
            {
                var valueBytes = eventRecord.Value.ToArray();    
                var valueText = Encoding.UTF8.GetString(valueBytes);
                
                Console.WriteLine("Message:" + valueText);
            }
        }
    }
    

}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Go を使用した Lambda での Amazon MSK イベントの消費。  

```
package main

import (
	"encoding/base64"
	"fmt"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(event events.KafkaEvent) {
	for key, records := range event.Records {
		fmt.Println("Key:", key)

		for _, record := range records {
			fmt.Println("Record:", record)

			decodedValue, _ := base64.StdEncoding.DecodeString(record.Value)
			message := string(decodedValue)
			fmt.Println("Message:", message)
		}
	}
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK for Java 2.x**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Java を使用した Lambda での Amazon MSK イベントの消費。  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord;

import java.util.Base64;
import java.util.Map;

public class Example implements RequestHandler<KafkaEvent, Void> {

    @Override
    public Void handleRequest(KafkaEvent event, Context context) {
        for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) {
            String key = entry.getKey();
            System.out.println("Key: " + key);

            for (KafkaEventRecord record : entry.getValue()) {
                System.out.println("Record: " + record);

                byte[] value = Base64.getDecoder().decode(record.getValue());
                String message = new String(value);
                System.out.println("Message: " + message);
            }
        }

        return null;
    }
}
```

------
#### [ JavaScript ]

**SDK for JavaScript (v3)**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
JavaScript を使用した Lambda での Amazon MSK イベントの消費。  

```
exports.handler = async (event) => {
    // Iterate through keys
    for (let key in event.records) {
      console.log('Key: ', key)
      // Iterate through records
      event.records[key].map((record) => {
        console.log('Record: ', record)
        // Decode base64
        const msg = Buffer.from(record.value, 'base64').toString()
        console.log('Message:', msg)
      }) 
    }
}
```
TypeScript を使用した Lambda での Amazon MSK イベントの消費。  

```
import { MSKEvent, Context } from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "msk-handler-sample",
});

export const handler = async (
  event: MSKEvent,
  context: Context
): Promise<void> => {
  for (const [topic, topicRecords] of Object.entries(event.records)) {
    logger.info(`Processing key: ${topic}`);

    // Process each record in the partition
    for (const record of topicRecords) {
      try {
        // Decode the message value from base64
        const decodedMessage = Buffer.from(record.value, 'base64').toString();

        logger.info({
          message: decodedMessage
        });
      }
      catch (error) {
        logger.error('Error processing event', { error });
        throw error;
      }
    };
  }
}
```

------
#### [ PHP ]

**SDK for PHP**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
PHP を使用した Lambda での Amazon MSK イベントの消費。  

```
<?php
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kafka\KafkaEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): void
    {
        $kafkaEvent = new KafkaEvent($event);
        $this->logger->info("Processing records");
        $records = $kafkaEvent->getRecords();

        foreach ($records as $record) {
            try {
                $key = $record->getKey();
                $this->logger->info("Key: $key");

                $values = $record->getValue();
                $this->logger->info(json_encode($values));

                foreach ($values as $value) {
                    $this->logger->info("Value: $value");
                }
                
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK for Python (Boto3)**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Python を使用した Lambda での Amazon MSK イベントの消費。  

```
import base64

def lambda_handler(event, context):
    # Iterate through keys
    for key in event['records']:
        print('Key:', key)
        # Iterate through records
        for record in event['records'][key]:
            print('Record:', record)
            # Decode base64
            msg = base64.b64decode(record['value']).decode('utf-8')
            print('Message:', msg)
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Ruby を使用した Lambda での Amazon MSK イベントの消費。  

```
require 'base64'

def lambda_handler(event:, context:)
  # Iterate through keys
  event['records'].each do |key, records|
    puts "Key: #{key}"

    # Iterate through records
    records.each do |record|
      puts "Record: #{record}"

      # Decode base64
      msg = Base64.decode64(record['value'])
      puts "Message: #{msg}"
    end
  end
end
```

------
#### [ Rust ]

**SDK for Rust**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Rust を使用した Lambda での Amazon MSK イベントの消費。  

```
use aws_lambda_events::event::kafka::KafkaEvent;
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use base64::prelude::*;
use serde_json::{Value};
use tracing::{info};

/// Pre-Requisites:
/// 1. Install Cargo Lambda - see https://www.cargo-lambda.info/guide/getting-started.html
/// 2. Add packages tracing, tracing-subscriber, serde_json, base64
///
/// This is the main body for the function.
/// Write your code inside it.
/// There are some code example in the following URLs:
/// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples
/// - https://github.com/aws-samples/serverless-rust-demo/

async fn function_handler(event: LambdaEvent<KafkaEvent>) -> Result<Value, Error> {

    let payload = event.payload.records;

    for (_name, records) in payload.iter() {

        for record in records {

         let record_text = record.value.as_ref().ok_or("Value is None")?;
         info!("Record: {}", &record_text);

         // perform Base64 decoding
         let record_bytes = BASE64_STANDARD.decode(record_text)?;
         let message = std::str::from_utf8(&record_bytes)?;
         
         info!("Message: {}", message);
        }

    }

    Ok(().into())
}

#[tokio::main]
async fn main() -> Result<(), Error> {

    // required to enable CloudWatch error logging by the runtime
    tracing::init_default_subscriber();
    info!("Setup CW subscriber!");

    run(service_fn(function_handler)).await
}
```

------

コンソールを使用して Lambda に関数コードを指定できます。

**コンソールのコードエディタを使用して関数コードを更新するには**

1. Lambda コンソールの「[関数ページ](https://console.aws.amazon.com/lambda/home#/functions)」を開き、関数を選択します。

1. **[コード]** タブを選択します。

1. **[コードソース]** ペインでソースコードファイルを選択し、統合コードエディタで編集します。

1. **[DEPLOY]** セクションで、**[デプロイ]** を選択して関数のコードを更新します。  
![\[\]](http://docs.aws.amazon.com/ja_jp/lambda/latest/dg/images/getting-started-tutorial/deploy-console.png)

## Lambda 関数をテストして、Amazon MSK トピックに接続されていることを確認します。
<a name="w2aad101c23c15c35c31"></a>

CloudWatch イベントログを調べることで、Lambda がイベントソースによって呼び出されているかどうかを確認できるようになりました。

**Lambda 関数が呼び出されているかどうかを確認するには**

1. Kafka 管理ホストを使用し、`kafka-console-producer` CLI を使用して Kafka イベントを生成します。詳細については、Kafka ドキュメントの「[Write some events into the topic](https://kafka.apache.org/documentation/#quickstart_send)」を参照してください。前のステップで定義したイベントソースマッピングのバッチサイズで定義されたバッチを埋めるのに十分なイベントを送信してください。そうしないと、Lambda は追加の情報が呼び出されるまで待機します。

1. 関数が実行されると、Lambda はその結果を CloudWatch に書き込みます。コンソールで、Lambda 関数の詳細ページに移動します。

1. [**Configuration (設定)**] タブを選択します。

1. サイドバーから、**[モニタリングおよび運用ツール]** を選択します。

1. **[ロギング設定]** で **[CloudWatch ロググループ]** を特定します。ロググループは `/aws/lambda` で始まります。ロググループへのリンクを選択します。

1. CloudWatch コンソールの **[ログイベント]** で、Lambda がログストリームに送信したログイベントがないかを調べます。次の図のように、Kafka イベントからのメッセージを含むログイベントがあるかどうかを確認します。存在する場合は、Lambda イベントソースマッピングを使用して Lambda 関数を Amazon MSK に正常に接続できています。  
![\[\]](http://docs.aws.amazon.com/ja_jp/lambda/latest/dg/images/msk_tut_log.png)