慎重に検討した結果、2 つのステップでSQLアプリケーションの Amazon Kinesis Data Analytics を中止することにしました。
1. 2025 年 10 月 15 日以降、SQLアプリケーション用の新しい Kinesis Data Analytics を作成することはできません。
2. 2026 年 1 月 27 日以降、アプリケーションは削除されます。SQL アプリケーションの Amazon Kinesis Data Analytics を起動または操作することはできません。SQL それ以降、Amazon Kinesis Data Analytics のサポートは利用できなくなります。詳細については、「Amazon Kinesis Data Analytics for SQL Applications の停止」を参照してください。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Lambda 関数を使用したデータの事前処理
注記
2023 年 9 月 12 日以降、Kinesis Data Analytics for SQL をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。詳細については、「制限」を参照してください。
ストリームのデータの形式の変換、変更、強化、フィルタ処理が必要な場合、AWS Lambda 関数を使用してデータを事前処理できます。アプリケーションの SQL コードが実行される前、またはアプリケーションがデータストリームからスキーマを作成する前に、これを行うことができます。
Lambda 関数によるレコードの事前処理は、次のシナリオで役立ちます。
-
他のフォーマット (KPL や GZIP など) から Kinesis Data Analytics が分析できる形式にレコードを変換します。Kinesis Data Analytics は、現在 JSON データ形式または CSV データ形式をサポートしています。
-
集計検出や異常検出などの操作でよりアクセスしやすい形式にデータを拡張します。たとえば、複数のデータ値が文字列にまとめて格納されている場合は、データを別々の列に拡張できます。
-
外挿やエラー修正などの他の Amazon サービスによるデータの強化。
-
レコードのフィールドに複雑な文字列変換を適用します。
-
データをクリーンアップするためのデータフィルタリング。
レコードを事前処理するための Lambda 関数の使用
Kinesis Data Analytics アプリケーションを作成するときは、[ソースに接続] ページで Lambda 事前処理を有効にします。
Lambda 関数を使用して Kinesis Data Analytics アプリケーションでレコードを事前処理するには
AWS Management Console にサインインして、https://console.aws.amazon.com/kinesisanalytics
にある Managed Service for Apache Flink コンソールを開きます。 -
アプリケーションの [ソースに接続] ページの [レコード事前処理] セクションで [有効化AWS Lambda] を選択します。
-
既に作成した Lambda 関数を使用するには、[Lambda 関数] ドロップダウンリストで関数を選択します。
-
Lambda 事前処理テンプレートの 1 つから新規の Lambda 関数を作成する場合は、ドロップダウンリストからテンプレートを選択します。次に、[View <template name> in Lambda (Lambda で <テンプレート名> を表示)] を選択して関数を編集します。
-
新しい Lambda 関数を作成するには、[新規作成] を選択します。Lambda 関数の作成については、AWS Lambda 開発者ガイドの「HelloWorld Lambda 関数を作成してコンソールを探る」を参照してください。
-
使用する Lambda 関数のバージョンを選択します。最新のバージョンを使用するには、[$LATEST] を選択します。
レコードの事前処理に Lambda 関数を選択または作成すると、アプリケーションの SQL コードがレコードからスキーマを実行したり、アプリケーションがレコードからスキーマを生成したりする前に、レコードが事前処理されます。
Lambda 事前処理アクセス権限
Lambda 事前処理を使用するには、アプリケーションの IAM ロールに次のアクセス許可ポリシーが必要です。
{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }
Lambda 事前処理メトリクス
Amazon CloudWatch を使用して、Lambda 呼び出しの数、処理されたバイト数、成功と失敗の数などをモニタリングすることができます。Lambda の事前処理で出力される CloudWatch メトリクスについては、「Amazon Kinesis Analytics のメトリクス」を参照してください。
Kinesis Producer Library で AWS Lambda を使用
Kinesis Producer Library (KPL) は、小さなユーザーフォーマットレコードを最大 1 MB のレコードに集約して、Amazon Kinesis Data Streams スループットを有効に利用できます。Kinesis Client Library (KCL) for Java は、これらのレコードの集約解除をサポートしています。ただし、ストリームのコンシューマーとして AWS Lambda を使用する場合は、特別なモジュールを使用してレコードを集約解除する必要があります。
必要なプロジェクトコードと手順については、GitHub で AWS Lambda 用の Kinesis プロデューサーライブラリの集約解除モジュール
データ事前処理イベント入力データモデル / レコードレスポンスモデル
レコードを事前処理するには、Lambda 関数が、必要なイベント入力データおよびレコードレスポンスモデルに準拠している必要があります。
イベント入力データモデル
Kinesis Data Analytics は、Kinesis データストリームまたは Firehose 配信ストリームから継続的にデータを読み取ります。取得したレコードの各バッチが Lambda 関数にどのように渡されたか、サービスが管理しています。関数はレコードのリストを入力として受け取ります。関数内では、リストを繰り返し処理し、ビジネスロジックを適用して、事前処理要件 (データ形式の変換や強化など) を実行します。
事前処理関数への入力モデルは、データが Kinesis データストリームから受信されたか、Firehose 配信ストリームから受信されたかによってわずかに異なります。
ソースが Firehose 配信ストリームの場合、イベント入力データモデルは次のようになります。
Kinesis Data Firehose のリクエストデータモデル
フィールド | 説明 | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
Lambda 呼び出し ID (ランダム GUID)。 | ||||||||||||
applicationArn |
Kinesis Data Analytics アプリケーションの Amazon リソースネーム (ARN) | ||||||||||||
streamArn |
配信ストリーム ARN | ||||||||||||
レコード
|
次の例は、Firehose 配信ストリームからの入力を示しています。
{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }
ソースが Kinesis データストリームの場合、イベント入力データモデルは次のとおりです。
Kinesis ストリームのリクエストデータモデル
フィールド | 説明 | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
Lambda 呼び出し ID (ランダム GUID)。 | ||||||||||||||||||
applicationArn |
Kinesis Data Analytics アプリケーション ARN | ||||||||||||||||||
streamArn |
配信ストリーム ARN | ||||||||||||||||||
レコード
|
次の例は、Kinesis データストリームからの入力を示しています。
{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }
レコードレスポンスモデル
Lambda 関数に送信された Lambda 事前処理関数 (レコード ID 付き) から返されたすべてのレコードは返される必要があります。レコードには次のパラメータが含まれている必要があります。含まれていない場合、Kinesis Data Analytics がレコードを拒否し、データ事前処理を失敗とみなします。レコードのデータペイロード部分は、事前処理要件を達成するために変換できます。
レスポンスデータモデル
レコード
|
次の例は、Lambda 関数からの出力を示しています。
{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }
一般的なデータ事前処理の失敗
事前処理が失敗する一般的な理由は次のとおりです。
-
Lambda 関数に送信されるバッチのレコード (レコード ID 付き) の一部が Kinesis Data Analytics サービスに返されていません。
-
レスポンスにレコード ID、ステータス、データペイロードフィールドのいずれかが欠落しています。データペイロードフィールドは、
Dropped
またはProcessingFailed
レコードの場合はオプションです。 -
Lambda 関数のタイムアウトが、データを事前処理するのに十分ではありません。
-
Lambda 関数のレスポンスが、AWS Lambda サービスによって定められたレスポンスの上限を超えています。
データの事前処理が失敗した場合、Kinesis Data Analytics は、成功するまで同じレコードセットで Lambda 呼び出しを再試行し続けます。次の CloudWatch メトリクスを監視して、失敗から洞察を得ることができます。
-
Kinesis Data Analytics アプリケーション (
MillisBehindLatest
): アプリケーションの読み取りがストリーミングソースからどれだけ離れているかを示します。 -
Kinesis Data Analytics アプリケーション (
InputPreprocessing
) の CloudWatch メトリクス: 統計の中でも、特に成功と失敗の数を示します。詳細については、「Amazon Kinesis Analytics Metrics」を参照してください。 -
AWS Lambda 関数 CloudWatch メトリクスおよびログ