

在仔細考慮之後，我們決定停止 Amazon Kinesis Data Analytics for SQL 應用程式：

1. 從 **2025 年 9 月 1 日起，**我們不會為 Amazon Kinesis Data Analytics for SQL 應用程式提供任何錯誤修正，因為考慮到即將終止，我們將對其提供有限的支援。

2. 從 **2025 年 10 月 15 日起，**您將無法建立新的 Kinesis Data Analytics for SQL 應用程式。

3. 我們將自 **2026 年 1 月 27** 日起刪除您的應用程式。您將無法啟動或操作 Amazon Kinesis Data Analytics for SQL 應用程式。從那時起，Amazon Kinesis Data Analytics for SQL 將不再提供支援。如需詳細資訊，請參閱[Amazon Kinesis Data Analytics for SQL 應用程式終止](discontinuation.md)。

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Lambda 函數預處理資料
<a name="lambda-preprocessing"></a>

**注意**  
2023 年 9 月 12 日之後，如果尚未使用 Kinesis Data Analytics for SQL，您將無法用 Kinesis Data Firehose 做為建立新應用程式的來源。如需詳細資訊，請參閱[限制](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)。

如果串流中的資料需要格式轉換、轉換、擴充或篩選，您可以使用 AWS Lambda 函數預先處理資料。您可以在應用程式 SQL 程式碼執行之前，或在應用程式從資料串流建立結構描述之前執行此動作。

在下列案例中，使用 Lambda 函數來預先處理記錄非常有用：
+ 將記錄從其他格式 (例如 KPL 或 GZIP) 轉換為 Kinesis Data Analytics 分析可以分析的格式。Kinesis Data Analytics 目前支援 JSON 或 CSV 資料格式。
+ 將資料擴展為彙總或異常偵測等作業更容易存取的格式。舉例來說，如果多個資料值一起存儲在一個字串中，則可以將資料擴展到單獨的資料欄中。
+ 透過其他 Amazon 服務進行資料充實，例如外推法或錯誤修正。
+ 將複雜的字符串轉換應用於記錄欄位。
+ 用於清理資料的數據過濾。

## 使用 Lambda 函數預處理資料
<a name="lambda-preprocessing-use"></a>

建立 Kinesis Data Analytics 應用程式時，您可以在**連接至來源**頁面中啟用 Lambda 預先處理。

**使用 Lambda 函數預先處理 Kinesis Data Analytics 應用程式中的記錄**

1. 登入 AWS 管理主控台 並開啟 Managed Service for Apache Flink 主控台，網址為 [ https：//https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)。

1. 在應用程式的**連接至來源**頁面上，選擇**記錄預處理方式**] AWS Lambda區段中的**啟用**。

1. 若要使用已建立的 Lambda 函數，請在 **Lambda 函數**下拉式清單中選擇函數。

1. 若要從其中一個 Lambda 預處理範本建立新的 Lambda 函數，請從下拉式清單中選擇範本。然後選擇 **觀看 Lambda 中的 <template name>** 來編輯函數。

