使用 Lambda 函數作為輸出 - Amazon Kinesis Data Analytics for SQL Applications 開發人員指南

在仔細考慮之後,我們決定在兩個步驟中停止 Amazon Kinesis Data Analytics for SQL 應用程式:

1. 從 2025 年 10 月 15 日起,您將無法為SQL應用程式建立新的 Kinesis Data Analytics。

2. 我們將從 2026 年 1 月 27 日起刪除您的應用程式。您將無法啟動或操作SQL應用程式的 Amazon Kinesis Data Analytics。從那時SQL起,Amazon Kinesis Data Analytics 將不再提供 的支援。如需詳細資訊,請參閱Amazon Kinesis Data Analytics for SQL 應用程式終止

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Lambda 函數作為輸出

使用 AWS Lambda 作為目的地可讓您在將SQL結果傳送至最終目的地之前,更輕鬆地執行結果的後置處理。常見後續處理任務包括下列:

  • 將多個列彙總到單個記錄中

  • 結合目前結果與過去的結果,以處理延遲到達的資料

  • 根據資訊類型交付到不同目標

  • 記錄格式轉換 (如翻譯成 Protobuf)

  • 字串操作或轉換

  • 分析處理後的資料擴充

  • 地理空間使用案例的自訂處理

  • 資料加密

Lambda 函數可以將分析資訊傳遞至各種 AWS 服務和其他目的地,包括下列項目:

如需有關建立 Lambda 應用程式的詳細資訊,請參閱入門 AWS Lambda

Lambda 作為輸出許可

若要使用 Lambda 作為輸出,應用程式的 Lambda 輸出IAM角色需要下列許可政策:

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "FunctionARN" }

Lambda 作為輸出指標

您可以使用 Amazon CloudWatch 來監控傳送的位元組數、成功和失敗等。如需使用 Lambda 作為輸出的 Kinesis Data Analytics 發出的 CloudWatch 指標資訊,請參閱 Amazon Kinesis Analytics 指標

Lambda 作為輸出事件輸入資料模型和記錄回應模型

若要傳送 Kinesis Data Analytics 輸出記錄,您的 Lambda 函數必須符合所需的事件輸入資料和記錄回應模型。

事件輸入資料模型

Kinesis Data Analytics 會持續將輸出記錄從應用程式傳送至 Lambda,這些輸出紀錄用作輸出函數,並具有下列要求模型。在函數中,迭代列表並應用業務邏輯來完成輸出需求(例如,在發送到最終目的地之前的資料轉換)。

欄位 描述
invocationId Lambda 調用 ID (隨機 GUID)。
applicationArn Kinesis Data Analytics 應用程式 Amazon Resource Name (ARN)。
紀錄
欄位 描述
recordId 記錄 ID (隨機 GUID)
lambdaDeliveryRecordMetadata
欄位 描述
retryHint 交付重試次數
資料 Base64 編碼輸出紀錄承載
注意

retryHint 值在每次交付失敗都會增加。此值不會長期存在,如果應用程式中斷,則會重設。

紀錄回應模型

作為輸出函數傳送至 Lambda 的每個記錄 (記錄為 IDs) 都必須以 Ok或 確認DeliveryFailed,且必須包含下列參數。否則,Kinesis Data Analytics 會將它們視為交付失敗。

紀錄
欄位 描述
recordId 在調用期間,記錄 ID 會從 Kinesis Data Analytics 傳遞至 Lambda。原始記錄與經確認記錄的 ID 若有任何不符,就會視為幾交付失敗。
result 記錄交付的狀態。以下是可能的值:
  • Ok:記錄已成功轉換並傳送至最終目的地。Kinesis Data Analytics 會擷取要SQL處理的記錄。

  • DeliveryFailed:Lambda 作為輸出函數,未成功將記錄交付至最終目的地。Kinesis Data Analytics 會持續重試,將交付失敗記錄傳送至 Lambda 作為輸出函數。

