Lambda での Amazon MSK メッセージの処理 - AWS Lambda

Lambda での Amazon MSK メッセージの処理

注記

Lambda 関数以外のターゲットにデータを送信したい、または送信する前にデータをエンリッチしたいという場合は、「Amazon EventBridge Pipes」を参照してください。

Amazon MSK をイベントソースとして追加

イベントソースマッピングを作成するには、Amazon MSK を、Lambda コンソール、AWSSDK、またはAWS Command Line Interface(AWS CLI) を使用して Lambda 関数のトリガーとして追加します。Amazon MSK をトリガーとして追加すると、Lambda は Lambda 関数用の VPC 設定ではなく、Amazon MSK クラスター用の VPC 設定を引き受けることに注意してください。

このセクションでは、Lambda コンソールと AWS CLI を使用してイベントソースマッピングを作成する方法について説明します。

前提条件

  • Amazon MSK クラスターと Kafka トピック 詳細については、Amazon Managed Streaming for Apache Kafka デベロッパーガイドGetting Started Using Amazon MSK を参照してください。

  • MSK クラスターが使用する AWS リソースにアクセスするための許可を持つ実行ロール

カスタマイズ可能なコンシューマーグループ ID

Kafkaをイベントソースとして設定する場合、コンシューマーグループIDを指定できます。このコンシューマーグループ ID は、Lambda 関数を結合したい Kafka コンシューマーグループの既存の識別子です。この機能を使用すると、実行中の Kafka レコード処理設定を他のコンシューマーから Lambda にシームレスに移行できます。

コンシューマーグループ ID を指定し、そのコンシューマーグループ内に他のアクティブなポーラーが存在する場合、Kafka はすべてのコンシューマーにメッセージを配信します。言い換えると、Lambda は Kafka トピックのメッセージをすべて受け取るわけではありません。Lambda にトピック内のすべてのメッセージを処理させたい場合は、そのコンシューマーグループの他のポーラーをすべてオフにします。

さらに、コンシューマーグループ ID を指定し、Kafka が同じ ID を持つ有効な既存のコンシューマーグループを見つけた場合、Lambda は、イベントソースマッピングの StartingPosition パラメーターを無視します。代わりに、Lambda はコンシューマーグループのコミットされたオフセットに従ってレコードの処理を開始します。コンシューマーグループ ID を指定しても、Kafka が既存のコンシューマーグループを見つけられない場合、Lambda は指定された StartingPosition を使用してイベントソースを設定します。

指定するコンシューマーグループ ID は、すべての Kafka イベントソースの中で一意でなければなりません。コンシューマーグループ ID を指定して Kafka イベントソースマッピングを作成した後は、この値を更新することはできません。

Amazon MSK トリガーの追加 (コンソール)

Amazon MSK クラスターと Kafka トピックを Lambda 関数のトリガーとして追加するには、次の手順を実行します。

Amazon MSK トリガーをLambda 関数 (コンソール) に追加するには
  1. Lambda コンソールの [Functions (関数)] ページを開きます。

  2. Lambda 関数の名前を選択します。

  3. [機能の概要] で、[トリガーを追加] を選択します。

  4. [Trigger configuration] (トリガー設定) で次の操作を実行します。

    1. MSKトリガータイプを選択します。

    2. [MSK cluster] (MSK クラスター) で、クラスターを選択します。

    3. [Batch size] (バッチサイズ) で、単一バッチで取得されるメッセージの最大数を設定します。

    4. バッチウィンドウ では、Lambda が関数を呼び出すまで費やすレコード収集の最大時間 (秒) を入力します。

    5. [Topic name] (トピック名)に、Kafka トピックの名前を入力します。

    6. (オプション) コンシューマーグループ ID で、参加する Kafka コンシューマーグループの ID を入力します。

    7. (オプション) [開始位置] で、[最新] を選択して最新のレコードからストリームの読み取りを開始するか、[水平トリム] を選択して使用可能な最も以前のレコードから開始するか、または [タイムスタンプ時点] を選択して読み取りを開始するタイムスタンプを指定します。

    8. (オプション) [Authentication] (認証) で、MSK クラスターのブローカーで認証するためのシークレットキーを選択します。

    9. テスト用に無効状態のトリガーを作成する (推奨) には、[Enable trigger] (トリガーを有効にする) を解除します。または、トリガーをすぐに有効にするには、[Enable trigger] (トリガーを有効にする) を選択します。

  5. トリガーを追加するには、[Add] (追加) を選択します。

Amazon MSK トリガーの追加 (AWS CLI)

Lambda 関数の Amazon MSK トリガーを作成および表示するには、次の例の AWS CLI コマンドを使用します。

AWS CLI を使用したトリガーの作成

例 — IAM 認証を使用するクラスターのイベントソースマッピングを作成します。

