Lambda を使用した Amazon DocumentDB イベントの処理 - AWS Lambda

Lambda を使用した Amazon DocumentDB イベントの処理

Amazon DocumentDB クラスターをイベントソースとして設定することにより、Lambda 関数を使用して Amazon DocumentDB (MongoDB 互換) 変更ストリームでイベントを処理できます。その後、Amazon DocumentDB クラスターでデータが変更されるたびに Lambda 関数を呼び出すことで、イベント駆動型のワークロードを自動化できます。

注記

Lambda では Amazon DocumentDB のバージョン 4.0 および 5.0 のみがサポートされています。バージョン 3.6 はサポートされていません。

また、イベントソースマッピングでは、Lambda はインスタンスベースのクラスターとリージョンレベルのクラスターのみをサポートします。Lambda は、Elastic クラスターまたはグローバルクラスターをサポートしていません。この制限は、Lambda を Amazon DocumentDB に接続するクライアントとして使用する場合には適用されません。Lambda はすべてのクラスタータイプに接続して CRUD 操作を実行できます。

Lambda は、Amazon DocumentDB 変更ストリームからのイベントを、到着した順序に沿って処理します。このため、関数は Amazon DocumentDB からの同時呼び出しを一度に 1 つしか処理できません。関数を監視するには、その同時実行メトリクスを追跡できます。

警告

Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、レコードの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードを冪等にすることを強くお勧めします。詳細については、 AWS ナレッジセンターの「Lambda 関数を冪等にするにはどうすればよいですか?」を参照してください。

Amazon DocumentDB イベントの例

{ "eventSourceArn": "arn:aws:rds:us-east-1:123456789012:cluster:canaryclusterb2a659a2-qo5tcmqkcl03", "events": [ { "event": { "_id": { "_data": "0163eeb6e7000000090100000009000041e1" }, "clusterTime": { "$timestamp": { "t": 1676588775, "i": 9 } }, "documentKey": { "_id": { "$oid": "63eeb6e7d418cd98afb1c1d7" } }, "fullDocument": { "_id": { "$oid": "63eeb6e7d418cd98afb1c1d7" }, "anyField": "sampleValue" }, "ns": { "db": "test_database", "coll": "test_collection" }, "operationType": "insert" } } ], "eventSource": "aws:docdb" }

この例のイベントとその形状の詳細については、MongoDB ドキュメントウェブサイトの「変更イベント」を参照してください。

前提条件とアクセス許可

Amazon DocumentDB を Lambda 関数のイベントソースとして使用する前に、次の前提条件に注意してください。必要なもの:

  • 関数と同じ AWS アカウント と AWS リージョン に既存の Amazon DocumentDB クラスターが必要です。既存のクラスターがない場合は、Amazon DocumentDB デベロッパーガイドの「Amazon DocumentDB の開始方法」のステップに従って作成できます。または、「チュートリアル: Amazon DocumentDB を用いて AWS Lambda のストリームの使用」の最初の一連のステップに従って、必要な前提条件をすべて備えた Amazon DocumentDB クラスターを作成する方法もあります。

  • Lambda が、Amazon DocumentDB クラスターに関連付けられている Amazon Virtual Private Cloud (Amazon VPC) リソースにアクセスできるようにします。詳細については、「ネットワークセキュリティを設定する」を参照してください。

  • Amazon DocumentDB クラスターで TLS を有効にします。これはデフォルトの設定です。TLS を無効にすると、Lambda はクラスターと通信できません。

  • Amazon DocumentDB クラスターで変更ストリームをアクティブ化します。詳細については、「Amazon DocumentDB デベロッパーガイド」の「Amazon DocumentDB で変更ストリームを使用する」を参照してください。

  • Amazon DocumentDB クラスターにアクセスするための認証情報を Lambda に提供します。イベントソースを設定するときは、クラスターへのアクセスに必要な認証の詳細 (ユーザー名とパスワード) を含む AWS Secrets Manager キーを指定します。セットアップ中にこのキーを指定するには、次のいずれかを行います。

    • セットアップに Lambda コンソールを使用している場合は、[Secrets Manager キー] フィールドでこのキーを指定します。

    • セットアップに AWS Command Line Interface (AWS CLI) を使用している場合は、source-access-configurations オプションでこのキーを指定します。このオプションは、create-event-source-mapping コマンドまたは update-event-source-mapping コマンドのどちらにも含めることができます。例:

      aws lambda create-event-source-mapping \ ... --source-access-configurations '[{"Type":"BASIC_AUTH","URI":"arn:aws:secretsmanager:us-west-2:123456789012:secret:DocDBSecret-AbC4E6"}]' \ ...
  • Amazon DocumentDB ストリームに関連するリソースを管理するには、Lambda に許可を付与します。関数の実行ロールに次の許可を手動で追加します。

  • Lambda に送信する Amazon DocumentDB 変更ストリームイベントのサイズは 6 MB 未満にしてください。Lambda は最大 6 MB のペイロードサイズをサポートします。変更ストリームが 6 MB を超えるイベントを Lambda に送信しようとすると、Lambda はメッセージを削除して OversizedRecordCount メトリクスを発行します。Lambda は、ベストエフォートベースですべてのメトリクスを発行します。

