使用 Lambda 處理 Amazon Kinesis Data Streams 記錄 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Lambda 處理 Amazon Kinesis Data Streams 記錄

若要使用 Lambda 處理 Amazon Kinesis Data Streams 記錄,請為您的串流建立一個取用者,然後建立 Lambda 事件來源對應。

設定您的資料串流及函數

您的 Lambda 函數是適用於資料串流的取用者應用程式。其會從每個碎片中一次處理一個批次的記錄。您可以將 Lambda 函數對應至共用輸送量取用程式 (標準迭代程式),或對應至具有增強散發功能的專用輸送量取用程式。

  • 標準迭代器:Lambda 會以每秒一次的基本速率輪詢 Kinesis 串流中的每個碎片以取得記錄。有更多記錄可用時,Lambda 會持續處理批次,直到函數掌握串流資訊。事件來源映射會與碎片的其他取用者共用讀取傳輸量。

  • 增強型散發:若要將延遲降至最低並最大化讀取輸送量,請建立具有增強散發功能的資料串流取用者。增強散發取用者會取得每個碎片的專用連線,其不會影響其他從串流讀取的應用程式。串流取用者會使用 HTTP/2 來減少延遲,方法為透過長期連線將記錄推送至 Lambda,以及透過壓縮請求標頭。您可以使用 Kinesis 消費者 API 建立串流取用RegisterStream者

aws kinesis register-stream-consumer \ --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

您應該會看到下列輸出:

{ "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }

若要提高函數處理記錄的速度,請將碎片新增至資料串流。Lambda 會依序在每個碎片中處理記錄。如果您的函數傳回錯誤,則會停止處理碎片中的其他記錄。碎片越多,要一次處理的批次越多,這會降低錯誤對並行的影響。

如果您的函數無法擴展至可處理並行批次的總數,您可以請求增加配額,或為您的函數保留並行

建立事件來源對應以叫用 Lambda 函數

若要使用資料串流中的記錄叫用 Lambda 函數,請建立事件來源對應。您可以建立多個事件來源映射,來使用多個 Lambda 函數處理相同資料,或使用單一函數處理來自多個資料串流的項目。處理來自多個串流的項目時,每個批次僅包含來自單一碎片或串流的記錄。

您可以設定事件來源對應,以處理來自不同串流的記錄 AWS 帳戶。如需進一步了解,請參閱建立跨帳戶事件來源映射

在建立事件來源對應之前,您需要授予 Lambda 函數從 Kinesis 資料串流讀取的權限。Lambda 需要下列權限來管理與 Kinesis 資料串流相關的資源:

受 AWS 管理的策略AWSLambdaKinesisExecutionRole包括這些權限。將此受管理的原則新增至您的函數,如下列程序所述。

AWS Management Console
若要將 Kinesis 權限新增至您的函數
  1. 開啟 Lambda 主控台的函數頁面,然後選取您的函數。

  2. 組態索引標籤中,選取權限

  3. 在 [執行角色] 窗格的 [角色名稱] 下,選擇函數執行角色的連結。此連結會在 IAM 主控台中開啟該角色的頁面。

  4. 在「權限原則」窗格中,選擇「新增權限」,然後選取「附加原則」。

  5. 在搜尋欄位中輸入 AWSLambdaKinesisExecutionRole

  6. 選取原則旁邊的核取方塊,然後選擇 [新增權限]。

AWS CLI
若要將 Kinesis 權限新增至您的函數
  • 執行下列 CLI 命令,將AWSLambdaKinesisExecutionRole原則新增至函數的執行角色:

    aws iam attach-role-policy \ --role-name MyFunctionRole \ --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
AWS SAM
若要將 Kinesis 權限新增至您的函數
  • 在函數的定義中,新增Policies屬性,如下列範例所示:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs20.x Policies: - AWSLambdaKinesisExecutionRole

設定所需權限後,請建立事件來源對應。

AWS Management Console
若要建立 Kinesis 事件來源對應
  1. 開啟 Lambda 主控台的函數頁面,然後選取您的函數。

  2. 函數概觀窗格中,選擇新增觸發條件

  3. 觸發器組態下,為來源選取 Kinesis

  4. 選取您要為其建立事件來源對應的 Kinesis 串流,並選擇性地選取串流的用戶。

  5. (選擇性) 編輯事件來源對應的「Batch 大小」、「起始位置」和「Batch」視窗

  6. 選擇新增

從主控台建立事件來源對應時,您的 IAM 角色必須具有 kinesis: ListStreams 和 kinesis:取用ListStream者權限。

AWS CLI
若要建立 Kinesis 事件來源對應
  • 執行下列 CLI 命令以建立 Kinesis 事件來源對應。根據您的使用案例選擇您自己的批次大小和起始位置。

    aws lambda create-event-source-mapping \ --function-name MyFunction \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --starting-position LATEST \ --batch-size 100

若要指定批次化視窗,請加入選--maximum-batching-window-in-seconds項。若要取得有關使用此參數和其他參數的更多資訊,請參閱《指令參考》中的〈建立事件來源對映〉。AWS CLI

AWS SAM
若要建立 Kinesis 事件來源對應
  • 在函數的定義中,新增KinesisEvent屬性,如下列範例所示:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs20.x Policies: - AWSLambdaKinesisExecutionRole Events: KinesisEvent: Type: Kinesis Properties: Stream: !GetAtt MyKinesisStream.Arn StartingPosition: LATEST BatchSize: 100 MyKinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1

若要進一步了解如何建立 Kinesis Data Streams 的事件來源對應 AWS SAM,請參閱AWS Serverless Application Model 開發人員指南中的 Kinesis

輪詢和串流開始位置

請注意,建立和更新事件來源映射期間的串流輪詢最終會一致。

  • 在建立事件來源映射期間,從串流開始輪詢事件可能需要幾分鐘時間。

  • 在更新事件來源映射期間,從串流停止並重新開始輪詢事件可能需要幾分鐘時間。

這種行為表示如果您指定 LATEST 當作串流的開始位置,事件來源映射可能會在建立或更新期間遺漏事件。若要確保沒有遺漏任何事件,請將串流開始位置指定為 TRIM_HORIZONAT_TIMESTAMP

建立跨帳戶事件來源映射

Amazon Kinesis Data Streams 支援以資源為基礎的政策。因此,您可以 AWS 帳戶 使用另一個帳戶中的 Lambda 函數來處理擷取到串流中的資料。

若要使用不同的 Kinesis 串流為 Lambda 函數建立事件來源對應 AWS 帳戶,您必須使用以資源為基礎的政策來設定串流,以授與 Lambda 函數讀取項目的權限。若要了解如何設定串流以允許跨帳戶存取,請參閱 Amazon Kinesis Streams 開發人員指南中的與跨帳戶 AWS Lambda 功能共用存取權。

使用資源型政策設定串流後,可為 Lambda 函數提供所需權限,請使用上一節所述的任何方法建立事件來源對應。

如果您選擇使用 Lambda 主控台建立事件來源對應,請將串流的 ARN 直接貼到輸入欄位。如果您想要指定串流的取用者,貼上取用者的 ARN 會自動填入串流欄位。