在仔細考慮之後,我們決定在兩個步驟中停止 Amazon Kinesis Data Analytics for SQL 應用程式:
1. 從 2025 年 10 月 15 日起,您將無法為SQL應用程式建立新的 Kinesis Data Analytics。
2. 我們將自 2026 年 1 月 27 日起刪除您的應用程式。您將無法啟動或操作SQL應用程式的 Amazon Kinesis Data Analytics。從那時SQL起,Amazon Kinesis Data Analytics 將不再提供 的支援。如需詳細資訊,請參閱Amazon Kinesis Data Analytics for SQL 應用程式終止。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 Lambda 函數預處理資料
注意
2023 年 9 月 12 日之後,如果尚未使用 Kinesis Data Analytics for SQL,您將無法用 Kinesis Data Firehose 做為建立新應用程式的來源。如需詳細資訊,請參閱限制。
如果串流中的資料需要格式轉換、轉換、擴充或篩選,您可以使用函數預先處理資料。 AWS Lambda 您可以在應用程式 SQL 程式碼執行之前,或在應用程式從資料串流建立結構描述之前執行此動作。
在下列案例中,使用 Lambda 函數來預先處理記錄非常有用:
-
將記錄從其他格式 (例如 KPL 或 GZIP) 轉換為 Kinesis Data Analytics 分析可以分析的格式。Kinesis Data Analytics 目前支援 JSON 或 CSV 資料格式。
-
將資料擴展為彙總或異常偵測等作業更容易存取的格式。舉例來說,如果多個資料值一起存儲在一個字串中,則可以將資料擴展到單獨的欄中。
-
透過其他 Amazon 服務進行資料充實,例如外推法或錯誤修正。
-
將複雜的字符串轉換應用於記錄欄位。
-
用於清理資料的數據過濾。
使用 Lambda 函數預處理資料
建立 Kinesis Data Analytics 應用程式時,您可以在連接至來源頁面中啟用 Lambda 預先處理。
使用 Lambda 函數預先處理 Kinesis Data Analytics 應用程式中的記錄
登入 AWS Management Console 並開啟適用於 Apache Flink 的受管理服務主控台,網址為 https://console.aws.amazon.com/kinesisanalytics
。 -
在應用程式的連接至來源頁面上,選擇記錄預處理方式] AWS Lambda區段中的啟用。
-
若要使用已建立的 Lambda 函數,請在 Lambda 函數下拉式清單中選擇函數。
-
若要從其中一個 Lambda 預處理範本建立新的 Lambda 函數,請從下拉式清單中選擇範本。然後選擇 觀看 Lambda 中的 <template name> 來編輯函數。
-
選擇建立新的來建立新 Lambda 函數。如需建立 Lambda 函數的相關資訊,請參閱AWS Lambda 開發人員指南中的建立 HelloWorld Lambda 函數和探索主控台。
-
選擇要使用的 Lambda 函數版本。若要使用最新版本,請選擇 $LATEST。
當您選擇或建立 Lambda 函數進行記錄預先處理時,系統會在執行應用程式 SQL 程式碼,或應用程式從記錄產生結構描述之前預先處理記錄。
Lambda 預處理許可
若要使用 Lambda 預處理,應用程式的 IAM 角色需要下列許可政策:
{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }
Lambda 預處理指標
您可以使 CloudWatch 用 Amazon 監控 Lambda 叫用的數量、處理的位元組數、成功與失敗等。如需 Kinesis 資料分析 Lambda 預先處理所發出 CloudWatch 指標的相關資訊,請參閱 Amazon Kin esis Analytics 指標。
搭 AWS Lambda 配 Kinesis 製作者程式庫使用
Kinesis Producer Library (KPL) 會將使用者格式化的小型記錄彙整成至多 1 MB 的較大型記錄,以便更妥善利用 Amazon Kinesis Data Streams 輸送量。適用於 Java 的 Kinesis Client Library (KCL) 支援取消彙整這類記錄。但是,當您用作流的消費者時,您必須使用 AWS Lambda 特殊模塊來分解記錄。
若要取得必要的專案程式碼和指示,請參閱〈Kinesis Productor 程式庫解彙總模組
資料預處理事件輸入資料模型/記錄響應模型
若要預處理記錄,您的 Lambda 函數必須符合所需的事件輸入資料和記錄回應模型。
事件輸入資料模型
Kinesis Data Analytics 會持續讀取 Kinesis 資料串流或 Firehose 交付串流中的資料。對於擷取的每批記錄,服務會管理每個批次傳遞至 Lambda 函數的方式。您的函數接收記錄列表作為輸入。在函數中,您可以迭代列表並應用業務邏輯以完成預處理需求(例如資料格式轉換或擴充)。
預先處理函數的輸入模型會略有不同,具體取決於資料是從 Kinesis 資料串流還是 Firehose 傳遞串流接收。
如果來源是 Firehose 傳遞串流,則事件輸入資料模型如下:
Kinesis Data Firehose 請求數據模型
欄位 | 描述 | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
Lambda 調用 ID (隨機 GUID)。 | ||||||||||||
applicationArn |
Kinesis Data Analytics 應用程式的 Amazon Resource Name (ARN) | ||||||||||||
streamArn |
交付串流 ARN | ||||||||||||
紀錄
|
下列範例顯示來自 Firehose 交付串流的輸入:
{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }
如果來源是 Kinesis 資料串流,則事件輸入資料模型如下:
Kinesis 串流請求資料模型
欄位 | 描述 | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
Lambda 調用 ID (隨機 GUID)。 | ||||||||||||||||||
applicationArn |
Kinesis Data Analytics 應用程式 ARN | ||||||||||||||||||
streamArn |
交付串流 ARN | ||||||||||||||||||
紀錄
|
下列範例顯示 Kinesis 資料串流的輸入:
{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }
紀錄回應模型
必須傳回送至 Lambda 函數的所有從 Lambda 預處理函數傳回的記錄 (含有記錄 ID)。它們必須包含以下參數,否則 Kinesis Data Analytics 會拒絕這類紀錄,並將其視為資料預處理失敗。資料有效承載部分可以轉換,以完成預處理要求。
回應資料模型
紀錄
|
以下是 Lambda 函數輸出的範例:
{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }
常見的資料預處理失敗
以下是預處理失敗的常見原因。
-
並非所有傳送至 Lambda 函數的批次記錄 (具有記錄 ID) 都會傳回 Kinesis Data Analytics 服務。
-
回應遺失記錄 ID、狀態或資料承載欄位。資料承載欄位對於
Dropped
或ProcessingFailed
記錄而言是選擇性的。 -
Lambda 函數逾時不足以預處理資料。
-
Lambda 函數回應超過 AWS Lambda 服務施加的回應限制。
對於資料預處理失敗,Kinesis Data Analytics 會繼續在同一組記錄上重試 Lambda 調用,直到成功為止。您可以監視下列 CloudWatch 指標,以深入瞭解失敗。
-
Kinesis Data Analytics 應用程式
MillisBehindLatest
:指出應用程式從串流來源讀取落後的程度。 -
Kinesis Data Analytics 應用程式指
InputPreprocessing
CloudWatch 標:指出成功和失敗次數以及其他統計資料。如需詳細資訊,請參閱Amazon Kinesis Analytics 指標。 -
AWS Lambda 功能 CloudWatch 指標和日誌。