注記

Lambda 関数の最大タイムアウト制限は通常 15 分ですが、Amazon MSK、自己管理型 Apache Kafka、Amazon DocumentDB、および ActiveMQ と RabbitMQ 向け Amazon MQ のイベントソースマッピングでは、最大タイムアウト制限が 14 分の関数のみがサポートされます。この制約により、イベントソースマッピングは関数エラーと再試行を適切に処理できます。

ネットワークセキュリティを設定する

イベントソースマッピングを通じて Lambda に Amazon DocumentDB へのフルアクセスを許可するには、クラスターがパブリックエンドポイント (パブリック IP アドレス) を使用するか、クラスターを作成した Amazon VPC へのアクセスを提供する必要があります。

Lambda で Amazon DocumentDB を使用する場合は、関数に Amazon VPC 内のリソースへのアクセスを付与する AWS PrivateLink VPC エンドポイントを作成します。

注記

イベントポーラーにデフォルト (オンデマンド) モードを使用するイベントソースマッピングを持つ関数には、AWS PrivateLink VPC エンドポイントが必要です。イベントソースマッピングがプロビジョンドモードを使用している場合は、AWS PrivateLink VPC エンドポイントを設定する必要はありません。

エンドポイントを作成して、次のリソースへのアクセスを提供します。

  • Lambda — Lambda サービスプリンシパルのエンドポイントを作成します。

  • AWS STS — サービスプリンシパルがユーザーに代わってロールを引き受けるために、AWS STS のエンドポイントを作成します。

  • Secrets Manager — クラスターが Secrets Manager を使用して認証情報を保存する場合は、Secrets Manager のエンドポイントを作成します。

または、Amazon VPC の各パブリックサブネットに NAT ゲートウェイを設定します。詳細については、「VPC に接続された Lambda 関数にインターネットアクセスを有効にする」を参照してください。

Amazon DocumentDB のイベントソースマッピングを作成するとき、Lambda は Amazon VPC 用に設定されたサブネットおよびセキュリティグループに Elastic Network Interface (ENI) が既に存在するかどうかを確認します。Lambda が既存の ENI を検出した場合、再利用しようとします。それ以外の場合、Lambda は新しい ENI を作成し、イベントソースに接続して関数を呼び出します。

注記

Lambda 関数は、Lambda サービスが所有する VPC 内で常に実行されます。関数の VPC 設定はイベントソースマッピングに影響しません。Lambda がイベントソースに接続する方法を判定するのは、イベントソースのネットワーク設定のみです。

クラスターを含む Amazon VPC のセキュリティグループを設定します。デフォルトでは、Amazon DocumentDB はポート: 27017 を使用します。

  • インバウンドルール – イベントソースに関連付けられたセキュリティグループのデフォルトのクラスターポート上のすべてのトラフィックを許可します。

  • アウトバウンドルール – すべての送信先に対して、443 番ポート上のすべてのトラフィックを許可します。イベントソースに関連付けられたセキュリティグループのデフォルトのクラスターポート上のすべてのトラフィックを許可します。

  • Amazon VPC エンドポイントのインバウンドルール – Amazon VPC エンドポイントを使用している場合、Amazon VPC エンドポイントに関連付けられたセキュリティグループは、クラスターセキュリティグループからポート 443 でインバウンドトラフィックを許可する必要があります。

