使用 Lambda 函數預處理資料 - 亞馬遜 Kinesis SQL 應用程式資料分析開發人員指南

對於新專案,我們建議您使用適用於 Apache Flink Studio 的全新受管理服務,取代適用於應用程式的 Kinesis Data Analytics。SQLManaged Service for Apache Flink Studio 易於使用且具備進階分析功能,讓您在幾分鐘內建置複雜的串流處理應用程式。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Lambda 函數預處理資料

注意

2023 年 9 月 12 日之後,如果尚未使用 Kinesis Data Analytics for SQL,您將無法用 Kinesis Data Firehose 做為建立新應用程式的來源。如需詳細資訊,請參閱限制

如果串流中的資料需要格式轉換、轉換、擴充或篩選,您可以使用函數預先處理資料。 AWS Lambda 您可以在應用程式 SQL 程式碼執行之前,或在應用程式從資料串流建立結構描述之前執行此動作。

在下列案例中,使用 Lambda 函數來預先處理記錄非常有用:

  • 將記錄從其他格式 (例如 KPL 或 GZIP) 轉換為 Kinesis Data Analytics 分析可以分析的格式。Kinesis Data Analytics 目前支援 JSON 或 CSV 資料格式。

  • 將資料擴展為彙總或異常偵測等作業更容易存取的格式。舉例來說,如果多個資料值一起存儲在一個字串中,則可以將資料擴展到單獨的欄中。

  • 透過其他 Amazon 服務進行資料充實,例如外推法或錯誤修正。

  • 將複雜的字符串轉換應用於記錄欄位。

  • 用於清理資料的數據過濾。

使用 Lambda 函數預處理資料

建立 Kinesis Data Analytics 應用程式時,您可以在連接至來源頁面中啟用 Lambda 預先處理。

使用 Lambda 函數預先處理 Kinesis Data Analytics 應用程式中的記錄
  1. 登入 AWS Management Console 並開啟適用於 Apache Flink 的受管理服務主控台,網址為 https://console.aws.amazon.com/kinesisanalytics

  2. 在應用程式的連接至來源頁面上,選擇記錄預處理方式] AWS Lambda區段中的啟用

  3. 若要使用已建立的 Lambda 函數,請在 Lambda 函數下拉式清單中選擇函數。

  4. 若要從其中一個 Lambda 預處理範本建立新的 Lambda 函數,請從下拉式清單中選擇範本。然後選擇 觀看 Lambda 中的 <template name> 來編輯函數。

  5. 選擇建立新的來建立新 Lambda 函數。如需建立 Lambda 函數的相關資訊,請參閱AWS Lambda 開發人員指南中的建立 HelloWorld Lambda 函數和探索主控台

  6. 選擇要使用的 Lambda 函數版本。若要使用最新版本,請選擇 $LATEST

當您選擇或建立 Lambda 函數進行記錄預先處理時,系統會在執行應用程式 SQL 程式碼,或應用程式從記錄產生結構描述之前預先處理記錄。

Lambda 預處理許可

若要使用 Lambda 預處理,應用程式的 IAM 角色需要下列許可政策:

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

Lambda 預處理指標

您可以使 CloudWatch 用 Amazon 監控 Lambda 叫用的數量、處理的位元組數、成功與失敗等。如需 Kinesis 資料分析 Lambda 預先處理所發出 CloudWatch 指標的相關資訊,請參閱 Amazon Kin esis Analytics 指標。

搭 AWS Lambda 配 Kinesis 製作者程式庫使用

Kinesis Producer Library (KPL) 會將使用者格式化的小型記錄彙整成至多 1 MB 的較大型記錄,以便更妥善利用 Amazon Kinesis Data Streams 輸送量。適用於 Java 的 Kinesis Client Library (KCL) 支援取消彙整這類記錄。但是,當您用作流的消費者時,您必須使用 AWS Lambda 特殊模塊來分解記錄。

若要取得必要的專案程式碼和指示,請參閱〈Kinesis Productor 程式庫解彙總模組〉。 AWS LambdaGitHub您可以使用這個項目中的組件來處理 Java,Node.js 和 Python AWS Lambda 中的 KPL 序列化數據。上述元件也可用於建構多語言 KCL 應用程式

資料預處理事件輸入資料模型/記錄響應模型

若要預處理記錄,您的 Lambda 函數必須符合所需的事件輸入資料和記錄回應模型。

事件輸入資料模型

Kinesis Data Analytics 會持續讀取 Kinesis 資料串流或 Firehose 交付串流中的資料。對於擷取的每批記錄,服務會管理每個批次傳遞至 Lambda 函數的方式。您的函數接收記錄列表作為輸入。在函數中,您可以迭代列表並應用業務邏輯以完成預處理需求(例如資料格式轉換或擴充)。