Lambda 輸出調用頻率

Kinesis Data Analytics 應用程式會緩衝輸出記錄,並經常調用 AWS Lambda 目標函數。

  • 如果記錄以蹬轉時段的形式傳送到資料分析應用程式中的目的地應用程式內串流,則每個蹬轉時段觸發程序都會叫用 AWS Lambda 目的地函數。例如,如果使用 60 秒的輪轉窗口將記錄發送到目的地應用程式內串流,則每 60 秒會調用 Lambda 函數一次。

  • 如果記錄以連續查詢或滑動窗口的形式,發送到應用程式的目的地應用程式內串流,則每秒大約會調用一次 Lambda 目的地函數。

注意

適用每個 Lambda 函數調用要求承載大小限制。超過這些限制會導致輸出記錄分割,並跨越多個 Lambda 函數呼叫傳送。

新增用作輸出的 Lambda 函數

以下程序說明如何將 Lambda 函數新增為 Kinesis Data Analytics 應用程式的輸出。

  1. 登入 AWS Management Console ,並在 https://console.aws.amazon.com/kinesisanalytics 開啟 Managed Service for Apache Flink 主控台。

  2. 選擇清單中的應用程式,然後選擇應用程式詳細資訊

  3. 目的地區段中,選擇連接新目的地

  4. 針對目的地項目,選擇 AWS Lambda 函數

  5. 交付記錄至 AWS Lambda 區段中,選擇現有的 Lambda 函數和版本,或選擇建立新的

  6. 如果您要建立新 Lambda 函數,請執行下列動作:

    1. 選擇其中一個範本。如需詳細資訊,為應用程式目的地建立 Lambda 函數

    2. 建立函數 頁面在 Web 瀏覽器的新瀏覽器標籤中開啟。在名稱方塊中,為函數指定一個有意義的名稱 (例如 myLambdaFunction)。

    3. 用後續處理功能為應用程式更新範本。如需建立 Lambda 函數的詳細資訊,請參閱《AWS Lambda 開發人員指南》中的 入門

    4. 在 Kinesis Data Analytics 主控台的 Lambda 函數清單中,選擇您剛建立的 Lambda 函數。針對 Lambda 函數版本選擇 $LATEST

  7. 應用程式內串流區段,選擇選擇現有的應用程式內串流。針對應用程式內串流名稱,選擇應用程式的輸出串流。所選輸出串流的結果會傳送至 Lambda 輸出函數。

  8. 將表格的其餘部分保留為預設值,然後選擇儲存並繼續

您的應用程式現在會將記錄從應用程式內串流傳送到 Lambda 函數。您可以在 Amazon CloudWatch 主控台中查看預設範本的結果。監控 AWS/KinesisAnalytics/LambdaDelivery.OkRecords 指標,以查看交付至 Lambda 函數的記錄數。

Lambda 作為輸出之常見故障

以下是交付至 Lambda 函數可能會失敗的常見原因。

  • 並非所有傳送至 Lambda 函數的批次記錄 (具有記錄 IDs) 都會傳回至 Kinesis Data Analytics 服務。

  • 回應遺失記錄 ID 或狀態欄位。

  • Lambda 函數逾時不足以完成 Lambda 函數中的業務邏輯。

  • Lambda 函數中的業務邏輯不會擷取所有錯誤,導致未處理的例外狀況造成逾時和背壓。這些通常被稱為「毒丸」訊息。

對於資料傳遞失敗,Kinesis Data Analytics 會繼續在同一組記錄上重試 Lambda 調用,直到成功為止。若要深入了解故障,您可以監控下列 CloudWatch 指標:

  • Kinesis Data Analytics 應用程式 Lambda 作為輸出 CloudWatch 指標:指出成功和失敗的數量,以及其他統計資料。如需詳細資訊,請參閱Amazon Kinesis Analytics 指標

  • AWS Lambda 函數 CloudWatch 指標和日誌。