教學課程:使用 Amazon MSK 事件來源映射來調用 Lambda 函數 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

教學課程:使用 Amazon MSK 事件來源映射來調用 Lambda 函數

在本教學課程中,您將執行下列操作:

  • 在與現有 Amazon MSK 叢集相同的 AWS 帳戶中建立 Lambda 函數。

  • 為 Lambda 設定聯網和身分驗證,以與 Amazon MSK 通訊。

  • 設定 Lambda Amazon MSK 事件來源映射,當主題中出現事件時,該映射會執行您的 Lambda 函數。

完成這些步驟後,當事件傳送至 Amazon MSK 時,您將能夠設定 Lambda 函數,以使用您自己的自訂 Lambda 程式碼自動處理這些事件。

您可以使用此功能做些什麼?

範例解決方案:使用 MSK 事件來源映射為您的客戶提供即時分數。

請考慮以下案例:您的公司託管了一個 Web 應用程式,您的客戶可以在其中檢視即時事件的相關資訊,例如運動遊戲。來自遊戲的資訊更新會透過 Amazon MSK 上的一個 Kafka 主題提供給您的團隊。您想要設計一個使用 MSK 主題更新的解決方案,以便在您所開發的應用程式內為客戶提供即時事件的更新檢視。您已決定採用下列設計方法:您的用戶端應用程式將與 AWS中託管的無伺服器後端通訊。用戶端將使用 Amazon API Gateway WebSocket API,透過 WebSocket 工作階段連線。

在此解決方案中,您需要一個元件來讀取 MSK 事件、執行一些自訂邏輯來準備應用程式層的事件,然後將該資訊轉送至 API Gateway API。您可以使用 實作此元件 AWS Lambda,方法是在 Lambda 函數中提供自訂邏輯,然後使用 AWS Lambda Amazon MSK 事件來源映射呼叫它。

如需使用 Amazon API Gateway WebSocket API 實作解決方案的詳細資訊,請參閱 API Gateway 文件中的 WebSocket API tutorials

必要條件

具有下列預先設定資源 AWS 的帳戶:

為了滿足這些先決條件,我們建議您遵循 Amazon MSK 文件中的 Getting started using Amazon MSK

  • Amazon MSK 叢集。請參閱開始使用 Amazon MSK中的建立 Amazon MSK 叢集

  • 下列組態:

    • 確定叢集安全設定中已啟用 IAM 角色型身分驗證。這會透過限制您的 Lambda 函數僅存取所需的 Amazon MSK 資源來提高您的安全性。在新的 Amazon MSK 叢集上預設會啟用此功能。

    • 確保您的叢集聯網設定中的公有存取已關閉。限制 Amazon MSK 叢集對網際網路的存取,可限制有多少中介機構處理您的資料,藉此提高您的安全性。在新的 Amazon MSK 叢集上預設會啟用此功能。

  • Amazon MSK 叢集中用於此解決方案的 Kafka 主題。請參閱開始使用 Amazon MSK 中的建立主題

  • Kafka 管理員主機,設定為從 Kafka 叢集擷取資訊,並將 Kafka 事件傳送至您的主題進行測試,例如已安裝 Kafka 管理員 CLI 和 Amazon MSK IAM 程式庫的 Amazon EC2 執行個體。請參閱開始使用 Amazon MSK 中的建立用戶端機器

設定這些資源後,請從 AWS 您的帳戶收集以下資訊,以確認您已準備好繼續。

  • Amazon MSK 叢集的名稱。您可以在 Amazon MSK 主控台中找到此資訊。

  • 叢集 UUID 是 Amazon MSK 叢集 ARN 的一部分,可以在 Amazon MSK 主控台中找到。請執行 Amazon MSK 文件中 Listing clusters 中的程序來尋找此資訊。

  • 與您的 Amazon MSK 叢集相關聯的安全群組。您可以在 Amazon MSK 主控台中找到此資訊。在下列步驟中,將這些安全群組稱為 clusterSecurityGroups

  • 包含 Amazon MSK 叢集的 Amazon VPC 的 ID。您可以在 Amazon MSK 主控台中識別與 Amazon MSK 叢集相關聯的子網路,然後在 Amazon VPC 主控台中識別與子網路相關聯的 Amazon VPC,籍此找到此資訊。

  • 解決方案中使用的 Kafka 主題的名稱。您可以從 Kafka 管理員主機使用 Kafka topics CLI 呼叫 Amazon MSK 叢集,籍此找到此資訊。如需關於主題 CLI 的詳細資訊,請參閱 Kafka 文件中的 Adding and removing topics

  • Kafka 主題的取用者群組名稱,適合供 Lambda 函數使用。此群組可由 Lambda 自動建立,因此不需要使用 Kafka CLI 建立。如果您需要管理取用者群組,以進一步了解取用者群組 CLI 的詳細資訊,則請參閱 Kafka 文件中的 Managing Consumer Groups