クラスターが認証を使用する場合、Secrets Manager エンドポイントのエンドポイントポリシーを制限することもできます。Secrets Manager API を呼び出す場合、Lambda は Lambda サービスプリンシパルではなく、関数ロールを使用します。

例 VPC エンドポイントポリシー – Secrets Manager エンドポイント
{ "Statement": [ { "Action": "secretsmanager:GetSecretValue", "Effect": "Allow", "Principal": { "AWS": [ "arn:aws::iam::123456789012:role/my-role" ] }, "Resource": "arn:aws::secretsmanager:us-west-2:123456789012:secret:my-secret" } ] }

Amazon VPC エンドポイントを使用する場合、AWS は API コールをルーティングし、エンドポイントの Elastic Network Interface (ENI) を使用して関数を呼び出します。Lambda サービスプリンシパルは、これらの ENI を使用するすべてのロールおよび関数に対して lambda:InvokeFunction を呼び出す必要があります。

デフォルトでは、Amazon VPC エンドポイントには、リソースへの広範なアクセスを許可するオープンな IAM ポリシーが適用されています。そのエンドポイントを使用して必要なアクションを実行するためのベストプラクティスは、これらのポリシーを制限することです。イベントソースマッピングが Lambda 関数を呼び出せるようにするには、VPC エンドポイントポリシーで、Lambda サービスプリンシパルが sts:AssumeRole および lambda:InvokeFunction を呼び出すことを許可する必要があります。組織内で発生する API コールのみを許可するように VPC エンドポイントポリシーを制限すると、イベントソースマッピングが正しく機能しなくなるため、これらのポリシーには "Resource": "*" が必要です。

次の VPC エンドポイントポリシーの例では、AWS STS および Lambda エンドポイントに Lambda サービスプリンシパルの必要なアクセスを付与する方法について示しています。

