Lambda を使用したセルフマネージド Apache Kafka メッセージの処理
注記
Lambda 関数以外のターゲットにデータを送信したい、または送信する前にデータをエンリッチしたいという場合は、「Amazon EventBridge Pipes」を参照してください。
トピック
Kafka クラスターをイベントソースとして追加する
イベントソースマッピングを作成するには、Lambda コンソール、AWSSDK
このセクションでは、Lambda コンソールと AWS CLI を使用してイベントソースマッピングを作成する方法について説明します。
前提条件
-
セルフマネージド型 Apache Kafka クラスター。Lambda は、Apache Kafka バージョン 0.10.1.0 以降をサポートしています。
-
セルフマネージド Kafka クラスターが使用する AWS リソースにアクセスするための許可を持つ実行ロール。
カスタマイズ可能なコンシューマーグループ ID
Kafkaをイベントソースとして設定する場合、コンシューマーグループIDを指定できます。このコンシューマーグループ ID は、Lambda 関数を結合したい Kafka コンシューマーグループの既存の識別子です。この機能を使用すると、実行中の Kafka レコード処理設定を他のコンシューマーから Lambda にシームレスに移行できます。
コンシューマーグループ ID を指定し、そのコンシューマーグループ内に他のアクティブなポーラーが存在する場合、Kafka はすべてのコンシューマーにメッセージを配信します。言い換えると、Lambda は Kafka トピックのメッセージをすべて受け取るわけではありません。Lambda にトピック内のすべてのメッセージを処理させたい場合は、そのコンシューマーグループの他のポーラーをすべてオフにします。
さらに、コンシューマーグループ ID を指定し、Kafka が同じ ID を持つ有効な既存のコンシューマーグループを見つけた場合、Lambda は、イベントソースマッピングの StartingPosition
パラメーターを無視します。代わりに、Lambda はコンシューマーグループのコミットされたオフセットに従ってレコードの処理を開始します。コンシューマーグループ ID を指定しても、Kafka が既存のコンシューマーグループを見つけられない場合、Lambda は指定された StartingPosition
を使用してイベントソースを設定します。
指定するコンシューマーグループ ID は、すべての Kafka イベントソースの中で一意でなければなりません。コンシューマーグループ ID を指定して Kafka イベントソースマッピングを作成した後は、この値を更新することはできません。
セルフマネージド型 Kafka クラスターを追加する (コンソール)
セルフマネージド型 Apache Kafka クラスターと Kafka トピックを Lambda 関数のトリガーとして追加するには、次の手順を実行します。
Apache Kafka トリガーを Lambda 関数に追加するには (コンソール)
-
Lambda コンソールの [Functions] (関数) ページ
を開きます。 -
Lambda 関数の名前を選択します。
-
[機能の概要] で、[トリガーを追加] を選択します。
-
[Trigger configuration] (トリガー設定) で次の操作を実行します。
-
Apache Kafka のトリガータイプを選択します。
-
[Bootstrap servers] (ブートストラップサーバー) に、クラスター内の Kafka ブローカーのホストおよびポートのペアアドレスを入力し、[Add] (追加) を選択します。クラスター内の各 Kafka ブローカーで上記を繰り返します。
-
[Topic name] (トピック名) に、クラスター内のレコードの保存に使用する Kafka トピックの名前を入力します。
-
(オプション) [Batch size] (バッチサイズ) に、単一のバッチで取得できるメッセージの最大数を入力します。
-
バッチウィンドウ では、Lambda が関数を呼び出すまで費やすレコード収集の最大時間 (秒) を入力します。
-
(オプション) コンシューマーグループ ID で、参加する Kafka コンシューマーグループの ID を入力します。
-
(オプション) [開始位置] で、[最新] を選択して最新のレコードからストリームの読み取りを開始するか、[水平トリム] を選択して使用可能な最も以前のレコードから開始するか、または [タイムスタンプ時点] を選択して読み取りを開始するタイムスタンプを指定します。
-
(オプション) [VPC] で、Kafka クラスターに Amazon VPC を選択します。次に、[VPC subnets] (VPC サブネット) と [VPC security groups] (VPC セキュリティグループ) を選択します。
VPC 内のユーザーのみがブローカーにアクセスする場合、この設定は必須です。
-
(オプション) [Authentication] (認証) で [Add] (追加) をクリックしてから、以下を実行します。
-
クラスター内の Kafka ブローカーのアクセスまたは認証プロトコルを選択します。
-
Kafka ブローカーが SASL/PLAIN 認証を使用する場合は、[BASIC_AUTH] を選択します。
-
ブローカーが SALS/SCRAM 認証を使用する場合は、[SASL_SCRAM] プロトコルのいずれかを選択します。
-
mTLS 認証を設定している場合は、[CLIENT_CERTIFICATE_TLS_AUTH] プロトコルを選択します。
-
-
SASL/SCRAM または mTLS 認証の場合は、Kafka クラスターの認証情報が含まれる Secrets Manager シークレットキーを選択します。
-
-
(オプション) Kafka ブローカーがプライベート CA によって署名された証明書を使用する場合、[Encryption] (暗号化) には Kafka ブローカーが TLS 暗号化に使用するルート CA 証明書が含まれる Secrets Manager シークレットを選択します。
この設定は、SASL/SCRAM または SASL/PLAIN の TLS 暗号化、および mTLS 認証に適用されます。
-
テスト用に無効状態のトリガーを作成する (推奨) には、[Enable trigger] (トリガーを有効にする) を解除します。または、トリガーをすぐに有効にするには、[Enable trigger] (トリガーを有効にする) を選択します。
-
-
トリガーを追加するには、[Add] (追加) を選択します。
セルフマネージド型 Kafka クラスターを追加する (AWS CLI)
Lambda 関数のセルフマネージド型 Apache Kafka トリガーを作成および表示するには、次の AWS CLI コマンドの例を使用します。
SASL/SCRAM を使用する
Kafka ユーザーがインターネット経由で Kafka ブローカーにアクセスする場合は、SASL/SCRAM 認証用に作成した Secrets Manager シークレットを指定します。次の例では、create-event-source-mappingmy-kafka-function
という名前の Lambda 関数を AWSKafkaTopic
という名前の Kafka トピックにマッピングします。
aws lambda create-event-source-mapping \ --topics
AWSKafkaTopic
\ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333
:secret:MyBrokerSecretName
\ --function-name arn:aws:lambda:us-east-1:111122223333
:function:my-kafka-function
\ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092
", "abc2.xyz.com:9092
"]}}'
VPC の使用
Kafka ブローカーにアクセスするのが VPC 内の Kafka ユーザーのみである場合、VPC、サブネット、および VPC セキュリティグループを指定する必要があります。次の例では、create-event-source-mappingmy-kafka-function
という名前の Lambda 関数を AWSKafkaTopic
という名前の Kafka トピックにマッピングします。
aws lambda create-event-source-mapping \ --topics
AWSKafkaTopic
\ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333
:function:my-kafka-function
\ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092
", "abc2.xyz.com:9092
"]}}'
AWS CLI を使用したステータスの表示
次の例では、get-event-source-mapping
aws lambda get-event-source-mapping --uuid
dh38738e-992b-343a-1077-3478934hjkfd7
セルフマネージド Apache Kafka の設定パラメータ
すべての Lambda イベントソースタイプは、同じ CreateEventSourceMapping および UpdateEventSourceMapping API オペレーションを共有しています。ただし、Apache Kafka に適用されるのは一部のパラメータのみです。
[Parameter] (パラメータ) | 必須 | デフォルト | メモ |
---|---|---|---|
BatchSize |
N |
100 |
最大: 10,000 |
DestinationConfig |
N |
該当なし |
|
有効 |
N |
True |
|
FilterCriteria |
N |
該当なし |
|
FunctionName |
Y |
該当なし |
|
KMSKeyArn |
N |
該当なし |
|
MaximumBatchingWindowInSeconds |
N |
500 ミリ秒 |
|
ProvisionedPollersConfig |
N |
|
|
SelfManagedEventSource |
Y |
該当なし |
Kafka ブローカー一覧 作成時にのみ設定可能 |
SelfManagedKafkaEventSourceConfig |
N |
ConsumerGroupID フィールドを含み、デフォルトでは一意の値になっています。 |
作成時にのみ設定可能 |
SourceAccessConfigurations |
N |
認証情報なし |
クラスターの VPC 情報または認証情報 SASL_PLAIN は、BASIC_AUTH に設定 |
StartingPosition |
Y |
該当なし |
AT_TIMESTAMP、TRIM_HORIZON、または LATEST 作成時にのみ設定可能 |
StartingPositionTimestamp |
N |
該当なし |
StartingPosition が AT_TIMESTAMP に設定されている場合にのみ必要 |
タグ |
N |
該当なし |
|
トピック |
Y |
該当なし |
トピック名 作成時にのみ設定可能 |
Kafka クラスターをイベントソースとして使用する
Apache Kafka クラスターまたは Amazon MSK クラスターを Lambda 関数のトリガーとして追加すると、クラスターはイベントソースとして使用されます。
Lambda は、ユーザーが指定した StartingPosition
に基づいて、CreateEventSourceMapping リクエストで Topics
として指定された Kafka トピックからイベントデータを読み取ります。処理が成功すると、Kafka トピックは Kafka クラスターにコミットされます。
StartingPosition
を LATEST
として指定すると、Lambda は、そのトピックに属する各パーティション内の最新のメッセージから読み取りを開始します。トリガーが設定されてから Lambda がメッセージの読み取りを開始するまでには若干の遅延が発生することがあるため、Lambda はこの期間中に生成されたメッセージを読み取りません。
Lambda は、指定された 1 つ、または複数の Kafka トピックパーティションからのレコードを処理し、関数に JSON ペイロードを送信します。1 つの Lambda ペイロードに、複数のパーティションからのメッセージを含めることができます。利用可能なレコードが増えると、Lambda は関数がトピックに追いつくまで、CreateEventSourceMapping リクエストで指定された BatchSize
値に基づいて、バッチ内のレコードの処理を継続します。
関数がバッチ内のいずれかのメッセージに対してエラーを返すと、Lambda は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。すべての再試行が失敗したレコードを、障害発生時の送信先に送信して、後で処理することができます。
注記
Lambda 関数の最大タイムアウト制限は通常 15 分ですが、Amazon MSK、自己管理型 Apache Kafka、Amazon DocumentDB、および ActiveMQ と RabbitMQ 向け Amazon MQ のイベントソースマッピングでは、最大タイムアウト制限が 14 分の関数のみがサポートされます。この制約により、イベントソースマッピングは関数エラーと再試行を適切に処理できます。
ポーリングとストリームの開始位置
イベントソースマッピングの作成時および更新時のストリームのポーリングは、最終的に一貫性があることに注意してください。
-
イベントソースマッピングの作成時、ストリームからのイベントのポーリングが開始されるまでに数分かかる場合があります。
-
イベントソースマッピングの更新時、ストリームからのイベントのポーリングが停止および再開されるまでに数分かかる場合があります。
つまり、LATEST
をストリームの開始位置として指定すると、イベントソースマッピングの作成または更新中にイベントを見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を TRIM_HORIZON
または AT_TIMESTAMP
として指定します。
セルフマネージド Apache Kafka イベントソースマッピングのメッセージスループットのスケーリング動作
Amazon MSK イベントソースマッピングでは、メッセージスループットのスケーリング動作を次の 2 つのモードから選択できます。
デフォルト (オンデマンド) モード
初めてセルフマネージド Apache Kafka イベントソースを作成すると、Lambda は Kafka トピック内のすべてのパーティションを処理するために、デフォルトの数のイベントポーラーを割り当てます。Lambda は、メッセージ負荷に基づいて、イベントポーラーの数を自動的にスケールアップまたはスケールダウンします。
Lambda は、1 分間隔でトピック内のすべてのパーティションのコンシューマーオフセット遅延を評価します。オフセットラグが大きすぎる場合、パーティションは Lambda で処理できる速度よりも速くメッセージを受信します。必要に応じて、Lambda はトピックのイベントポーラーを追加または削除します。イベントポーラーを追加または削除するこの自動スケーリングプロセスは、評価から 3 分以内に実行されます。
ターゲットの Lambda 関数がスロットリングされると、Lambda はイベントポーラーの数を減らします。このアクションにより、イベントポーラーが関数から取得するメッセージ、および関数に送信するメッセージの数が減ることで、関数に対する負荷が軽減されます。
Kafka トピックのスループットを監視するには、consumer_lag
や consumer_offset
などの Apache Kafka コンシューマーラグメトリックスを表示します。
プロビジョンドモードの設定
イベントソースマッピングのスループットを微調整する必要があるワークロードでは、プロビジョンドモードを使用できます。プロビジョンドモードでは、プロビジョニングされたイベントポーラーの最小数と最大数を定義します。これらのプロビジョニングされたイベントポーラーは、イベントソースマッピング専用であり、予期しないメッセージスパイクが発生した場合に、それらを瞬時に処理できます。パフォーマンス要件が厳しい Kafka ワークロードには、プロビジョンドモードを使用することをお勧めします。
Lambda のイベントポーラーは最大 5 MBps のスループットを処理できるコンピューティングユニットです。例えば、イベントソースの平均ペイロードが 1 MB で、平均関数所要時間が 1 秒であるとします。ペイロードで変換 (フィルタリングなど) が発生しない場合、1 つのポーラーが 5 MBps のスループットと 5 つの同時 Lambda 呼び出しをサポートできます。プロビジョニングモードを使用すると、追加コストが発生します。料金の見積もりについては、「AWS Lambda 料金
プロビジョンドモードでは、イベントポーラーの最小数 (MinimumPollers
) の許容値の範囲は 1~200 です。イベントポーラーの最大数 (MaximumPollers
) の許容値の範囲は 1~2,000 です。MaximumPollers
は MinimumPollers
以上である必要があります。また、パーティション内での順序付き処理を維持するために、Lambda は MaximumPollers
をトピック内のパーティション数に制限します。
イベントポーラーの最小数および最大数に適切な値を選択する方の詳細については、「プロビジョンドモードを使用する際のベストプラクティスと考慮事項」を参照してください。
コンソールまたは Lambda API を使用して、セルフマネージド Apache Kafka イベントソースマッピングに対するプロビジョンドモードを設定できます。
既存のセルフマネージド Apache Kafka イベントソースマッピングに対してプロビジョンドモードを設定する手順 (コンソール)
-
Lambda コンソールの [関数ページ]
を開きます。 -
プロビジョンドモードを設定するセルフマネージド Apache Kafka イベントソースマッピングを持つ関数を選択します。
-
[設定] タブを選択し、[トリガー] を選択します。
-
プロビジョンドモードを設定するセルフマネージド Apache Kafka イベントソースマッピングを選択し、[編集] を選択します。
-
[イベントソースマッピング設定] で、[プロビジョンドモードの設定] を選択します。
-
[最小イベントポーラー数] に、1~200 の値を入力します。値を指定しない場合、Lambda はデフォルト値 1 を選択します。
-
[最大イベントポーラー数] に、1~2,000 の値を入力します。この値は、[最小イベントポーラー数] の値以上である必要があります。値を指定しない場合、Lambda はデフォルト値 200 を選択します。
-
-
[Save] を選択します。
EventSourceMappingConfiguration の ProvisionedPollerConfig オブジェクトを使用して、プログラムでプロビジョンドモードを設定できます。例えば、次の UpdateEventSourceMapping CLI コマンドは、MinimumPollers
値を 5、MaximumPollers
値を 100 に設定します。
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --provisioned-poller-config '{"MinimumPollers": 5, "MaximumPollers": 100}'
プロビジョニングモードを設定すると、ProvisionedPollers
メトリクスをモニタリングすることで、ワークロードのイベントポーラーの使用状況を確認できます。詳細については、「イベントソースマッピングメトリクス」を参照してください。
プロビジョンドモードを無効にしてデフォルト (オンデマンド) モードに戻すには、次の UpdateEventSourceMapping CLI コマンドを使用できます。
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --provisioned-poller-config '{}'
プロビジョンドモードを使用する際のベストプラクティスと考慮事項
イベントソースマッピングの最小および最大イベントポーラー数の最適な設定は、アプリケーションのパフォーマンス要件によって異なります。パフォーマンスプロファイルのベースラインを確立する際には、最初はデフォルトの最小イベントポーラー数から始めることをお勧めします。観測されたメッセージ処理パターンと望ましいパフォーマンスプロファイルに基づいて設定を調整します。
トラフィックの急増が発生しやすく、厳格なパフォーマンス要件があるワークロードの場合、メッセージの突然の急増を処理できるように、最小イベントポーラーを増やします。必要な最小イベントポーラー数を決定するには、ワークロードの 1 秒あたりのメッセージ数と平均ペイロードサイズを考慮し、1 つのイベントポーラーのスループットキャパシティ (最大 5 MBps) を参考にします。
パーティション内での順序付き処理を維持するために、Lambda は最大イベントポーラー数をトピック内のパーティション数に制限します。さらに、イベントソースマッピングがスケーリングできる最大イベントポーラー数は、関数の同時実行設定によって異なります。
プロビジョンドモードを有効にする際には、ネットワーク設定を更新して AWS PrivateLink VPC エンドポイントと関連するアクセス許可を削除します。
Amazon CloudWatch メトリクス
Lambda は、関数がレコードを処理している間に OffsetLag
メトリクスを発行します。このメトリクスの値は、Kafka イベントソーストピックに書き込まれた最後のレコードと関数のコンシューマグループが処理した最後のレコードの間のオフセットの差分です。レコードが追加されてからコンシューマグループがそれを処理するまでのレイテンシーを見積もるには、OffsetLag
を使用できます。
OffsetLag
の増加傾向は、関数のコンシューマグループに問題があることを示している可能性があります。詳細については、「Lambda での CloudWatch メトリクスの使用」を参照してください。