教學課程:使用 Amazon MSK事件來源映射來叫用 Lambda 函數 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

教學課程:使用 Amazon MSK事件來源映射來叫用 Lambda 函數

在本教學課程中,您將執行下列操作:

  • 在與現有 Amazon MSK叢集相同的 AWS 帳戶中建立 Lambda 函數。

  • 設定 Lambda 的網路和身分驗證,以與 Amazon 通訊MSK。

  • 設定 Lambda Amazon MSK事件來源映射,當 主題中出現事件時,它會執行您的 Lambda 函數。

完成這些步驟後,當事件傳送至 Amazon 時MSK,您將能夠設定 Lambda 函數,以使用您自己的自訂 Lambda 程式碼自動處理這些事件。

您可以使用此功能做些什麼?

範例解決方案:使用MSK事件來源映射,為您的客戶提供即時分數。

請考慮以下案例:您的公司託管了一個 Web 應用程式,您的客戶可以在其中檢視即時事件的相關資訊,例如運動遊戲。來自遊戲的資訊更新會透過 Amazon 上的 Kafka 主題提供給您的團隊MSK。您想要設計一個解決方案,以使用主題的更新MSK,為您所開發應用程式內的客戶提供即時事件的更新檢視。您已決定採用下列設計方法:您的用戶端應用程式將與 AWS中託管的無伺服器後端通訊。用戶端將使用 Amazon API Gateway 透過 Websocket 工作階段連線 WebSocket API。

在此解決方案中,您需要讀取MSK事件、執行一些自訂邏輯,為應用程式層準備這些事件,然後將該資訊轉送至API閘道 的元件API。您可以使用 實作此元件 AWS Lambda,方法是在 Lambda 函數中提供自訂邏輯,然後使用 AWS Lambda Amazon MSK事件來源映射呼叫它。

如需使用 Amazon API Gateway 實作解決方案的詳細資訊 WebSocket API,請參閱API閘道文件中WebSocket API的教學課程。

必要條件

具有下列預先設定資源 AWS 的帳戶:

為了滿足這些先決條件,建議您遵循 Amazon 文件中的開始使用 MSK Amazon。 MSK

  • Amazon MSK叢集。請參閱《Amazon 入門》中的建立 Amazon MSK叢集 MSK

  • 下列組態:

    • 確保在叢集安全設定中啟用IAM角色型身分驗證。這會限制您的 Lambda 函數只存取所需的 Amazon MSK 資源,藉此改善您的安全性。在新的 Amazon MSK叢集上預設會啟用此功能。

    • 確保您的叢集聯網設定中的公有存取已關閉。限制 Amazon MSK叢集對網際網路的存取,透過限制有多少中介機構處理您的資料,來改善您的安全性。在新的 Amazon MSK叢集上預設會啟用此功能。

  • Amazon MSK叢集中要用於此解決方案的 Kafka 主題。請參閱 Amazon 入門MSK中的建立主題

  • Kafka 管理員主機設定,可從 Kafka 叢集擷取資訊,並將 Kafka 事件傳送至您的主題進行測試,例如已安裝 Kafka 管理員CLI和 Amazon MSKIAM程式庫的 Amazon EC2執行個體。請參閱 Amazon 入門MSK中的建立用戶端機器

設定這些資源後,請從 AWS 您的帳戶收集以下資訊,以確認您已準備好繼續。

  • Amazon MSK叢集的名稱。您可以在 Amazon MSK主控台中找到此資訊。

  • 叢集 UUID是 ARN Amazon MSK叢集 的一部分,您可以在 Amazon MSK主控台中找到。請遵循 Amazon MSK 文件中的列出叢集中的程序來尋找此資訊。

  • 與您的 Amazon MSK叢集相關聯的安全群組。您可以在 Amazon MSK主控台中找到此資訊。在下列步驟中,將這些稱為您的 clusterSecurityGroups

  • VPC 包含 Amazon MSK叢集的 Amazon ID。您可以在 Amazon MSK主控台中識別與 Amazon MSK叢集相關聯的子網路,然後在 Amazon VPC主控台中識別與子網路VPC相關聯的 Amazon,以尋找此資訊。

  • 解決方案中使用的 Kafka 主題的名稱。您可以從 Kafka 管理員主機使用 Kafka 呼叫 topics CLI Amazon MSK叢集,以尋找此資訊。如需主題 的詳細資訊CLI,請參閱 Kafka 文件中的新增和移除主題

  • Kafka 主題的取用者群組名稱,適合供 Lambda 函數使用。此群組可由 Lambda 自動建立,因此您不需要使用 Kafka 建立CLI。如果您需要管理消費者群組,若要進一步了解消費者群組CLI,請參閱 Kafka 文件中的管理消費者群組