例 VPC エンドポイントポリシー - AWS STS エンドポイント
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
例 VPC エンドポイントポリシー – Lambda エンドポイント
{ "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }

Amazon DocumentDB イベントソースマッピングを作成する (コンソール)

Amazon DocumentDB クラスターの変更ストリームから読み取る Lambda 関数に対して、イベントソースマッピングを作成します。このセクションでは、Lambda コンソールからこれを実行する方法を説明します。AWSSDK と AWS CLI 手順については、「Amazon DocumentDB イベントソースマッピングを作成する (SDK または CLI)」を参照してください。

Amazon DocumentDB イベントソースマッピングを作成するには (コンソール)
  1. Lambda コンソールの関数ページを開きます。

  2. 関数の名前を選択します。

  3. [関数の概要] で [トリガーを追加] をクリックします。

  4. [トリガーの設定] のドロップダウンリストから [DocumentDB] を選択します。

  5. 必要なオプションを設定し、[追加] を選択します。

Lambda は、Amazon DocumentDB イベントソースの以下のオプションをサポートしています。

  • [DocumentDB クラスター] – Amazon DocumentDB クラスターを選択します。

  • [トリガーをアクティブ化] – トリガーを今すぐアクティブ化するかどうかを選択します。このチェックボックスをオンにすると、関数は、イベントソースマッピングの作成時に、指定された Amazon DocumentDB 変更ストリームからのトラフィックの受信を直ちに開始します。チェックボックスをオフにして、テストのために非アクティブ化された状態でイベントソースマッピングを作成することを推奨します。作成後、いつでもイベントソースマッピングをアクティブ化できます。

  • [データベース名] – 使用するクラスター内のデータベースの名前を入力します。

  • (オプション) [コレクション名] – 使用するデータベース内のコレクションの名前を入力します。コレクションを指定しない場合、Lambda はデータベース内の各コレクションのすべてのイベントをリッスンします。

  • [バッチサイズ] - 単一のバッチで取得するメッセージの最大数 (最大 10,000 件) を設定します。デフォルトバッチサイズは 100 です。

  • [開始位置] – レコードの読み取りを開始するストリーム内の位置を選択します。

    • [最新] - ストリームに追加された新しいレコードのみを処理します。関数は、Lambda がイベントソースの作成を完了した後にのみ、レコードの処理を開始します。これは、イベントソースが正常に作成されるまで、一部のレコードが削除される可能性があることを意味します。

    • [水平トリム] - ストリーム内のすべてのレコードを処理します。Lambda は、クラスターのログ保持期間を使用して、イベントの読み取りを開始する場所を決定します。具体的には、Lambda は current_time - log_retention_duration から読み取りを開始します。Lambda がすべてのイベントを読み取るには、このタイムスタンプの前に変更ストリームが既にアクティブになっている必要があります。

    • [タイムスタンプ] - 特定の時刻以降のレコードを処理します。Lambda がすべてのイベントを適切に読み取るには、指定されたタイムスタンプの前に変更ストリームが既にアクティブになっている必要があります。

  • [認証] – クラスター内のブローカーにアクセスするための認証方法を選択します。

    • [BASIC_AUTH] – 基本認証では、クラスターにアクセスするための認証情報を含む Secrets Manager キーを指定する必要があります。

  • [Secrets Manager キー] – Amazon DocumentDB クラスターへのアクセスに必要な認証の詳細 (ユーザー名とパスワード) を含む Secrets Manager キーを選択します。

  • (オプション) [バッチウィンドウ] - 関数を呼び出す前にレコードを収集する最大時間を 300 までの秒数で設定します。

  • (オプション) [ドキュメントの完全な設定] – ドキュメントの更新オペレーションでは、ストリームに送信するものを選択します。デフォルト値は Default です。これは、各変更ストリームイベントについて、行われた変更について記述するデルタのみを Amazon DocumentDB が送信することを意味します。このフィールドの詳細については、MongoDB Javadocs の API ドキュメントの「FullDocument」を参照してください。

    • [デフォルト] – Lambda は、行われた変更について記述する部分的なドキュメントのみを送信します。

    • [UpdateLookup] – Lambda は、ドキュメント全体のコピーとともに、変更について記述するデルタを送信します。

Amazon DocumentDB イベントソースマッピングを作成する (SDK または CLI)

Amazon DocumentDB イベントソースマッピングを AWS SDK を使用して作成または管理するには、次の API オペレーションを使用します。

AWS CLI を使用してイベントソースマッピングを作成するには、create-event-source-mapping コマンドを使用します。以下の例では、このコマンドを使用して、my-function という名前の関数を Amazon DocumentDB 変更ストリームにマッピングします。イベントソースは Amazon リソースネーム (ARN) によって指定され、バッチサイズ 500 で、Unix 時間形式のタイムスタンプから始まります。このコマンドでは、Lambda が Amazon DocumentDB への接続に使用する Secrets Manager キーも指定します。さらに、データベースと読み取り元のコレクションを指定する document-db-event-source-config パラメーターも含まれています。

aws lambda create-event-source-mapping --function-name my-function \ --event-source-arn arn:aws:rds:us-west-2:123456789012:cluster:privatecluster7de2-epzcyvu4pjoy --batch-size 500 \ --starting-position AT_TIMESTAMP \ --starting-position-timestamp 1541139109 \ --source-access-configurations '[{"Type":"BASIC_AUTH","URI":"arn:aws:secretsmanager:us-east-1:123456789012:secret:DocDBSecret-BAtjxi"}]' \ --document-db-event-source-config '{"DatabaseName":"test_database", "CollectionName": "test_collection"}' \

次のような出力が表示されます。

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "DocumentDBEventSourceConfig": { "CollectionName": "test_collection", "DatabaseName": "test_database", "FullDocument": "Default" }, "MaximumBatchingWindowInSeconds": 0, "EventSourceArn": "arn:aws:rds:us-west-2:123456789012:cluster:privatecluster7de2-epzcyvu4pjoy", "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:my-function", "LastModified": 1541348195.412, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action" }

作成後、update-event-source-mapping コマンドを使用して、Amazon DocumentDB イベントソースに関連する設定を更新できます。次の例では、バッチサイズを 1,000 に更新し、バッチウィンドウを 10 秒に更新します。このコマンドには、list-event-source-mapping コマンドまたは Lambda コンソールから取得できるイベントソースマッピングの UUID が必要です。

aws lambda update-event-source-mapping --function-name my-function \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --batch-size 1000 \ --batch-window 10

このような出力が表示されます。

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "DocumentDBEventSourceConfig": { "CollectionName": "test_collection", "DatabaseName": "test_database", "FullDocument": "Default" }, "MaximumBatchingWindowInSeconds": 0, "EventSourceArn": "arn:aws:rds:us-west-2:123456789012:cluster:privatecluster7de2-epzcyvu4pjoy", "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:my-function", "LastModified": 1541359182.919, "LastProcessingResult": "OK", "State": "Updating", "StateTransitionReason": "User action" }