預先處理函數的輸入模型會略有不同,具體取決於資料是從 Kinesis 資料串流還是 Firehose 傳遞串流接收。

如果來源是 Firehose 傳遞串流,則事件輸入資料模型如下:

Kinesis Data Firehose 請求數據模型

欄位 描述
invocationId Lambda 調用 ID (隨機 GUID)。
applicationArn Kinesis Data Analytics 應用程式的 Amazon Resource Name (ARN)
streamArn 交付串流 ARN
紀錄
欄位 描述
recordId 記錄 ID (隨機 GUID)
kinesisFirehoseRecordMetadata
欄位 描述
approximateArrivalTimestamp 交付串流記錄大約到達時間
data Base64 編碼來源記錄承載

下列範例顯示來自 Firehose 交付串流的輸入:

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

如果來源是 Kinesis 資料串流,則事件輸入資料模型如下:

Kinesis 串流請求資料模型

欄位 描述
invocationId Lambda 調用 ID (隨機 GUID)。
applicationArn Kinesis Data Analytics 應用程式 ARN
streamArn 交付串流 ARN
紀錄
欄位 描述
recordId 以 Kinesis 記錄序號為基礎的記錄 ID
kinesisStreamRecordMetadata
欄位 描述
sequenceNumber 來自 Kinesis 串流記錄的序號
partitionKey Kinesis 串流記錄中的分割區索引鍵
shardId Kinesis 串流記錄的 ShardId
approximateArrivalTimestamp 交付串流記錄大約到達時間
資料 Base64 編碼來源記錄承載

下列範例顯示 Kinesis 資料串流的輸入:

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

紀錄回應模型

必須傳回送至 Lambda 函數的所有從 Lambda 預處理函數傳回的記錄 (含有記錄 ID)。它們必須包含以下參數,否則 Kinesis Data Analytics 會拒絕這類紀錄,並將其視為資料預處理失敗。資料有效承載部分可以轉換,以完成預處理要求。

回應資料模型

紀錄
欄位 描述
recordId 在調用期間,記錄 ID 會從 Kinesis Data Analytics 傳遞至 Lambda。轉換記錄必須包含相同的記錄 ID。原始記錄的 ID 與轉換記錄的 ID 若有任何不符,就會視為資料轉換失敗。
result 記錄的資料轉換狀態。可能值如下:
  • Ok:記錄已成功轉換。Kinesis Data Analytics 會擷取記錄讓 SQL 處理。

  • Dropped:您的處理邏輯故意丟棄了記錄。Kinesis Data Analytics 捨棄經 SQL 處理的紀錄。資料承載欄位對於 Dropped 記錄而言是選擇性的。

  • ProcessingFailed:無法轉換記錄。Kinesis Data Analytics 認為 Lambda 函數未成功處理它,並將錯誤寫入錯誤串流。關於錯誤串流的詳細資訊,請查看 錯誤處理。資料承載欄位對於 ProcessingFailed 記錄而言是選擇性的。

data 已轉換資料承載 (base64 編碼後)。如果應用程式擷取資料格式為 JSON,則每個資料承載都可以包含多個 JSON 文件。或者,如果應用程式擷取資料格式為 CSV,則每個列都可以包含多個 CSV 列 (在每一列中指定資料列分隔符號)。Kinesis Data Analytics 服務能夠在相同資料承載中成功剖析和處理包含多個 JSON 文件或 CSV 列的資料。

以下是 Lambda 函數輸出的範例:

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

常見的資料預處理失敗

以下是預處理失敗的常見原因。

  • 並非所有傳送至 Lambda 函數的批次記錄 (具有記錄 ID) 都會傳回 Kinesis Data Analytics 服務。

  • 回應遺失記錄 ID、狀態或資料承載欄位。資料承載欄位對於 DroppedProcessingFailed 記錄而言是選擇性的。

  • Lambda 函數逾時不足以預處理資料。

  • Lambda 函數回應超過 AWS Lambda 服務施加的回應限制。

對於資料預處理失敗,Kinesis Data Analytics 會繼續在同一組記錄上重試 Lambda 調用,直到成功為止。您可以監視下列 CloudWatch 指標,以深入瞭解失敗。

  • Kinesis Data Analytics 應用程式 MillisBehindLatest:指出應用程式從串流來源讀取落後的程度。

  • Kinesis Data Analytics 應用程式指InputPreprocessing CloudWatch 標:指出成功和失敗次數以及其他統計資料。如需詳細資訊,請參閱Amazon Kinesis Analytics 指標

  • AWS Lambda 功能 CloudWatch 指標和日誌。