您 AWS 帳戶中的下列許可:

  • 建立和管理 Lambda 函數的許可。

  • 建立IAM政策並將其與您的 Lambda 函數建立關聯的許可。

  • 在VPC託管 Amazon MSK叢集的 Amazon 中建立 Amazon VPC端點和變更聯網組態的許可。

設定 Lambda 與 Amazon 通訊的網路連線 MSK

使用 AWS PrivateLink 連接 Lambda 和 Amazon MSK。您可以在 Amazon VPC主控台中建立界面 Amazon VPC端點來執行此操作。如需關於聯網組態的詳細資訊,請參閱設定網路安全

當 Amazon MSK事件來源映射代表 Lambda 函數執行時,它會擔任 Lambda 函數的執行角色。此IAM角色授權映射存取 保護的資源IAM,例如您的 Amazon MSK叢集。雖然元件共用執行角色,但 Amazon MSK映射和您的 Lambda 函數對其個別任務有不同的連線需求,如下圖所示。

Lambda 函數會輪詢叢集,並使用 與 Lambda 通訊 AWS STS。

您的事件來源映射屬於您的 Amazon MSK叢集安全群組。在此聯網步驟中,從您的 Amazon MSK叢集建立 Amazon VPC端點VPC,將事件來源映射連接到 Lambda STS和服務。保護這些端點,以接受來自 Amazon MSK叢集安全群組的流量。然後,調整 Amazon MSK叢集安全群組,以允許事件來源映射與 Amazon MSK叢集通訊。

您可以使用 AWS Management Console來設定下列步驟。