您 AWS 帳戶中的下列許可:

  • 建立和管理 Lambda 函數的許可。

  • 建立 IAM 政策並與您的 Lambda 函數建立關聯的許可。

  • 在託管 Amazon MSK 叢集的 Amazon VPC 中建立 Amazon VPC 端點和變更聯網組態的許可。

設定供 Lambda 與 Amazon MSK 通訊的網路連線

使用 AWS PrivateLink 來連接 Lambda 和 Amazon MSK。您可以在 Amazon VPC 主控台中建立介面 Amazon VPC 端點來執行此操作。如需關於聯網組態的詳細資訊,請參閱設定網路安全

當 Amazon MSK 事件來源映射代表 Lambda 函數執行時,它會擔任 Lambda 函數的執行角色。此 IAM 角色會授權映射存取 IAM 保護的資源,例如您的 Amazon MSK 叢集。雖然元件共用執行角色,但 Amazon MSK 映射和您的 Lambda 函數對各自的任務有不同的連線需求,如下圖所示。

Lambda 函數會輪詢叢集,並使用 與 Lambda 通訊 AWS STS。

事件來源映射屬於 Amazon MSK 叢集安全群組。在此聯網步驟中,從您的 Amazon MSK 叢集 VPC 建立 Amazon VPC 端點,將事件來源映射連線到 Lambda 和 STS 服務。保護這些端點,接受來自 Amazon MSK 叢集安全群組的流量。然後,調整 Amazon MSK 叢集安全群組,允許事件來源映射與 Amazon MSK 叢集通訊。

您可以使用 AWS Management Console來設定下列步驟。