Lambda は設定を非同期的に更新するため、プロセスが完了するまでこれらの変更が出力に表示されない場合があります。イベントソースマッピングの現在の設定を表示するには、get-event-source-mapping コマンドを使用します。

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

このような出力が表示されます。

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "DocumentDBEventSourceConfig": { "CollectionName": "test_collection", "DatabaseName": "test_database", "FullDocument": "Default" }, "BatchSize": 1000, "MaximumBatchingWindowInSeconds": 10, "EventSourceArn": "arn:aws:rds:us-west-2:123456789012:cluster:privatecluster7de2-epzcyvu4pjoy", "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:my-function", "LastModified": 1541359182.919, "LastProcessingResult": "OK", "State": "Enabled", "StateTransitionReason": "User action" }

Amazon DocumentDB イベントソースマッピングを削除するには、delete-event-source-mapping コマンドを使用します。

aws lambda delete-event-source-mapping \ --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284

ポーリングとストリームの開始位置

イベントソースマッピングの作成時および更新時のストリームのポーリングは、最終的に一貫性があることに注意してください。

  • イベントソースマッピングの作成時、ストリームからのイベントのポーリングが開始されるまでに数分かかる場合があります。

  • イベントソースマッピングの更新時、ストリームからのイベントのポーリングが停止および再開されるまでに数分かかる場合があります。

つまり、LATEST をストリームの開始位置として指定すると、イベントソースマッピングの作成または更新中にイベントを見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を TRIM_HORIZON または AT_TIMESTAMP として指定します。

Amazon DocumentDB イベントソースのモニタリング

Amazon DocumentDB イベントソースのモニタリングに役立つよう、関数がレコードのバッチの処理を完了したときに、Lambda は IteratorAge メトリクスを発行します。イテレータの有効期間とは、最新のイベントのタイムスタンプと現在のタイムスタンプの差のことです。基本的に、IteratorAge メトリクスは、バッチで最後に処理されたレコードの古さを示します。関数が新しいイベントを現在処理している場合、イテレーターの有効期間を使用して、レコードが追加されてから関数によって処理されるまでのレイテンシーを推定できます。IteratorAge の増加傾向は、関数に問題があることを示している可能性があります。詳細については、「Lambda での CloudWatch メトリクスの使用」を参照してください。

Amazon DocumentDB の変更ストリームは、イベント間の大きなタイムギャップを処理するようには最適化されていません。Amazon DocumentDB イベントソースが長期間にイベントを受信しない場合、Lambda はイベントソースマッピングを無効にすることがあります。この期間の長さは、クラスターのサイズやその他のワークロードに応じて、数週間から数か月までさまざまです。

Lambda は最大 6 MB のペイロードをサポートします。ただし、Amazon DocumentDB 変更ストリームイベントのサイズは最大 16 MB です。変更ストリームが 6 MB を超える変更ストリームイベントを Lambda に送信しようとすると、Lambda はメッセージを削除して OversizedRecordCount メトリクスを発行します。Lambda は、ベストエフォートベースですべてのメトリクスを発行します。