

慎重に検討した結果、Amazon Kinesis Data Analytics for SQL アプリケーションを中止することにしました。

1. **2025 年 9 月 1** 日以降、Amazon Kinesis Data Analytics for SQL アプリケーションのバグ修正は提供されません。これは、今後の廃止によりサポートが制限されるためです。

2. **2025 年 10 月 15** 日以降、新しい Kinesis Data Analytics for SQL アプリケーションを作成することはできません。

3. **2026 年 1 月 27 日**以降、アプリケーションは削除されます。Amazon Kinesis Data Analytics for SQL アプリケーションを起動することも操作することもできなくなります。これ以降、Amazon Kinesis Data Analytics for SQL のサポートは終了します。詳細については、「[Amazon Kinesis Data Analytics for SQL アプリケーションのサポート終了](discontinuation.md)」を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Lambda 関数を使用したデータの事前処理
<a name="lambda-preprocessing"></a>

**注記**  
2023 年 9 月 12 日以降、Kinesis Data Analytics for SQL をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。詳細については、「[制限](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)」を参照してください。

ストリーム内のデータに形式変換、変換、エンリッチメント、フィルタリングが必要な場合は、 AWS Lambda 関数を使用してデータを前処理できます。アプリケーションの SQL コードが実行される前、またはアプリケーションがデータストリームからスキーマを作成する前に、これを行うことができます。

Lambda 関数によるレコードの事前処理は、次のシナリオで役立ちます。
+ 他のフォーマット (KPL や GZIP など) から Kinesis Data Analytics が分析できる形式にレコードを変換します。Kinesis Data Analytics は、現在 JSON データ形式または CSV データ形式をサポートしています。
+ 集計検出や異常検出などの操作でよりアクセスしやすい形式にデータを拡張します。たとえば、複数のデータ値が文字列にまとめて格納されている場合は、データを別々の列に拡張できます。
+ 外挿やエラー修正などの他の Amazon サービスによるデータの強化。
+ レコードのフィールドに複雑な文字列変換を適用します。
+ データをクリーンアップするためのデータフィルタリング。

## レコードを事前処理するための Lambda 関数の使用
<a name="lambda-preprocessing-use"></a>

Kinesis Data Analytics アプリケーションを作成するときは、[**ソースに接続**] ページで Lambda 事前処理を有効にします。

**Lambda 関数を使用して Kinesis Data Analytics アプリケーションでレコードを事前処理するには**

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) で Managed Service for Apache Flink コンソールを開きます。

1. アプリケーションの [**ソースに接続**] ページの [**レコード事前処理**] セクションで [**有効化 AWS Lambda**] を選択します。

1. 既に作成した Lambda 関数を使用するには、[**Lambda 関数**] ドロップダウンリストで関数を選択します。

1. Lambda 事前処理テンプレートの 1 つから新規の Lambda 関数を作成する場合は、ドロップダウンリストからテンプレートを選択します。次に、[**View <template name> in Lambda (Lambda で <テンプレート名> を表示)**] を選択して関数を編集します。