若要設定介面 Amazon VPC 端點連線 Lambda 和 Amazon MSK
  1. 為您的介面 Amazon VPC 端點 endpointSecurityGroup 建立安全群組,允許 443 上來自 clusterSecurityGroups 的傳入 TCP 流量。請遵循 Amazon EC2 文件中的 Create a security group 中的程序來建立安全群組。然後,遵循 Amazon EC2 文件中的 Add rules to a security group 中的程序,以新增適當的規則。

    使用下列資訊建立一個安全群組:

    在新增傳入規則時,為 clusterSecurityGroups 中的每個安全群組建立規則。對於每條規則:

    • 針對類型,選取 HTTPS

    • 針對來源,選取其中一個 clusterSecurityGroups

  2. 建立一個端點,將 Lambda 服務連線至包含 Amazon MSK 叢集的 Amazon VPC。遵循 Create an interface endpoint 中的程序。

    使用下列資訊建立一個介面端點:

    • 針對服務名稱,選取 com.amazonaws.regionName.lambda,其中 regionName 會託管您的 Lambda 函數。

    • 針對 VPC,選取包含 Amazon MSK 叢集的 Amazon VPC。

    • 針對安全群組,選取先前建立的 endpointSecurityGroup

    • 針對子網路,選取託管您的 Amazon MSK 叢集的子網路。

    • 針對政策,提供下列政策文件,這會保護端點以供 Lambda 服務主體用於 lambda:InvokeFunction 動作。

      { "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • 確保啟用 DNS 名稱保持設定狀態。

  3. 建立端點,將 AWS STS 服務連線至包含 Amazon MSK 叢集的 Amazon VPC。遵循 Create an interface endpoint 中的程序。

    使用下列資訊建立一個介面端點:

    • 針對服務名稱,選取 AWS STS。

    • 針對 VPC,選取包含 Amazon MSK 叢集的 Amazon VPC。

    • 針對安全群組,選取 endpointSecurityGroup

    • 針對子網路,選取託管您的 Amazon MSK 叢集的子網路。

    • 針對政策,提供下列政策文件,這會保護端點以供 Lambda 服務主體用於 sts:AssumeRole 動作。

      { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • 確保啟用 DNS 名稱保持設定狀態。

  4. 對於與您的 Amazon MSK 叢集相關聯的每個安全群組,亦即,在 clusterSecurityGroups 中,允許以下項目:

    • 允許 9098 上所有 clusterSecurityGroups 的傳入和傳出 TCP 流量,包括其本身以內的 TCP 流量。

    • 允許 443 上的所有傳出 TCP 流量。

    根據預設,安全群組規則允許其中一些流量,因此如果您的叢集連接到單一安全群組,且該群組具有預設規則,則不需要額外的規則。若要調整安全群組規則,請遵循 Amazon EC2 文件中的 Add rules to a security group 中的程序。

    使用下列資訊將規則新增至您的安全群組:

    • 針對連接埠 9098 的每個傳入或傳出規則,提供

      • 針對類型,選取自訂 TCP

      • 對於連接埠範圍,提供 9098。

      • 針對來源,提供其中一個 clusterSecurityGroups

    • 對於連接埠 443 的每條傳入規則,針對類型,選取 HTTPS

為 Lambda 建立 IAM 角色,以從 Amazon MSK 主題讀取

識別 Lambda 從 Amazon MSK 主題讀取的身分驗證要求,然後在政策中定義這些要求。建立角色 lambdaAuthRole,授權 Lambda 使用這些許可。使用 kafka-cluster IAM 動作授權 Amazon MSK 叢集上的動作。然後,授權 Lambda 執行探索和連線至 Amazon MSK 叢集所需的 Amazon MSK kafka 和 Amazon EC2 動作,以及 CloudWatch 動作 (以便 Lambda 可以記錄其已完成的動作)。

若要描述 Lambda 從 Amazon MSK 讀取的身分驗證要求
  1. 撰寫 IAM 政策文件 (JSON 文件) clusterAuthPolicy,允許 Lambda 使用您的 Kafka 取用者群組,從 Amazon MSK 叢集中的 Kafka 主題讀取。Lambda 要求在讀取時設定 Kafka 取用者群組。

    修改以下範本以符合您的先決條件:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/mskClusterName/cluster-uuid", "arn:aws:kafka:region:account-id:topic/mskClusterName/cluster-uuid/mskTopicName", "arn:aws:kafka:region:account-id:group/mskClusterName/cluster-uuid/mskGroupName" ] } ] }

    如需詳細資訊,請參閱 IAM 角色型身分驗證。在撰寫政策時:

    • 對於區域帳戶 ID,請提供託管 Amazon MSK 叢集的相應資訊。

    • 對於 mskClusterName,請提供 Amazon MSK 叢集的名稱。

    • 對於 cluster-uuid,請提供 Amazon MSK 叢集 ARN 中的 UUID。

    • 對於 mskTopicName,請提供 Kafka 主題的名稱。

    • 針對 mskGroupName,請提供 Kafka 取用者群組的名稱。

  2. 識別 Lambda 探索和連線 Amazon MSK 叢集以及記錄這些事件所需的 Amazon MSK、Amazon EC2 和 CloudWatch 許可。

    AWSLambdaMSKExecutionRole 受管政策允許定義必要的許可。在下列步驟中使用該政策。

    在生產環境中,評估 AWSLambdaMSKExecutionRole 以根據最低權限原則限制執行角色政策,然後為您的角色撰寫取代此受管政策的政策。

如需有關 IAM 政策語言的詳細資訊,請參閱 IAM 文件

現在您已撰寫政策文件,請建立 IAM 政策,以便將其連接至您的角色。您可以使用主控台執行下列程序,以完成此操作。

若要從政策文件建立 IAM 政策
  1. 登入 AWS Management Console ,並在 https://https://console.aws.amazon.com/iam/ 開啟 IAM 主控台。

  2. 在左側的導覽窗格中,選擇 Policies (政策)

  3. 選擇 Create policy (建立政策)。

  4. 政策編輯器中,選擇 JSON 選項。

  5. 貼上 clusterAuthPolicy

  6. 將許可新增至政策後,請選擇下一步

  7. 檢視與建立頁面上,為您正在建立的政策輸入政策名稱描述 (選用)。檢視此政策中定義的許可,來查看您的政策所授予的許可。

  8. 選擇 Create policy (建立政策) 儲存您的新政策。

如需詳細資訊,請參閱 IAM 文件中的 Creating IAM policies

現在您擁有適當的 IAM 政策,請建立一個角色並將這些政策連接至該角色。您可以使用主控台執行下列程序,以完成此操作。

若要在 IAM 主控台中建立執行角色
  1. 在 IAM 主控台中開啟 角色頁面

  2. 選擇建立角色

  3. 受信任的實體類型下,選擇 AWS  服務

  4. 使用案例 下,選擇 Lambda

  5. 選擇 Next (下一步)

  6. 選取以下政策:

    • clusterAuthPolicy

    • AWSLambdaMSKExecutionRole

  7. 選擇 Next (下一步)

  8. 針對角色名稱,輸入 lambdaAuthRole,然後選擇建立角色

如需詳細資訊,請參閱使用執行角色定義 Lambda 函數許可

建立 Lambda 函數以從您的 Amazon MSK 主題中讀取

建立設定為使用您的 IAM 角色的 Lambda 函數。您可以使用主控台建立 Lambda 函數。

若要使用驗證組態建立 Lambda 函數
  1. 開啟 Lambda 主控台,然後從標頭選取建立函數

  2. 選取從頭開始撰寫

  3. 針對函數名稱,提供您選擇的適當名稱。

  4. 針對執行時期,選擇最新支援Node.js 版本,以使用本教學課程中提供的程式碼。

  5. 選擇變更預設執行角色

  6. 選取使用現有角色

  7. 針對現有角色,選取 lambdaAuthRole

在生產環境中,您通常需要將更多政策新增至 Lambda 函數的執行角色,以有意義的方式處理您的 Amazon MSK 事件。如需將政策新增至角色的詳細資訊,請參閱 IAM 文件中的 Add or remove identity permissions

建立 Lambda 函數的事件來源映射

您的 Amazon MSK 事件來源映射為 Lambda 服務提供了在發生適當的 Amazon MSK 事件時調用 Lambda 所需的資訊。您可以使用主控台建立 Amazon MSK 映射。建立 Lambda 觸發條件,然後會自動設定事件來源映射。

若要建立 Lambda 觸發條件 (和事件來源映射)
  1. 導覽至 Lambda 函數的概觀頁面。

  2. 在函數概觀區段中,選擇左下角的新增觸發條件

  3. 選取來源下拉式清單中,選取 Amazon MSK

  4. 請勿設定身分驗證

  5. 針對 MSK 叢集,請選取您的叢集名稱。

  6. 針對批次大小,請輸入 1。此步驟可讓此功能更易於測試,而且不是生產中的理想值。

  7. 針對主題名稱,請提供 Kafka 主題名稱。

  8. 針對取用者群組 ID,請提供 Kafka 取用者群組的 ID。

更新您的 Lambda 函數以讀取串流資料

Lambda 透過事件方法參數提供關於 Kafka 事件的資訊。如需 Amazon MSK 事件的結構範例,請參閱 範例事件。了解如何解譯 Lambda 轉送的 Amazon MSK 事件之後,您可以變更 Lambda 函數程式碼,以使用它們提供的資訊。

將下列程式碼提供給 Lambda 函數,以記錄 Lambda Amazon MSK 事件的內容供測試之用:

.NET
AWS SDK for .NET
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 .NET 搭配 Lambda 來取用 Amazon MSK 事件。

using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KafkaEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace MSKLambda; public class Function { /// <param name="input">The event for the Lambda function handler to process.</param> /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param> /// <returns></returns> public void FunctionHandler(KafkaEvent evnt, ILambdaContext context) { foreach (var record in evnt.Records) { Console.WriteLine("Key:" + record.Key); foreach (var eventRecord in record.Value) { var valueBytes = eventRecord.Value.ToArray(); var valueText = Encoding.UTF8.GetString(valueBytes); Console.WriteLine("Message:" + valueText); } } } }
Go
SDK for Go V2
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Go 搭配 Lambda 來取用 Amazon MSK 事件。

package main import ( "encoding/base64" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(event events.KafkaEvent) { for key, records := range event.Records { fmt.Println("Key:", key) for _, record := range records { fmt.Println("Record:", record) decodedValue, _ := base64.StdEncoding.DecodeString(record.Value) message := string(decodedValue) fmt.Println("Message:", message) } } } func main() { lambda.Start(handler) }
Java
SDK for Java 2.x
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Java 搭配 Lambda 來取用 Amazon MSK 事件。

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KafkaEvent; import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord; import java.util.Base64; import java.util.Map; public class Example implements RequestHandler<KafkaEvent, Void> { @Override public Void handleRequest(KafkaEvent event, Context context) { for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) { String key = entry.getKey(); System.out.println("Key: " + key); for (KafkaEventRecord record : entry.getValue()) { System.out.println("Record: " + record); byte[] value = Base64.getDecoder().decode(record.getValue()); String message = new String(value); System.out.println("Message: " + message); } } return null; } }
JavaScript
SDK for JavaScript (v3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 JavaScript 搭配 Lambda 來取用 Amazon MSK 事件。

exports.handler = async (event) => { // Iterate through keys for (let key in event.records) { console.log('Key: ', key) // Iterate through records event.records[key].map((record) => { console.log('Record: ', record) // Decode base64 const msg = Buffer.from(record.value, 'base64').toString() console.log('Message:', msg) }) } }
PHP
SDK for PHP
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 PHP 搭配 Lambda 來取用 Amazon MSK 事件。

<?php // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 // using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kafka\KafkaEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): void { $kafkaEvent = new KafkaEvent($event); $this->logger->info("Processing records"); $records = $kafkaEvent->getRecords(); foreach ($records as $record) { try { $key = $record->getKey(); $this->logger->info("Key: $key"); $values = $record->getValue(); $this->logger->info(json_encode($values)); foreach ($values as $value) { $this->logger->info("Value: $value"); } } catch (Exception $e) { $this->logger->error($e->getMessage()); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK for Python (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Python 搭配 Lambda 來取用 Amazon MSK 事件。

import base64 def lambda_handler(event, context): # Iterate through keys for key in event['records']: print('Key:', key) # Iterate through records for record in event['records'][key]: print('Record:', record) # Decode base64 msg = base64.b64decode(record['value']).decode('utf-8') print('Message:', msg)
Ruby
SDK for Ruby
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Ruby 搭配 Lambda 來取用 Amazon MSK 事件。

require 'base64' def lambda_handler(event:, context:) # Iterate through keys event['records'].each do |key, records| puts "Key: #{key}" # Iterate through records records.each do |record| puts "Record: #{record}" # Decode base64 msg = Base64.decode64(record['value']) puts "Message: #{msg}" end end end
Rust
SDK for Rust
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Rust 搭配 Lambda 使用 Amazon MSK 事件。

use aws_lambda_events::event::kafka::KafkaEvent; use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent}; use base64::prelude::*; use serde_json::{Value}; use tracing::{info}; /// Pre-Requisites: /// 1. Install Cargo Lambda - see https://www.cargo-lambda.info/guide/getting-started.html /// 2. Add packages tracing, tracing-subscriber, serde_json, base64 /// /// This is the main body for the function. /// Write your code inside it. /// There are some code example in the following URLs: /// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples /// - https://github.com/aws-samples/serverless-rust-demo/ async fn function_handler(event: LambdaEvent<KafkaEvent>) -> Result<Value, Error> { let payload = event.payload.records; for (_name, records) in payload.iter() { for record in records { let record_text = record.value.as_ref().ok_or("Value is None")?; info!("Record: {}", &record_text); // perform Base64 decoding let record_bytes = BASE64_STANDARD.decode(record_text)?; let message = std::str::from_utf8(&record_bytes)?; info!("Message: {}", message); } } Ok(().into()) } #[tokio::main] async fn main() -> Result<(), Error> { // required to enable CloudWatch error logging by the runtime tracing::init_default_subscriber(); info!("Setup CW subscriber!"); run(service_fn(function_handler)).await }

您可以使用主控台將函數程式碼提供給 Lambda。

若要使用主控台程式碼編輯器更新函數程式碼
  1. 開啟 Lambda 主控台的函數頁面,然後選取您的函數。

  2. 選取程式碼索引標籤。

  3. 程式碼來源窗格中,選取您的原始程式碼檔案,然後在整合式程式碼編輯器中加以編輯。

  4. DEPLOY 區段中,選擇部署以更新函數的程式碼:

    Lambda 主控台程式碼編輯器中的「部署」按鈕

測試您的 Lambda 函數,確認其已連線至您的 Amazon MSK 主題

您現在可以透過檢查 CloudWatch 事件日誌,驗證您的 Lambda 是否正在被事件來源調用。

若要驗證您的 Lambda 函數是否正在被調用
  1. 使用您的 Kafka 管理員主機,透過 kafka-console-producer CLI 產生 Kafka 事件。如需詳細資訊,請參閱 Kafka 文件中的 Write some events into the topic。傳送足夠的事件,以填滿由批次大小定義的批次,用於上一個步驟中定義的事件來源映射,否則 Lambda 會等待調用更多資訊。

  2. 如果您的函數執行,Lambda 會將發生的情況寫入 CloudWatch。在主控台中,導覽至您的 Lambda 函數詳情頁面。

  3. 選取 Configuration (組態) 索引標籤。

  4. 從側邊列,選取監控和操作工具

  5. 記錄組態下,識別 CloudWatch 日誌群組。日誌群組應以 /aws/lambda 開頭。選擇日誌群組連結。

  6. 在 CloudWatch 主控台中,檢查日誌事件,了解 Lambda 已傳送至日誌串流的日誌事件。識別是否有日誌事件包含來自 Kafka 事件的訊息,如下圖所示。如果有,您已成功使用 Lambda 事件來源映射將 Lambda 函數連線至 Amazon MSK。

    CloudWatch 中的日誌事件,顯示了由提供的程式碼擷取的事件資訊。