Lambda を使用した Amazon Kinesis Data Streams レコードの処理 - AWS Lambda

Lambda を使用した Amazon Kinesis Data Streams レコードの処理

Lambda を使用して Amazon Kinesis Data Streams レコードを処理するには、ストリーム用のコンシューマーを作成し、次に Lambda イベントソースマッピングを作成します。

データストリームと関数の設定

Lambda 関数は、データストリームのコンシューマーアプリケーションです。シャードごとに 1 つのレコードのバッチを一度に処理します。Lambda 関数を共有スループットコンシューマー (標準イテレーター) にマップすることも、拡張ファンアウトを使用する専用スループットコンシューマーにマップすることもできます。

  • 標準イテレーター: Lambda は、レコードの Kinesis ストリームにある各シャードを 1 秒あたり 1 回の基本レートでポーリングします。利用可能なレコードが増えると、Lambda は関数がストリームに追いつくまでバッチを処理し続けます。イベントソースマッピングは、シャードの他のコンシューマーと読み取りスループットを共有します。

  • 拡張ファンアウト: レイテンシーを最小限に抑え、読み取りスループットを最大化するには、拡張ファンアウトを使用してデータストリームコンシューマーを作成します。拡張ファンアウトを使用するコンシューマーは、ストリームから読み取る他のアプリケーションに影響を及ぼさないように、専用の接続を各シャードに割り当てます。ストリームのコンシューマーは HTTP/2 を使用して、長時間にわたる接続とリクエストヘッダーの圧縮でレコードを Lambda にプッシュすることによってレイテンシーを短縮します。ストリームコンシューマーは、Kinesis RegisterStreamConsumer API を使用して作成できます。

aws kinesis register-stream-consumer \ --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

次のような出力が表示されます。

