出力としての Lambda 関数の使用 - Amazon Kinesis Data Analytics for SQL Applications デベロッパーガイド

慎重に検討した結果、2 つのステップでSQLアプリケーションの Amazon Kinesis Data Analytics を中止することにしました。

1. 2025 年 10 月 15 日以降、SQLアプリケーション用の新しい Kinesis Data Analytics を作成することはできません。

2. 2026 年 1 月 27 日以降、アプリケーションは削除されます。SQL アプリケーションの Amazon Kinesis Data Analytics を起動または操作することはできません。SQL それ以降、Amazon Kinesis Data Analytics のサポートは利用できなくなります。詳細については、「Amazon Kinesis Data Analytics for SQL Applications の停止」を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

出力としての Lambda 関数の使用

AWS Lambda を宛先として使用すると、最終的な宛先への送信に先立つ SQL 結果の後処理がより簡単になります。一般的な後処理タスクには次のものがあります。

  • 複数の行を 1 つのレコードに集約する

  • 現在の結果と過去の結果を組み合わせて、遅れて届くデータに対処する

  • 情報のタイプに基づいて異なる送信先に配信する

  • レコード形式の変換 (Protobuf への変換など)

  • 文字列操作または変換

  • 分析処理後のデータの強化

  • 地理空間ユースケースのカスタム処理

  • データ暗号化

Lambda 関数は、次のようなさまざまな AWS サービスおよび他の宛先に分析情報を提供できます。

Lambda アプリケーションの作成の詳細については、「AWS Lambda のご利用開始にあたって」を参照してください。

出力許可としての Lambda

出力として Lambda を使用するには、アプリケーションの Lambda 出力 IAM ロールに次のアクセス許可ポリシーが必要です。

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "FunctionARN" }

出力メトリクスとしての Lambda

Amazon CloudWatch を使用して、送信バイト数、成功および失敗などをモニタリングします。出力として Lambda を使用する Kinesis Data Analytics によって出力される CloudWatch メトリクスについては、「Amazon Kinesis Analytics Metrics」を参照してください。

出力イベント入力データモデルおよびレコードレスポンスモデルとしての Lambda

Kinesis Data Analytics 出力レコードを送信する場合、Lambda 関数は、必要なイベント入力データおよびレコードレスポンスモデルに準拠している必要があります。

イベント入力データモデル

Kinesis Data Analytics は、次のリクエストモデルの出力関数として、アプリケーションから Lambda に出力レコードを継続的に送信します。関数内では、リストを繰り返し処理し、ビジネスロジックを適用して、出力要件 (最終的な送信先に送信する前のデータ変換など) を実行します。

フィールド 説明
invocationId Lambda 呼び出し ID (ランダム GUID)。
applicationArn Kinesis Data Analytics アプリケーションの Amazon リソースネーム (ARN)。
レコード
フィールド 説明
recordId レコード ID (ランダム GUID)
lambdaDeliveryRecordMetadata
フィールド 説明
retryHint 配信再試行回数
データ Base64 でエンコードされた出力レコードのペイロード
注記

retryHint は配信失敗ごとに増加する値です。この値は永続的に保持されず、アプリケーションが中断された場合にリセットされます。

レコードレスポンスモデル

出力関数として Ok に (レコード ID と共に) 送信される各レコードは、 または DeliveryFailed のどちらかで確認される必要があり、次のパラメータを含める必要があります。それ以外の場合、Kinesis Data Analytics はそれらを配信失敗として扱います。

レコード
フィールド 説明
recordId レコード ID は呼び出し時に Kinesis Data Analytics から Lambda に渡されます。元のレコードの ID と確認されたレコードの ID との不一致は、配信失敗として扱われます。
result レコード配信のステータス。以下の値を指定できます。
  • Ok: レコードは正常に変換され、最終的な送信先に送信されました。Kinesis Data Analytics は SQL 処理のレコードを取り込みます。

  • DeliveryFailed: レコードは Lambda によって出力関数として最終的な送信先に正常に配信されませんでした。Kinesis Data Analytics は失敗したレコードの出力関数としての Lambda への送信を継続的に再試行します。

Lambda 出力呼び出しの頻度