設定介面 Amazon VPC端點以連接 Lambda 和 Amazon MSK
  1. 為您的界面 Amazon VPC端點 建立安全群組endpointSecurityGroup,以允許來自 的 443 上的傳入TCP流量clusterSecurityGroups。請遵循 Amazon EC2 文件中的建立安全群組中的程序來建立安全群組。然後,遵循 Amazon EC2 文件中將規則新增至安全群組中的程序,以新增適當的規則。

    使用下列資訊建立一個安全群組:

    新增傳入規則時,請為 中的每個安全群組建立規則clusterSecurityGroups。對於每條規則:

    • 針對類型,選取 HTTPS

    • 針對來源,選取其中一個 clusterSecurityGroups

  2. 建立端點,將 Lambda 服務連線至VPC包含 Amazon MSK叢集的 Amazon。遵循 Create an interface endpoint 中的程序。

    使用下列資訊建立一個介面端點:

    • 針對服務名稱,選取 com.amazonaws.regionName.lambda,其中 regionName託管您的 Lambda 函數。

    • 針對 VPC,選取VPC包含 Amazon MSK叢集的 Amazon。

    • 針對安全群組,選取您先前建立的 endpointSecurityGroup

    • 針對子網路,選取託管 Amazon MSK叢集的子網路。

    • 針對政策,提供下列政策文件,這會保護端點以供 Lambda 服務主體用於 lambda:InvokeFunction 動作。

      { "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • 確保啟用DNS名稱保持設定狀態。

  3. 建立端點,將 AWS STS 服務連接到VPC包含 Amazon MSK叢集的 Amazon。遵循 Create an interface endpoint 中的程序。

    使用下列資訊建立一個介面端點:

    • 針對服務名稱,選取 AWS STS。

    • 針對 VPC,選取VPC包含 Amazon MSK叢集的 Amazon。

    • 針對安全群組,選取 endpointSecurityGroup

    • 針對子網路,選取託管 Amazon MSK叢集的子網路。

    • 針對政策,提供下列政策文件,這會保護端點以供 Lambda 服務主體用於 sts:AssumeRole 動作。

      { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • 確保啟用DNS名稱保持設定狀態。

  4. 對於與您的 Amazon MSK叢集相關聯的每個安全群組,亦即,在 中clusterSecurityGroups,允許以下項目:

    • 允許 9098 上的所有傳入和傳出TCP流量傳送到所有 clusterSecurityGroups,包括其本身。

    • 允許 443 上的所有傳出TCP流量。

    根據預設,安全群組規則允許其中一些流量,因此如果您的叢集連接到單一安全群組,且該群組具有預設規則,則不需要額外的規則。若要調整安全群組規則,請遵循 Amazon EC2 文件中將規則新增至安全群組中的程序。

    使用下列資訊將規則新增至您的安全群組:

    • 針對連接埠 9098 的每個傳入或傳出規則,提供

      • 針對類型,選取自訂 TCP

      • 對於連接埠範圍,提供 9098。

      • 針對來源,提供其中一個 clusterSecurityGroups

    • 針對連接埠 443 的每個傳入規則,針對類型,選取 HTTPS

為 Lambda 建立IAM角色以從您的 Amazon MSK主題中讀取

識別 Lambda 從 Amazon MSK主題讀取的身分驗證要求,然後在政策中定義。建立lambdaAuthRole授權 Lambda 使用這些許可的角色 。使用 動作在您的 Amazon MSK叢集上授權kafka-clusterIAM動作。然後,授權 Lambda 執行探索MSKkafka和連線至 Amazon MSK叢集所需的 Amazon 和 Amazon EC2動作,以及 CloudWatch 動作,以便 Lambda 可以記錄已完成的動作。

描述 Lambda 從 Amazon 讀取的身分驗證要求 MSK
  1. 撰寫IAM政策文件 (JSON文件)clusterAuthPolicy,允許 Lambda 使用您的 Kafka 消費者群組,從 Amazon MSK叢集中的 Kafka 主題讀取。Lambda 要求在讀取時設定 Kafka 取用者群組。

    修改以下範本以符合您的先決條件:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/mskClusterName/cluster-uuid", "arn:aws:kafka:region:account-id:topic/mskClusterName/cluster-uuid/mskTopicName", "arn:aws:kafka:region:account-id:group/mskClusterName/cluster-uuid/mskGroupName" ] } ] }

    如需詳細資訊,請參閱 IAM 角色型身分驗證。在撰寫政策時:

    • 針對 regionaccount-id,提供託管 Amazon MSK叢集的 。

    • 針對 mskClusterName,請提供 Amazon MSK叢集的名稱。

    • 針對 cluster-uuid,請在 UUID中ARN為您的 Amazon MSK叢集提供 。

    • 針對 mskTopicName,請提供 Kafka 主題的名稱。

    • 針對 mskGroupName,請提供 Kafka 消費者群組的名稱。

  2. 識別 Lambda 探索EC2和連接 Amazon MSK叢集所需的 Amazon MSK、Amazon 和 CloudWatch 許可,並記錄這些事件。

    AWSLambdaMSKExecutionRole 受管政策允許定義必要的許可。在下列步驟中使用該政策。

    在生產環境中,評估 AWSLambdaMSKExecutionRole 以根據最低權限原則限制執行角色政策,然後為您的角色撰寫取代此受管政策的政策。

如需IAM政策語言的詳細資訊,請參閱 IAM 文件

現在您已撰寫政策文件,請建立IAM政策,以便將其連接至您的角色。您可以使用主控台執行下列程序,以完成此操作。

從IAM政策文件建立政策
  1. 登入 AWS Management Console 並在 開啟IAM主控台https://console.aws.amazon.com/iam/

  2. 在左側的導覽窗格中,選擇 Policies (政策)

  3. 選擇 Create policy (建立政策)。

  4. 政策編輯器區段中,選擇 JSON選項。

  5. 貼上 clusterAuthPolicy

  6. 將許可新增至政策後,請選擇下一步

  7. 檢視與建立頁面上,為您正在建立的政策輸入政策名稱描述 (選用)。檢視此政策中定義的許可,來查看您的政策所授予的許可。

  8. 選擇 Create policy (建立政策) 儲存您的新政策。

如需詳細資訊,請參閱 IAM 文件中的建立IAM政策

現在您已有適當的IAM政策,請建立角色並將其連接至該角色。您可以使用主控台執行下列程序,以完成此操作。

在 IAM 主控台中建立執行角色
  1. 在 IAM 主控台中開啟角色頁面。

  2. 選擇建立角色

  3. 受信任的實體類型下,選擇 AWS  服務

  4. 使用案例 下,選擇 Lambda

  5. 選擇 Next (下一步)

  6. 選取以下政策:

    • clusterAuthPolicy

    • AWSLambdaMSKExecutionRole

  7. 選擇 Next (下一步)

  8. 針對角色名稱,輸入 lambdaAuthRole,然後選擇建立角色

如需詳細資訊,請參閱使用執行角色定義 Lambda 函數許可

建立 Lambda 函數以從您的 Amazon MSK主題中讀取

建立設定為使用您的IAM角色的 Lambda 函數。您可以使用主控台建立 Lambda 函數。

使用驗證組態建立 Lambda 函數
  1. 開啟 Lambda 主控台,然後從標頭選取建立函數

  2. 選取從頭開始撰寫

  3. 針對函數名稱,提供您選擇的適當名稱。

  4. 針對執行時期,選擇最新支援Node.js 版本,以使用本教學課程中提供的程式碼。

  5. 選擇變更預設執行角色

  6. 選取使用現有角色

  7. 針對現有角色,選取 lambdaAuthRole

在生產環境中,您通常需要將更多政策新增至 Lambda 函數的執行角色,以有意義的方式處理您的 Amazon MSK事件。如需將政策新增至角色的詳細資訊,請參閱 IAM 文件中的新增或移除身分許可

建立 Lambda 函數的事件來源映射

您的 Amazon MSK事件來源映射為 Lambda 服務提供在發生適當的 Amazon MSK事件時叫用 Lambda 所需的資訊。您可以使用 主控台建立 Amazon MSK映射。建立 Lambda 觸發條件,然後會自動設定事件來源映射。

建立 Lambda 觸發條件 (和事件來源映射)
  1. 導覽至 Lambda 函數的概觀頁面。

  2. 在函數概觀區段中,選擇左下角的新增觸發條件

  3. 選取來源下拉式清單中,選取 Amazon MSK

  4. 請勿設定身分驗證

  5. 針對MSK叢集,選取叢集的名稱。

  6. 針對批次大小,請輸入 1。此步驟可讓此功能更易於測試,而且不是生產中的理想值。

  7. 針對主題名稱,請提供 Kafka 主題名稱。

  8. 針對取用者群組 ID,請提供 Kafka 取用者群組的 ID。

更新您的 Lambda 函數以讀取串流資料

Lambda 透過事件方法參數提供關於 Kafka 事件的資訊。如需 Amazon MSK事件的範例結構,請參閱 範例事件。在您了解如何解譯 Lambda 轉送的 Amazon MSK事件之後,您可以變更 Lambda 函數程式碼,以使用他們提供的資訊。

將下列程式碼提供給 Lambda 函數,以記錄 Lambda Amazon MSK事件的內容,用於測試目的:

.NET
AWS SDK for .NET
注意

還有更多功能 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 使用 Lambda 使用 Amazon MSK事件NET。

using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KafkaEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace MSKLambda; public class Function { /// <param name="input">The event for the Lambda function handler to process.</param> /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param> /// <returns></returns> public void FunctionHandler(KafkaEvent evnt, ILambdaContext context) { foreach (var record in evnt.Records) { Console.WriteLine("Key:" + record.Key); foreach (var eventRecord in record.Value) { var valueBytes = eventRecord.Value.ToArray(); var valueText = Encoding.UTF8.GetString(valueBytes); Console.WriteLine("Message:" + valueText); } } } }
Go
SDK for Go V2
注意

還有更多功能 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Go 搭配 Lambda 使用 Amazon MSK事件。

package main import ( "encoding/base64" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(event events.KafkaEvent) { for key, records := range event.Records { fmt.Println("Key:", key) for _, record := range records { fmt.Println("Record:", record) decodedValue, _ := base64.StdEncoding.DecodeString(record.Value) message := string(decodedValue) fmt.Println("Message:", message) } } } func main() { lambda.Start(handler) }
Java
SDK 適用於 Java 2.x
注意

還有更多功能 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Java 搭配 Lambda 使用 Amazon MSK事件。

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KafkaEvent; import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord; import java.util.Base64; import java.util.Map; public class Example implements RequestHandler<KafkaEvent, Void> { @Override public Void handleRequest(KafkaEvent event, Context context) { for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) { String key = entry.getKey(); System.out.println("Key: " + key); for (KafkaEventRecord record : entry.getValue()) { System.out.println("Record: " + record); byte[] value = Base64.getDecoder().decode(record.getValue()); String message = new String(value); System.out.println("Message: " + message); } } return null; } }
JavaScript
SDK for JavaScript (v3)
注意

還有更多功能 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 搭配 Lambda 使用 Amazon MSK事件 JavaScript。

exports.handler = async (event) => { // Iterate through keys for (let key in event.records) { console.log('Key: ', key) // Iterate through records event.records[key].map((record) => { console.log('Record: ', record) // Decode base64 const msg = Buffer.from(record.value, 'base64').toString() console.log('Message:', msg) }) } }
PHP
適用於 PHP 的 SDK
注意

還有更多功能 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 搭配 Lambda 使用 Amazon MSK事件PHP。

<?php // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 // using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kafka\KafkaEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): void { $kafkaEvent = new KafkaEvent($event); $this->logger->info("Processing records"); $records = $kafkaEvent->getRecords(); foreach ($records as $record) { try { $key = $record->getKey(); $this->logger->info("Key: $key"); $values = $record->getValue(); $this->logger->info(json_encode($values)); foreach ($values as $value) { $this->logger->info("Value: $value"); } } catch (Exception $e) { $this->logger->error($e->getMessage()); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK for Python (Boto3)
注意

還有更多功能 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Python 搭配 Lambda 使用 Amazon MSK事件。

import base64 def lambda_handler(event, context): # Iterate through keys for key in event['records']: print('Key:', key) # Iterate through records for record in event['records'][key]: print('Record:', record) # Decode base64 msg = base64.b64decode(record['value']).decode('utf-8') print('Message:', msg)
Ruby
SDK 適用於 Ruby
注意

還有更多功能 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Ruby 搭配 Lambda 使用 Amazon MSK事件。

require 'base64' def lambda_handler(event:, context:) # Iterate through keys event['records'].each do |key, records| puts "Key: #{key}" # Iterate through records records.each do |record| puts "Record: #{record}" # Decode base64 msg = Base64.decode64(record['value']) puts "Message: #{msg}" end end end

您可以使用主控台將函數程式碼提供給 Lambda。

若要使用主控台程式碼編輯器更新函數程式碼
  1. 開啟 Lambda 主控台的函數頁面,然後選取您的函數。

  2. 選取程式碼索引標籤。

  3. 程式碼來源窗格中,選取您的原始程式碼檔案,然後在整合式程式碼編輯器中加以編輯。

  4. DEPLOY區段中,選擇部署以更新函數的程式碼:

    Lambda 主控台程式碼編輯器中的「部署」按鈕

測試您的 Lambda 函數,以確認其已連線至您的 Amazon MSK主題

您現在可以透過檢查事件日誌來驗證 CloudWatch 事件來源是否正在調用您的 Lambda。

驗證您的 Lambda 函數是否正在被調用
  1. 使用您的 Kafka 管理員主機,使用 產生 Kafka kafka-console-producer 事件CLI。如需詳細資訊,請參閱 Kafka 文件中的 Write some events into the topic。傳送足夠的事件,以填滿由批次大小定義的批次,用於上一個步驟中定義的事件來源映射,否則 Lambda 會等待調用更多資訊。

  2. 如果您的函數執行,Lambda 會寫入發生的情況 CloudWatch。在主控台中,導覽至您的 Lambda 函數詳情頁面。

  3. 選取 Configuration (組態) 索引標籤。

  4. 從側邊列,選取監控和操作工具

  5. 記錄組態下識別日誌CloudWatch 群組。日誌群組應以 /aws/lambda 開頭。選擇日誌群組連結。

  6. 在 CloudWatch 主控台中,檢查日誌事件是否有 Lambda 已傳送至日誌串流的日誌事件。識別是否有日誌事件包含來自 Kafka 事件的訊息,如下圖所示。如果有,您已成功MSK使用 Lambda 事件來源映射將 Lambda 函數連線至 Amazon。

    中的日誌事件, CloudWatch 顯示由提供的程式碼擷取的事件資訊。