{ "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }

関数がレコードを処理する速度を上げるには、データストリームにシャードを追加します。Lambda は、各シャードのレコードを順番に処理します。関数からエラーが返された場合、シャードのさらなるレコードの処理は停止されます。シャードが増えると、一度に処理されるバッチが増え、同時実行のエラーの影響を下げることができます。

同時実行のバッチの合計分を処理できるように関数をスケールアップできない場合は、関数のクォータ引き上げをリクエストするか、同時実行数を予約します。

Lambda 関数を呼び出すためのイベントソースマッピングを作成する

データストリームからのレコードを使用して Lambda 関数を呼び出すには、イベントソースマッピングを作成します。複数のイベントソースマッピングを作成することで、複数の Lambda 関数で同じデータを処理したり、1 つの関数で複数のデータストリームの項目を処理したりできます。複数のストリームから項目を処理する場合、各バッチには 1 つのシャードまたはストリームのレコードのみが含まれます。

別の AWS アカウント のストリームからのレコードを処理するようにイベント ソース マッピングを構成できます。詳細については、「クロスアカウントのイベントソースマッピングの作成」を参照してください。

イベントソースマッピングを作成する前に、Kinesis データストリームから読み取るためのアクセス許可を Lambda 関数に付与する必要があります。Lambda には、Kinesis データストリームに関連するリソースを管理するために次のアクセス許可が必要です。

AWS マネージドポリシー AWSLambdaKinesisExecutionRole には、これらのアクセス許可が含まれています。次の手順の説明に従って、この管理ポリシーを関数に追加します。

AWS Management Console
関数に Kinesis アクセス許可を追加するには
  1. Lambda コンソールの「関数ページ」を開き、関数を選択します。

  2. [構成] タブで、[アクセス許可] を選択します。

  3. [実行ロール] ペインの [ロール名] で、関数の実行ロールへのリンクを選択します。このリンクを選択すると、IAM コンソールでそのロールのページが開きます。

  4. [アクセス許可ポリシー] ペインで、[アクセス許可を追加] を選択し、[ポリシーをアタッチ] を続けて選択します。

  5. [検索] フィールドに AWSLambdaKinesisExecutionRole を入力します。

  6. ポリシーの名前の横にあるチェックボックスを選択し、[アクセス許可を追加] を選択します。

AWS CLI
関数に Kinesis アクセス許可を追加するには
  • 次の CLI コマンドを実行して、AWSLambdaKinesisExecutionRole ポリシーを関数の実行ロールに追加します。

    aws iam attach-role-policy \ --role-name MyFunctionRole \ --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
AWS SAM
関数に Kinesis アクセス許可を追加するには
  • 関数の定義で、次の例に示すように Policies プロパティを追加します。

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs20.x Policies: - AWSLambdaKinesisExecutionRole

必要なアクセス許可を設定した後、イベントソースマッピングを作成します。

AWS Management Console
Kinesis イベントソースマッピングを作成するには
  1. Lambda コンソールの「関数ページ」を開き、関数を選択します。

  2. [関数の概要] ペインで、[トリガーを追加] を選択します。

  3. [トリガー設定] で、ソースとして [Kinesis] を選択します。

  4. イベントソースマッピングを作成する Kinesis ストリームを選択し、オプションでストリームのコンシューマーを選択します。

  5. (オプション) イベントソースマッピングのバッチサイズ開始位置バッチウィンドウを編集します。

  6. 追加 を選択します。

コンソールからイベントソースマッピングを作成する場合は、IAM ロールには kinesis:ListStreams 権限と kinesis:ListStreamConsumers 権限が必要です。

AWS CLI
Kinesis イベントソースマッピングを作成するには
  • 次の CLI コマンドを実行して、Kinesis イベントソースマッピングを作成します。ユースケースに応じて、独自のバッチサイズと開始位置を選択します。

    aws lambda create-event-source-mapping \ --function-name MyFunction \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --starting-position LATEST \ --batch-size 100

バッチ処理ウィンドウを指定するには、--maximum-batching-window-in-seconds オプションを追加します。このパラメータおよびその他のパラメータの使用の詳細については、「AWS CLI コマンドリファレンス」の「create-event-source-mapping」を参照してください。

AWS SAM
Kinesis イベントソースマッピングを作成するには
  • 関数の定義で、次の例に示すように KinesisEvent プロパティを追加します。

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs20.x Policies: - AWSLambdaKinesisExecutionRole Events: KinesisEvent: Type: Kinesis Properties: Stream: !GetAtt MyKinesisStream.Arn StartingPosition: LATEST BatchSize: 100 MyKinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1

AWS SAM で Kinesis Data Streams のイベントソースマッピングを作成する方法の詳細については、「AWS Serverless Application Model デベロッパーガイド」の「Kinesis」を参照してください。

ポーリングとストリームの開始位置

イベントソースマッピングの作成時および更新時のストリームのポーリングは、最終的に一貫性があることに注意してください。

  • イベントソースマッピングの作成時、ストリームからのイベントのポーリングが開始されるまでに数分かかる場合があります。

  • イベントソースマッピングの更新時、ストリームからのイベントのポーリングが停止および再開されるまでに数分かかる場合があります。

つまり、LATEST をストリームの開始位置として指定すると、イベントソースマッピングの作成または更新中にイベントを見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を TRIM_HORIZON または AT_TIMESTAMP として指定します。

クロスアカウントのイベントソースマッピングの作成

Amazon Kinesis Data Streams は、リソースベースのポリシーをサポートします。このため、別のアカウントの Lambda 関数を使用して AWS アカウント のストリームに取り込まれたデータを処理できます。

別の AWS アカウント の Kinesis ストリームを使用して Lambda 関数のイベントソースマッピングを作成するには、リソースベースのポリシーを使用してストリームを設定し、Lambda 関数に項目を読み取るアクセス許可を付与する必要があります。クロスアカウントアクセスを許可するようにストリームを設定する方法については、「Amazon Kinesis Streams Developer guide」の「Sharing access with cross-account AWS Lambda functions」を参照してください。

Lambda 関数に必要なアクセス許可を付与するリソースベースのポリシーでストリームを設定したら、前のセクションで説明した方法のいずれかを使用してイベントソースマッピングを作成します。

Lambda コンソールでイベントソースマッピングを作成する場合は、ストリームの ARN を入力フィールドに直接貼り付けます。ストリームにコンシューマーを指定する場合、コンシューマーの ARN を貼り付けると、ストリームフィールドが自動的に入力されます。