1. 選擇**建立新的**來建立新 Lambda 函數。如需有關使用 Lambda 函數的詳細資訊，請參閱《AWS Lambda 開發人員指南》**中的[建立 HelloWorld Lambda 函數並探索主控台](https://docs.aws.amazon.com/lambda/latest/dg/getting-started-create-function.html)。

1. 選擇要使用的 Lambda 函數版本。若要使用最新版本，請選擇 **\$1LATEST**。

當您選擇或建立 Lambda 函數進行記錄預先處理時，系統會在執行應用程式 SQL 程式碼，或應用程式從記錄產生結構描述之前預先處理記錄。

## Lambda 預處理許可
<a name="lambda-preprocessing-policy"></a>

若要使用 Lambda 預處理，應用程式的 IAM 角色需要下列許可政策：

```
     {
       "Sid": "UseLambdaFunction",
       "Effect": "Allow",
       "Action": [
           "lambda:InvokeFunction",
           "lambda:GetFunctionConfiguration"
       ],
       "Resource": "<FunctionARN>"
   }
```

## Lambda 預處理指標
<a name="lambda-preprocessing-metrics"></a>

您可以使用 Amazon CloudWatch 來監控 Lambda 調用的位元數、成功與失敗情況等。如需 Kinesis Data Analytics 使用 Lambda 預處理發出的 CloudWatch 指標相關資訊，請參閱 [Amazon Kinesis Analytics 指標](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)。

## AWS Lambda 搭配 Kinesis Producer Library 使用
<a name="lambda-preprocessing-deaggregation"></a>

[Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) (KPL) 會將使用者格式化的小型記錄彙整成至多 1 MB 的較大型記錄，以便更妥善利用 Amazon Kinesis Data Streams 輸送量。適用於 Java 的 Kinesis Client Library (KCL) 支援取消彙整這類記錄。不過，當您使用 AWS Lambda 做為串流的取用者時，您必須使用特殊模組來取消彙總記錄。

您可以從 GitHub 取得必要的專案程式碼及相關指示，詳情請參閱[Kinesis Producer Library Deaggregation Modules for AWS Lambda](https://github.com/awslabs/kinesis-deaggregation)。您可以使用此專案中的元件，在 Java、Node.js 和 Python AWS Lambda 中的 中處理 KPL 序列化資料。上述元件也可用於建構[多語言 KCL 應用程式](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/package-info.java)。

## 資料預處理事件輸入資料模型/記錄響應模型
<a name="lambda-preprocessing-data-model"></a>

若要預處理記錄，您的 Lambda 函數必須符合所需的事件輸入資料和記錄回應模型。

### 事件輸入資料模型
<a name="lambda-preprocessing-request-model"></a>

Kinesis Data Analytics 會持續從您的 Kinesis 資料串流或 Firehose 交付串流讀取資料。對於擷取的每批記錄，服務會管理每個批次傳遞至 Lambda 函數的方式。您的函數接收記錄列表作為輸入。在函數中，您可以迭代列表並應用業務邏輯以完成預處理需求（例如資料格式轉換或擴充）。

預先處理函數的輸入模型會略有不同，取決於資料是從 Kinesis 資料串流還是 Firehose 交付串流接收。

如果來源是 Firehose 交付串流，則事件輸入資料模型如下所示：

**Kinesis Data Firehose 請求數據模型**


| 欄位 | Description | 
| --- | --- | 
| 欄位 | Description | 
| --- | --- | 
| 欄位 | Description | 
| --- | --- | 
| invocationId | Lambda 調用 ID (隨機 GUID)。 | 
| applicationArn | Kinesis Data Analytics 應用程式的 Amazon Resource Name (ARN) | 
| streamArn | 交付串流 ARN | 
| 紀錄 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | 記錄 ID (隨機 GUID) | 
| kinesisFirehoseRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | Base64 編碼來源記錄承載 | 
| approximateArrivalTimestamp | 交付串流記錄大約到達時間 | 

下列範例顯示來自 Firehose 交付串流的輸入：

```
{
   "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2",
   "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
   "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test",
   "records":[
      {
         "recordId":"49572672223665514422805246926656954630972486059535892482",
         "data":"aGVsbG8gd29ybGQ=",
         "kinesisFirehoseRecordMetadata":{
            "approximateArrivalTimestamp":1520280173
         }
      }
   ]
}
```

如果來源是 Kinesis 資料串流，則事件輸入資料模型如下：

**Kinesis 串流請求資料模型**


| 欄位 | Description | 
| --- | --- | 
| 欄位 | Description | 
| --- | --- | 
| 欄位 | Description | 
| --- | --- | 
| invocationId | Lambda 調用 ID (隨機 GUID)。 | 
| applicationArn | Kinesis Data Analytics 應用程式 ARN | 
| streamArn | 交付串流 ARN | 
| 紀錄 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | 以 Kinesis 記錄序號為基礎的記錄 ID | 
| kinesisStreamRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| 資料 | Base64 編碼來源記錄承載 | 
| sequenceNumber | 來自 Kinesis 串流記錄的序號 | 
| partitionKey | Kinesis 串流記錄中的分割區索引鍵 | 
| shardId | Kinesis 串流記錄的 ShardId | 
| approximateArrivalTimestamp | 交付串流記錄大約到達時間 | 

下列範例顯示 Kinesis 資料串流的輸入：

```
{
  "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2",
  "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
  "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test",
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "data": "aGVsbG8gd29ybGQ=",
      "kinesisStreamRecordMetadata":{
            "shardId" :"shardId-000000000003",
            "partitionKey":"7400791606",
            "sequenceNumber":"49572672223665514422805246926656954630972486059535892482",
            "approximateArrivalTimestamp":1520280173
         }
    }
  ]
}
```

### 紀錄回應模型
<a name="lambda-preprocessing-response-model"></a>

必須傳回送至 Lambda 函數的所有從 Lambda 預處理函數傳回的記錄 (含有記錄 ID)。它們必須包含以下參數，否則 Kinesis Data Analytics 會拒絕這類紀錄，並將其視為資料預處理失敗。資料有效承載部分可以轉換，以完成預處理要求。

**回應資料模型**


| 欄位 | Description | 
| --- | --- | 
| 紀錄 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | 在調用期間，記錄 ID 會從 Kinesis Data Analytics 傳遞至 Lambda。轉換記錄必須包含相同的記錄 ID。原始記錄的 ID 與轉換記錄的 ID 若有任何不符，就會視為資料轉換失敗。 | 
| result | 記錄的資料轉換狀態。可能值如下：[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | 已轉換資料承載 (base64 編碼後)。如果應用程式擷取資料格式為 JSON，則每個資料承載都可以包含多個 JSON 文件。或者，如果應用程式擷取資料格式為 CSV，則每個列都可以包含多個 CSV 列 (在每一列中指定資料列分隔符號)。Kinesis Data Analytics 服務能夠在相同資料承載中成功剖析和處理包含多個 JSON 文件或 CSV 列的資料。 | 

以下是 Lambda 函數輸出的範例：

```
{
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "result": "Ok",
      "data": "SEVMTE8gV09STEQ="
    }
  ]
}
```

## 常見的資料預處理失敗
<a name="lambda-preprocessing-failures"></a>

以下是預處理失敗的常見原因。
+ 並非所有傳送至 Lambda 函數的批次記錄 (具有記錄 ID) 都會傳回 Kinesis Data Analytics 服務。
+ 回應遺失記錄 ID、狀態或資料承載欄位。資料承載欄位對於 `Dropped` 或 `ProcessingFailed` 記錄而言是選擇性的。
+ Lambda 函數逾時不足以預處理資料。
+ Lambda 函數回應超過 AWS Lambda 服務施加的回應限制。

對於資料預處理失敗，Kinesis Data Analytics 會繼續在同一組記錄上重試 Lambda 調用，直到成功為止。您可以監控下列 CloudWatch 指標來深入暸解故障情況。
+ Kinesis Data Analytics 應用程式 `MillisBehindLatest`：指出應用程式從串流來源讀取落後的程度。
+ Kinesis Data Analytics 應用程式 `InputPreprocessing` CloudWatch 指標：指出成功和失敗的次數，以及其他統計資料。如需詳細資訊，請參閱[Amazon Kinesis Analytics 指標](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)。
+ AWS Lambda 函數 CloudWatch 指標和日誌。