次の例では、create-event-source-mapping AWS CLI コマンドを使用して、my-kafka-function という名前の Lambda 関数を AWSKafkaTopic という名前の Kafka トピックにマッピングします。トピックの開始位置は LATEST に設定します。クラスターが IAM ロールベースの認証を使用する場合、SourceAccessConfiguration オブジェクトは必要ありません。例:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
例 — SASL/SCRAM 認証を使用するクラスターのイベントソースマッピングを作成します。

クラスターが SASL/SCRAM 認証を使用する場合は、SASL_SCRAM_512_AUTH および Secrets Manager のシークレット ARN を指定する SourceAccessConfiguration オブジェクトを含める必要があります。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
例 — mTLS 認証を使用するクラスターのイベントソースマッピングを作成します。

クラスターが mTLS 認証を使用する場合は、CLIENT_CERTIFICATE_TLS_AUTH および Secrets Manager のシークレット ARN を指定する SourceAccessConfiguration オブジェクトを含める必要があります。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

詳細については、API リファレンスドキュメント CreateEventSourceMapping を参照してください。

AWS CLI を使用したステータスの表示

次の例では、get-event-source-mapping AWS CLI コマンドを使用して、作成したイベントソースマッピングのステータスを記述します。

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

Amazon MSK 設定パラメータ

すべての Lambda イベントソースタイプは、同じCreateEventSourceMapping および UpdateEventSourceMapping API オペレーションを共有しています。ただし、Amazon MSK に適用されるのは一部のパラメータのみです。

[Parameter] (パラメータ) 必須 デフォルト メモ

AmazonManagedKafkaEventSourceConfig

N

ConsumerGroupID フィールドを含み、デフォルトでは一意の値になっています。

作成時にのみ設定可能

BatchSize

N

100

最大: 10,000

有効

N

有効

なし

EventSourceArn

Y

該当なし

作成時にのみ設定可能

FunctionName

Y

該当なし

なし

FilterCriteria

N

該当なし

Lambda が関数に送信するイベントを制御する

MaximumBatchingWindowInSeconds

N

500 ミリ秒

バッチ処理動作

SourceAccessConfigurations

N

認証情報なし

イベントソース用の、SASL/SCRAM あるいは CLIENT_CERTIFICATE_TLS_AUTH (MutualTLS) の認証情報

StartingPosition

Y

該当なし

AT_TIMESTAMP、TRIM_HORIZON、または LATEST

作成時にのみ設定可能

StartingPositionTimestamp

N

該当なし

StartingPosition が AT_TIMESTAMP に設定されている場合にのみ必要

トピック

Y

該当なし

カフカのトピック名

作成時にのみ設定可能

クロスアカウントのイベントソースマッピングの作成

マルチ VPC プライベート接続を使用して、Lambda 関数を別の AWS アカウントのプロビジョニングされた MSK クラスターに接続できます。マルチ VPC 接続は AWS PrivateLink を使用して、すべてのトラフィックを AWS ネットワーク内に保持します。

注記

サーバーレス MSK クラスターにはクロスアカウントイベントソースマッピングを作成できません。

クロスアカウントイベントソースマッピングを作成するには、まず MSK クラスターのマルチ VPC 接続を設定する必要があります。イベントソースマッピングを作成するときは、以下の例に示すように、クラスター ARN の代わりにマネージド VPC 接続 ARN を使用します。CreateEventSourceMapping オペレーションは、MSK クラスターが使用する認証タイプによっても異なります。

例 — IAM 認証を使用するクラスターのクロスアカウントイベントソースマッピングを作成します。

クラスターが IAM ロールベースの認証を使用する場合、SourceAccessConfiguration オブジェクトは必要ありません。例:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
例 — SASL/SCRAM 認証を使用するクラスターのクロスアカウントイベントソースマッピングを作成します。

クラスターが SASL/SCRAM 認証を使用する場合は、SASL_SCRAM_512_AUTH および Secrets Manager のシークレット ARN を指定する SourceAccessConfiguration オブジェクトを含める必要があります。

SASL/SCRAM 認証でクロスアカウントの Amazon MSK イベントソースマッピングにシークレットを使用する方法は 2 つあります。

  • Lambda 関数アカウントにシークレットを作成し、クラスターシークレットと同期します。2 つのシークレットを同期させるローテーションを作成します。このオプションでは、関数アカウントからシークレットを制御できます。

  • MSK クラスターに関連付けられているシークレットを使用してください。このシークレットは、Lambda 関数アカウントへのクロスアカウントアクセスを許可する必要があります。詳細については、「別のアカウントのユーザーの AWS Secrets Manager シークレットに対するアクセス許可」を参照してください。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
例 — mTLS 認証を使用するクラスターのクロスアカウントイベントソースマッピングを作成します。