Kinesis Data Analytics アプリケーションは、出力レコードをバッファして AWS Lambda 宛先関数を頻繁に呼び出します。

  • データ分析アプリケーションでタンブリングウィンドウとしてレコードがアプリケーション内ストリームに出力される場合、AWS Lambda 宛先関数はタンブリングウィンドウトリガーごとに呼び出されます。例えば、タンブリングウィンドウを 60 秒に設定してレコードを宛先のアプリケーション内ストリームに出力すると、Lambda 関数は、60 秒ごとに 1 回呼び出されます。

  • アプリケーション内で連続するクエリまたはスライディングウィンドウとしてレコードがアプリケーション内ストリームに出力される場合、Lambda 宛先関数は約 1 秒に 1 回呼び出されます。

注記

Lambda 関数あたりの呼び出しリクエストのペイロードサイズの制限が適用されます。これらの制限を超えると、出力レコードが分割され、複数の Lambda 関数呼び出しに分けて送信されます。

出力として使用するための Lambda 関数の追加

次の手順では、Kinesis Data Analytics アプリケーションの出力として Lambda 関数を追加する方法を示しています。

  1. AWS Management Console にサインインして、https://console.aws.amazon.com/kinesisanalytics にある Managed Service for Apache Flink コンソールを開きます。

  2. リストからアプリケーションを選択し、[Application details] を選択します。

  3. [宛先] セクションで、[Connect new destination] を選択します。

  4. [宛先] 項目に、[AWS Lambda 関数] を選択します。

  5. [AWS Lambda にレコードを配信] セクションで、既存の Lambda 関数とバージョンを選択するか、[新規作成] を選択します。

  6. 新しい Lambda 関数を作成する場合は、次の操作を行います。

    1. 提供されているいずれかのテンプレートのいずれかを選択します。詳細については、アプリケーションの送信先の Lambda 関数の作成

    2. [関数の作成] ページが新しいブラウザタブで開きます。[Name (名前)] ボックスで、関数にわかりやすい名前を付けます (例: myLambdaFunction)。

    3. アプリケーションの後処理機能のテンプレートを更新します。Lambda 関数作成の詳細については、AWS Lambda 開発者ガイドの入門ガイドを参照してください。

    4. Kinesis Data Analytics コンソールの [Lambda 関数] リストで、先ほど作成した Lambda 関数を選択します。Lambda 関数のバージョンは [$最新] を選択します。

  7. [In-application stream] セクションで、[Choose an existing in-application stream] を選択します。[In-application stream name] に、アプリケーションの出力ストリームを選択します。選択した出力ストリームからの結果は、Lambda 出力関数に送信されます。

  8. 残りのフォームはデフォルト値のままにして、[Save and continue] を選択します。

アプリケーションはアプリケーション内ストリームから Lambda 関数にレコードを送信するようになりました。Amazon CloudWatch コンソールのデフォルトテンプレートの結果を確認できます。AWS/KinesisAnalytics/LambdaDelivery.OkRecords メトリクスをモニタリングして、Lambda 関数に配信されるレコードの数を確認します。

出力エラーとしてよく見られる Lambda

以下は、Lambda 関数への配信が失敗する可能性のある一般的な理由です。

  • Lambda 関数に送信されるバッチのレコード (レコード ID 付き) の一部が Kinesis Data Analytics サービスに返されていません。

  • レスポンスにレコード ID、またはステータスフィールドのいずれかが欠落しています。

  • Lambda 関数のタイムアウトが Lambda 関数内のビジネスロジックを達成するのに十分ではありません。

  • Lambda 関数内のビジネスロジックは、すべてのエラーをキャッチしないため、処理されない例外のためにタイムアウトとバックプレッシャーが生じます。これらのメッセージは、「ポイズンピル」と呼ばれることが少なくありません。

データ配信が失敗した場合、Kinesis Data Analytics は、成功するまで同じレコードセットで Lambda 呼び出しを再試行し続けます。次の CloudWatch メトリクスを監視して、失敗から情報を得ることができます。

  • Kinesis Data Analytics アプリケーションの Output CloudWatch メトリクスとしてのLambda : 統計の中でも、特に成功と失敗の数を示します。詳細については、「Amazon Kinesis Analytics Metrics」を参照してください。

  • AWS Lambda 関数 CloudWatch メトリクスおよびログ