1. 新しい Lambda 関数を作成するには、[**新規作成**] を選択します。Lambda 関数の作成については、AWS Lambda 開発者ガイドの「[HelloWorld Lambda 関数を作成してコンソールを探る](https://docs.aws.amazon.com/lambda/latest/dg/getting-started-create-function.html)」を参照してください。

1. 使用する Lambda 関数のバージョンを選択します。最新のバージョンを使用するには、[**\$1LATEST**] を選択します。

レコードの事前処理に Lambda 関数を選択または作成すると、アプリケーションの SQL コードがレコードからスキーマを実行したり、アプリケーションがレコードからスキーマを生成したりする前に、レコードが事前処理されます。

## Lambda 事前処理アクセス権限
<a name="lambda-preprocessing-policy"></a>

Lambda 事前処理を使用するには、アプリケーションの IAM ロールに次のアクセス許可ポリシーが必要です。

```
     {
       "Sid": "UseLambdaFunction",
       "Effect": "Allow",
       "Action": [
           "lambda:InvokeFunction",
           "lambda:GetFunctionConfiguration"
       ],
       "Resource": "<FunctionARN>"
   }
```

## Lambda 事前処理メトリクス
<a name="lambda-preprocessing-metrics"></a>

Amazon CloudWatch を使用して、Lambda 呼び出しの数、処理されたバイト数、成功と失敗の数などをモニタリングすることができます。Lambda の事前処理で出力される CloudWatch メトリクスについては、「[Amazon Kinesis Analytics のメトリクス](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)」を参照してください。

## Kinesis プロデューサーライブラリ AWS Lambda での の使用
<a name="lambda-preprocessing-deaggregation"></a>

[Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) (KPL) は、小さなユーザーフォーマットレコードを最大 1 MB のレコードに集約して、Amazon Kinesis Data Streams スループットを有効に利用できます。Kinesis Client Library (KCL) for Java は、これらのレコードの集約解除をサポートしています。ただし、ストリームのコンシューマー AWS Lambda として を使用する場合は、特別なモジュールを使用してレコードを集約解除する必要があります。

必要なプロジェクトコードと手順については、GitHub で [AWS Lambda用の Kinesis プロデューサーライブラリの集約解除モジュール](https://github.com/awslabs/kinesis-deaggregation)について参照してください。このプロジェクトのコンポーネントを使用して、Java、Node.js、Python AWS Lambda で 内の KPL シリアル化されたデータを処理できます。これらのコンポーネントは、[複数言語 KCL アプリケーション](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/package-info.java)の一部として使用することもできます。

## データ事前処理イベント入力データモデル / レコードレスポンスモデル
<a name="lambda-preprocessing-data-model"></a>

レコードを事前処理するには、Lambda 関数が、必要なイベント入力データおよびレコードレスポンスモデルに準拠している必要があります。

### イベント入力データモデル
<a name="lambda-preprocessing-request-model"></a>

Kinesis Data Analytics は、Kinesis データストリームまたは Firehose 配信ストリームから継続的にデータを読み取ります。取得したレコードの各バッチが Lambda 関数にどのように渡されたか、サービスが管理しています。関数はレコードのリストを入力として受け取ります。関数内では、リストを繰り返し処理し、ビジネスロジックを適用して、事前処理要件 (データ形式の変換や強化など) を実行します。

事前処理関数への入力モデルは、データが Kinesis データストリームから受信されたか、Firehose 配信ストリームから受信されたかによってわずかに異なります。

ソースが Firehose 配信ストリームの場合、イベント入力データモデルは次のようになります。

**Kinesis Data Firehose のリクエストデータモデル**


| フィールド | 説明 | 
| --- | --- | 
| フィールド | 説明 | 
| --- | --- | 
| フィールド | 説明 | 
| --- | --- | 
| invocationId | Lambda 呼び出し ID (ランダム GUID)。 | 
| applicationArn | Kinesis Data Analytics アプリケーションの Amazon リソースネーム (ARN) | 
| streamArn | 配信ストリーム ARN | 
| レコード [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | レコード ID (ランダム GUID) | 
| kinesisFirehoseRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | Base64 でエンコードされたソースレコードのペイロード | 
| approximateArrivalTimestamp | 配信ストリームレコードの概算到着時間 | 

次の例は、Firehose 配信ストリームからの入力を示しています。

```
{
   "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2",
   "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
   "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test",
   "records":[
      {
         "recordId":"49572672223665514422805246926656954630972486059535892482",
         "data":"aGVsbG8gd29ybGQ=",
         "kinesisFirehoseRecordMetadata":{
            "approximateArrivalTimestamp":1520280173
         }
      }
   ]
}
```

ソースが Kinesis データストリームの場合、イベント入力データモデルは次のとおりです。

**Kinesis ストリームのリクエストデータモデル**


| フィールド | 説明 | 
| --- | --- | 
| フィールド | 説明 | 
| --- | --- | 
| フィールド | 説明 | 
| --- | --- | 
| invocationId | Lambda 呼び出し ID (ランダム GUID)。 | 
| applicationArn | Kinesis Data Analytics アプリケーション ARN | 
| streamArn | 配信ストリーム ARN | 
| レコード [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | Kinesis レコードのシーケンス番号に基づいたレコード ID | 
| kinesisStreamRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | Base64 でエンコードされたソースレコードのペイロード | 
| sequenceNumber | Kinesis ストリームレコードからのシーケンス番号 | 
| partitionKey | Kinesis ストリームレコードからのパーティションキー | 
| shardId | Kinesis ストリームレコードからの ShardId | 
| approximateArrivalTimestamp | 配信ストリームレコードの概算到着時間 | 

次の例は、Kinesis データストリームからの入力を示しています。

```
{
  "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2",
  "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
  "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test",
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "data": "aGVsbG8gd29ybGQ=",
      "kinesisStreamRecordMetadata":{
            "shardId" :"shardId-000000000003",
            "partitionKey":"7400791606",
            "sequenceNumber":"49572672223665514422805246926656954630972486059535892482",
            "approximateArrivalTimestamp":1520280173
         }
    }
  ]
}
```

### レコードレスポンスモデル
<a name="lambda-preprocessing-response-model"></a>

Lambda 関数に送信された Lambda 事前処理関数 (レコード ID 付き) から返されたすべてのレコードは返される必要があります。レコードには次のパラメータが含まれている必要があります。含まれていない場合、Kinesis Data Analytics がレコードを拒否し、データ事前処理を失敗とみなします。レコードのデータペイロード部分は、事前処理要件を達成するために変換できます。

**レスポンスデータモデル**


| フィールド | 説明 | 
| --- | --- | 
| レコード [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | レコード ID は呼び出し時に Kinesis Data Analytics から Lambda に渡されます。変換されたレコードには、同じレコード ID が含まれる必要があります。元のレコードの ID と変換されたレコードの ID との不一致は、データ事前処理の失敗として扱われます。 | 
| result | レコードのデータ変換のステータス。指定できる値は以下のとおりです。[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | base64 エンコード後の変換されたデータペイロード。アプリケーションの取り込みデータ形式が JSON である場合、各データペイロードには複数の JSON ドキュメントを含めることができます。または、アプリケーションの取り込みデータ形式が CSV である場合、それぞれに複数の CSV 行を含めることができます (各行には行の区切り文字が入ります)。Kinesis Data Analytics サービスは、同じデータペイロード内の複数の JSON ドキュメントまたは CSV 行のいずれかを使用して、データを正常に解析して処理します。 | 

次の例は、Lambda 関数からの出力を示しています。

```
{
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "result": "Ok",
      "data": "SEVMTE8gV09STEQ="
    }
  ]
}
```

## 一般的なデータ事前処理の失敗
<a name="lambda-preprocessing-failures"></a>

事前処理が失敗する一般的な理由は次のとおりです。
+ Lambda 関数に送信されるバッチのレコード (レコード ID 付き) の一部が Kinesis Data Analytics サービスに返されていません。
+ レスポンスにレコード ID、ステータス、データペイロードフィールドのいずれかが欠落しています。データペイロードフィールドは、`Dropped` または `ProcessingFailed` レコードの場合はオプションです。
+ Lambda 関数のタイムアウトが、データを事前処理するのに十分ではありません。
+ Lambda 関数のレスポンスが、 AWS Lambda サービスによって定められたレスポンスの上限を超えています。

データの事前処理が失敗した場合、Kinesis Data Analytics は、成功するまで同じレコードセットで Lambda 呼び出しを再試行し続けます。次の CloudWatch メトリクスを監視して、失敗から洞察を得ることができます。
+ Kinesis Data Analytics アプリケーション (`MillisBehindLatest`): アプリケーションの読み取りがストリーミングソースからどれだけ離れているかを示します。
+ Kinesis Data Analytics アプリケーション (`InputPreprocessing`) の CloudWatch メトリクス: 統計の中でも、特に成功と失敗の数を示します。詳細については、「[Amazon Kinesis Analytics Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)」を参照してください。
+ AWS Lambda 関数 CloudWatch メトリクスとログ。