クラスターが mTLS 認証を使用する場合は、CLIENT_CERTIFICATE_TLS_AUTH および Secrets Manager のシークレット ARN を指定する SourceAccessConfiguration オブジェクトを含める必要があります。シークレットは、クラスターアカウントまたは Lambda 関数アカウントに保存できます。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'

イベントソースとしての Amazon MSK クラスターの使用

Apache Kafka クラスターまたは Amazon MSK クラスターを Lambda 関数のトリガーとして追加すると、クラスターはイベントソースとして使用されます。

Lambda は、ユーザーが指定した StartingPosition に基づいて、CreateEventSourceMapping リクエストで Topics として指定された Kafka トピックからイベントデータを読み取ります。処理が成功すると、Kafka トピックは Kafka クラスターにコミットされます。

StartingPositionLATEST として指定すると、Lambda は、そのトピックに属する各パーティション内の最新のメッセージから読み取りを開始します。トリガーが設定されてから Lambda がメッセージの読み取りを開始するまでには若干の遅延が発生することがあるため、Lambda はこの期間中に生成されたメッセージを読み取りません。

Lambda は、Kafka トピックの各パーティションのメッセージを順番に読み込みます。1 つの Lambda ペイロードに、複数のパーティションからのメッセージを含めることができます。利用可能なレコードが増えると、Lambda は関数がトピックに追いつくまで、CreateEventSourceMapping リクエストで指定された BatchSize 値に基づいて、バッチ内のレコードの処理を継続します。

Lambda は各バッチを処理した後、そのバッチ内のメッセージのオフセットをコミットします。関数がバッチ内のいずれかのメッセージに対してエラーを返すと、Lambda は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。すべての再試行が失敗したレコードを、障害発生時の送信先に送信して、後で処理することができます。

注記

Lambda 関数の最大タイムアウト制限は通常 15 分ですが、Amazon MSK、自己管理型 Apache Kafka、Amazon DocumentDB、および ActiveMQ と RabbitMQ 向け Amazon MQ のイベントソースマッピングでは、最大タイムアウト制限が 14 分の関数のみがサポートされます。この制約により、イベントソースマッピングは関数エラーと再試行を適切に処理できます。

ポーリングとストリームの開始位置

イベントソースマッピングの作成時および更新時のストリームのポーリングは、最終的に一貫性があることに注意してください。

  • イベントソースマッピングの作成時、ストリームからのイベントのポーリングが開始されるまでに数分かかる場合があります。

  • イベントソースマッピングの更新時、ストリームからのイベントのポーリングが停止および再開されるまでに数分かかる場合があります。

つまり、LATEST をストリームの開始位置として指定すると、イベントソースマッピングの作成または更新中にイベントを見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を TRIM_HORIZON または AT_TIMESTAMP として指定します。

Amazon CloudWatch メトリクス

Lambda は、関数がレコードを処理している間に OffsetLag メトリクスを発行します。このメトリクスの値は、Kafka イベントソーストピックに書き込まれた最後のレコードと関数のコンシューマグループが処理した最後のレコードの間のオフセットの差分です。レコードが追加されてからコンシューマグループがそれを処理するまでのレイテンシーを見積もるには、OffsetLag を使用できます。

OffsetLag の増加傾向は、関数のコンシューマグループに問題があることを示している可能性があります。詳細については、「Lambda 関数のメトリクスを表示する」を参照してください。

Amazon MSK イベントソースの Auto Scaling

初めて Amazon MSK イベントソースを作成するときは、Lambda が Kafka トピック内のすべてのパーティションを処理するために 1 つのコンシューマーを割り当てます。各コンシューマーには、増加したワークロードを処理するために同時実行される複数のプロセッサがあります。さらに、Lambda は、ワークロードに基づいてコンシューマーの数を自動的にスケールアップまたはスケールダウンします。各パーティションでメッセージの順序を保つため、コンシューマーの最大数は、トピック内のパーティションあたり 1 つとなっています。

Lambda は、1 分間隔でトピック内のすべてのパーティションのコンシューマーオフセット遅延を評価します。遅延が大きすぎる場合、パーティションは Lambda で処理可能な速度よりも速い速度でメッセージを受信します。必要に応じて、Lambda はトピックにコンシューマーを追加するか、またはトピックからコンシューマーを削除します。コンシューマーを追加または削除するスケーリングプロセスは、評価から 3 分以内に行われます。

ターゲットの Lambda 関数がスロットリングされると、Lambda はコンシューマーの数を減らします。このアクションにより、コンシューマーが取得し関数に送信するメッセージの数が減り、関数への負荷が軽減されます。

Kafka トピックのスループットをモニタリングするには、関数がレコードを処理する間に Lambda が発行するオフセット遅延メトリクスを表示してください。

いくつの関数呼び出しが並行して発生しているかを確認するときは、関数の同時実行メトリクスも監視します。