教學課程:使用 Amazon MSK 事件來源對應來叫用 Lambda 函數 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

教學課程:使用 Amazon MSK 事件來源對應來叫用 Lambda 函數

在本教學課程中,您將執行下列操作:

  • 在與現有 Amazon MSK 叢集相同的 AWS 帳戶中建立 Lambda 函數。

  • 設定 Lambda 的聯網和身份驗證,以便與 Amazon MSK 進行通訊。

  • 設定 Lambda Amazon MSK 事件來源對應,當事件出現在主題中時,會執行 Lambda 函數。

完成這些步驟後,當事件傳送至 Amazon MSK 時,您將能夠設定 Lambda 函數,使用您自己的自訂 Lambda 程式碼自動處理這些事件。

您可以使用此功能做什麼?

範例解決方案:使用 MSK 事件來源對應,為您的客戶提供即時分數。

請考慮下列案例:您的公司主控一個 Web 應用程式,您的客戶可以在其中檢視即時賽事的相關資訊,例如體育比賽。遊戲的資訊更新會透過 Amazon MSK 上的 Kafka 主題提供給您的團隊。您想要設計一個解決方案,使用 MSK 主題的更新,以便為您開發的應用程式內的客戶提供現場活動的更新檢視。您已決定採用下列設計方法:您的用戶端應用程式將與託管於 AWS. 用戶端將使用 Amazon API Gateway WebSocket API 透過網路通訊端工作階段進行連線。

在此解決方案中,您需要一個可讀取 MSK 事件的元件,執行一些自訂邏輯,為應用程式層準備這些事件,然後將該資訊轉送至 API Gateway API。您可以透過在 Lambda 函數中提供自訂邏輯,然後使用 AWS Lambda Amazon MSK 事件來源對應來呼叫它來實作此元件。 AWS Lambda

如需使用 Amazon API Gateway WebSocket API 實作解決方案的詳細資訊,請參閱 WebSocket API 閘道文件中的 API 教學課程

必要條件

具有下列預先設定資源的 AWS 帳號:

了滿足這些先決條件,我們建議您遵循 Amazon MSK 文件中的「開始使用 Amazon MSK」。

  • Amazon MSK 叢集。請參閱開始使用 Amazon MSK 中的建立 Amazon MS K 叢集

  • 以下配置:

    • 確保叢集安全性設定中已啟用 IAM 角色型身份驗證。如此可將 Lambda 函數限制為僅存取所需的 Amazon MSK 資源,藉此提升您的安全性。根據預設,新的 Amazon MSK 叢集會啟用此功能。

    • 確定叢集網路設定中的公用存取已關閉。限制 Amazon MSK 叢集對網際網路的存取權限,可限制處理資料的中介機構數目,從而提高安全性。根據預設,新的 Amazon MSK 叢集會啟用此功能。

  • Amazon MSK 叢集中用於此解決方案的卡夫卡主題。請參閱開始使用 Amazon MSK 中的建立主題。

  • Kafka 管理員主機設定用於從您的卡夫卡叢集擷取資訊,並將卡夫卡事件傳送到您的主題進行測試,例如安裝了 Kafka 管理 CLI 和 Amazon MSK IAM 程式庫的 Amazon EC2 執行個體。請參閱開始使用 Amazon MSK 中的建立用戶端電腦。

設定完這些資源後,請從您的 AWS 帳戶收集下列資訊,以確認您已準備好繼續。

  • 您的 Amazon MSK 叢集的名稱。您可以在 Amazon MSK 主控台中找到此資訊。

  • 叢集 UUID 是您 Amazon MSK 叢集 ARN 的一部分,您可以在 Amazon MSK 主控台中找到。請遵循 Amazon MSK 文件中列出叢集中的程序,以尋找此資訊。

  • 與您的 Amazon MSK 叢集相關聯的安全群組。您可以在 Amazon MSK 主控台中找到此資訊。在下列步驟中,請將這些步驟稱為您的叢集SecurityGroups

  • 包含您的 Amazon MSK 叢集的 Amazon VPC 識別碼。您可以在 Amazon MSK 主控台中識別與 Amazon MSK 叢集相關聯的子網路,然後在 Amazon VPC 主控台中識別與子網路相關聯的 Amazon VPC,以找到此資訊。

  • 解決方案中使用的 Kafka 主題名稱。您可以從 Kafka 管理主機使用 Kafka topics CLI 呼叫 Amazon MSK 叢集,以找到此資訊。如需 CLI 主題的詳細資訊,請參閱 Kafka 文件集中的新增和移除主題

  • 您卡夫卡主題的用戶群組名稱,適合由 Lambda 函數使用。這個群組可以由 Lambda 自動建立,因此您不需要使用 Kafka CLI 建立群組。如果您確實需要管理用戶群組,若要深入了解使用者群組 CLI,請參閱 Kafka 說明文件中的管理用戶群組

您 AWS 帳戶中的下列權限:

  • 建立和管理 Lambda 函數的權限。

  • 建立 IAM 政策並將其與 Lambda 函數建立關聯的權限。

  • 允許在託管您的 Amazon MSK 叢集的 Amazon VPC 中建立 Amazon VPC 端點和變更聯網組態。

設定 Lambda 的網路連線能力,以便與 Amazon MSK 通訊

用 AWS PrivateLink 於連接 Lambda 和 Amazon MSK。您可以透過在 Amazon VPC 主控台中建立界面 Amazon VPC 端點來執行此操作。如需網路組態的詳細資訊,請參閱網路組態

當 Amazon MSK 事件來源對應代表 Lambda 函數執行時,它會擔任 Lambda 函數的執行角色。此 IAM 角色授權對應存取 IAM 保護的資源,例如 Amazon MSK 叢集。雖然這些元件共用執行角色,但 Amazon MSK 對應和 Lambda 函數對其各自的任務有不同的連線需求,如下圖所示。

客戶 Amazon VPC 中的 Amazon MSK 叢集連接至 Lambda 服務程式碼,該程式碼會輪詢叢集,然後使用進行通訊。 AWS Lambda AWS STS

您的事件來源對應屬於您的 Amazon MSK 叢集安全群組。在此聯網步驟中,從 Amazon MSK 叢集 VPC 建立 Amazon VPC 端點,以將事件來源對應連接至 Lambda 和 STS 服務。保護這些端點以接受來自 Amazon MSK 叢集安全群組的流量。然後,調整 Amazon MSK 叢集安全群組,以允許事件來源對應與 Amazon MSK 叢集進行通訊。

您可以使用配置下列步驟 AWS Management Console。

設定介面 Amazon VPC 端點以連接 Lambda 和 Amazon MSK
  1. 為您的界面 Amazon VPC 端點 (端點) 建立安全群組,以允許來自叢集的 443 上輸入 TCP 流量。SecurityGroup SecurityGroups遵循 Amazon EC2 文件中建立安全群組中的程序來建立安全群組。然後,按照 Amazon EC2 文件中將規則新增至安全群組中的程序來新增適當的規則。

    使用下列資訊建立安全性群組:

    新增輸入規則時,請為集中的每個安全性群組建立規則SecurityGroups。對於每個規則:

    • 選取 HTTPS 做為「類型」。

    • 針對來源,選取其中一個叢集SecurityGroups

  2. 建立一個端點,將 Lambda 服務連接到包含您的 Amazon MSK 叢集的 Amazon VPC。遵循建立介面端點中的程序。

    使用下列資訊建立介面端點:

    • 對於服務名稱com.amazonaws.regionName.lambda,請選取區域名稱託管 Lambda 函數的位置。

    • 對於虛擬私人雲端,請選取包含您的 Amazon MSK 叢集的 Amazon VPC。

    • 針對「安全性群組」SecurityGroup,選取您先前建立的端點

    • 對於子網路,請選取託管 Amazon MSK 叢集的子網路。

    • 針對原則,請提供下列原則文件,以保護端點,以供 Lambda 服務主體使用lambda:InvokeFunction動作。

      { "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • 確保啟用 DNS 名稱保持設置狀態。

  3. 建立一個端點,將 AWS STS 服務連接到包含您的 Amazon MSK 叢集的 Amazon VPC。遵循建立介面端點中的程序。

    使用下列資訊建立介面端點:

    • 對於服務名稱,選取 AWS STS。

    • 對於虛擬私人雲端,請選取包含您的 Amazon MSK 叢集的 Amazon VPC。

    • 對於安全群組,選取端點SecurityGroup

    • 對於子網路,請選取託管 Amazon MSK 叢集的子網路。

    • 針對原則,請提供下列原則文件,以保護端點,以供 Lambda 服務主體使用sts:AssumeRole動作。

      { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • 確保啟用 DNS 名稱保持設置狀態。

  4. 對於與 Amazon MSK 叢集相關聯的每個安全群組 (即叢中)SecurityGroups,允許以下事項:

    • 允許 9098 上的所有入站和出站 TCP 流量傳輸到所有叢集 SecurityGroups,包括在內部。

    • 允許 443 上的所有輸出 TCP 流量。

    預設安全性群組規則允許其中一些流量,因此,如果叢集附加至單一安全性群組,而該群組具有預設規則,則不需要額外的規則。若要調整安全群組規則,請遵循 Amazon EC2 文件中將規則新增至安全群組中的程序。

    使用下列資訊將規則新增至您的安全群組:

    • 針對連接埠 9098 的每個輸入規則或輸出規則,請提供

      • 針對 Type (類型),選擇 Custom TCP (自訂 TCP)

      • 對於連接埠範圍,請提供 9098。

      • 針對來源,提供其中一個叢集SecurityGroups

    • 對於連接埠 443 的每個輸入規則,對於類型,選取 HTTPS

為 Lambda 建立 IAM 角色,以便讀取您的 Amazon MSK 主題

確定 Lambda 要從您的 Amazon MSK 主題讀取的身份驗證要求,然後在政策中定義它們。建立角色 Lambda AuthRole,以授權 Lambda 使用這些權限。使用 kafka-cluster IAM 動作在您的 Amazon MSK 叢集上授權動作。然後,授權 Lambda 執行探索kafka並連接到 Amazon MSK 叢集所需的 Amazon MSK 和 Amazon EC2 動作,以及執行 CloudWatch 動作,以便 Lambda 可以記錄其完成的工作。

描述 Lambda 從 Amazon MSK 讀取的身份驗證要求
  1. 撰寫 IAM 政策文件 (JSON 文件) 叢集 AuthPolicy,允許 Lambda 使用您的卡夫卡客戶群組讀取 Amazon MSK 叢集中的卡夫卡主題。Lambda 需要在讀取時設定卡夫卡消費群組。

    變更下列範本以符合您的先決條件:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/mskClusterName/cluster-uuid", "arn:aws:kafka:region:account-id:topic/mskClusterName/cluster-uuid/mskTopicName", "arn:aws:kafka:region:account-id:group/mskClusterName/cluster-uuid/mskGroupName" ] } ] }

    欲了解更多信息,請諮詢IAM 角色型身分驗證。撰寫您的政策時:

    • 對於區域帳戶 ID,請提供託管 Amazon MSK 叢集的區域和帳戶識別碼。

    • 對於 msk ClusterName,請提供您的 Amazon MSK 叢集的名稱。

    • 對於叢集 UUID,請在 ARN 中為您的 Amazon MSK 叢集提供 UUID。

    • 對於 msk TopicName,請提供您的卡夫卡主題的名稱。

    • 若為 msk GroupName,請提供您的卡夫卡消費群組名稱。

  2. 識別 Lambda 探索和連接 Amazon MSK 叢集所需的 Amazon MSK、Amazon EC2 和 CloudWatch 許可,並記錄這些事件。

    AWSLambdaMSKExecutionRole管理的原則會寬鬆定義必要的權限。請按照以下步驟使用它。

    在生產環境中,請根據最低權限原則評估AWSLambdaMSKExecutionRole以限制執行角色原則,然後為您的角色撰寫取代此受管理原則的原則。

如需 IAM 政策語言的詳細資訊,請參閱 IAM 文件

現在您已經撰寫了政策文件,請建立 IAM 政策,以便將其附加到您的角色。您可以按照以下步驟使用控制台執行此操作。

從政策文件建立 IAM 政策
  1. 登入 AWS Management Console 並開啟身分與存取權管理主控台,網址為 https://console.aws.amazon.com/iam/

  2. 在左側的導覽窗格中,選擇 Policies (政策)

  3. 選擇 Create policy (建立政策)。

  4. 政策編輯器中,選擇 JSON 選項。

  5. 貼上叢集AuthPolicy

  6. 將許可新增至政策後,請選擇下一步

  7. 檢視與建立頁面上,為您正在建立的政策輸入政策名稱描述 (選用)。檢視此政策中定義的許可,來查看您的政策所授予的許可。

  8. 選擇 Create policy (建立政策) 儲存您的新政策。

如需詳細資訊,請參閱 IAM 文件中的建立 IAM 政策。

現在您已擁有適當的 IAM 政策,請建立角色並將其附加至該角色。您可以按照以下步驟使用控制台執行此操作。

若要在 IAM 主控台中建立執行角色
  1. 在 IAM 主控台中開啟 角色頁面

  2. 選擇 建立角色

  3. 受信任的實體類型下,選擇 AWS  服務

  4. 使用案例 下,選擇 Lambda

  5. 選擇下一步

  6. 選取以下政策:

    • 集群 AuthPolicy

    • AWSLambdaMSKExecutionRole

  7. 選擇下一步

  8. 角色名稱中,輸入 lambda,AuthRole然後選擇建立角色

如需詳細資訊,請參閱 使用執行角色定義 Lambda 函數許可

建立 Lambda 函數以讀取您的 Amazon MSK 主題

建立設定為使用 IAM 角色的 Lambda 函數。您可以使用主控台建立 Lambda 函數。

若要使用驗證組態建立 Lambda 函數
  1. 開啟 Lambda 主控台,然後從標頭中選取建立函數

  2. 選取從頭開始撰寫

  3. 對於函數名稱,請提供您選擇的適當名稱。

  4. 對於 Runtime,請選擇的最新支援版本,Node.js以使用本教學課程中提供的程式碼。

  5. 選擇 [變更預設執行角色]。

  6. 選取 [使用現有角色]。

  7. 對於現有角色,選取 lambda AuthRole

在生產環境中,您通常需要為 Lambda 函數的執行角色新增其他政策,以便有意義地處理 Amazon MSK 事件。如需將政策新增至角色的詳細資訊,請參閱 IAM 文件中的新增或移除身分許可。

建立對應至 Lambda 函數的事件來源

您的 Amazon MSK 事件來源對應提供 Lambda 服務所需的資訊,以便在發生適當的 Amazon MSK 事件時叫用 Lambda。您可以使用主控台建立 Amazon MSK 映射。建立 Lambda 觸發器,然後自動設定事件來源對應。

若要建立 Lambda 觸發器 (以及事件來源對應)
  1. 瀏覽至 Lambda 函數的概觀頁面。

  2. 在「功能概覽」區段中,選擇左下角的「新增觸發器」。

  3. [選取來源] 下拉式清單中,選取 Amazon MSK

  4. 請勿設定驗證

  5. 對於 MSK 叢集,請選取叢集的名稱。

  6. 對於 Batch 大小,輸入 1。此步驟使得此功能更容易測試,並且在生產中不是理想的價值。

  7. 對於「主題名稱」,請提供您的 Kafka 主題名稱。

  8. 對於消費者群組 ID,請提供您的卡夫卡用戶群組的 ID。

更新您的 Lambda 函數以讀取串流資料

Lambda 通過事件方法參數提供有關卡夫卡事件的信息。如需 Amazon MSK 事件的範例結構,請參閱 範例事件。瞭解如何解譯 Lambda 轉送的 Amazon MSK 事件之後,您可以變更 Lambda 函數程式碼以使用其提供的資訊。

將下列程式碼提供給您的 Lambda 函數,以記錄 Lambda Amazon MSK 事件的內容,以供測試之用:

Node.js
exports.handler = async (event) => { // Iterate through keys for (let key in event.records) { console.log('Key: ', key) // Iterate through records event.records[key].map((record) => { console.log('Record: ', record) // Decode base64 const msg = Buffer.from(record.value, 'base64').toString() console.log('Message:', msg) }) } }

您可以使用主控台將函數程式碼提供給 Lambda。

若要更新 Lambda 函數程式碼
  1. 瀏覽至 Lambda 函數的概觀頁面。

  2. 選擇 程式碼 標籤。

  3. 在程式碼來源 IDE 中輸入提供的程式碼

  4. 在 [程式碼原始碼] 導覽列中,選擇 [部署]。

測試您的 Lambda 函數以確認其已連接到您的 Amazon MSK 主題

您現在可以透過檢查 CloudWatch 事件記錄來確認事件來源是否叫用 Lambda。

驗證是否正在叫用 Lambda 函數
  1. 使用您的卡夫卡管理主機使用 CLI 生成卡夫卡事件。kafka-console-producer如需詳細資訊,請參閱 Kafka 文件中的將一些事件寫入主題。針對先前步驟中定義的事件來源對應,傳送足夠的事件以填滿批次大小定義的批次,否則 Lambda 將等待更多資訊呼叫。

  2. 如果您的函數執行,Lambda 會寫入發生的事情 CloudWatch。在主控台中,導覽至 Lambda 函數的詳細資料頁面。

  3. 選取 Configuration (組態) 索引標籤。

  4. 在側邊欄中,選取監控和作業工具

  5. 識別記CloudWatch 錄組態下的記錄群組。記錄群組的開頭應為/aws/lambda。選擇日誌群組的連結。

  6. 在 CloudWatch 主控台中,檢查 Lambda 傳送至記錄串流的記錄事件記錄事件。識別是否存在包含來自 Kafka 事件之訊息的記錄事件,如下圖所示。如果有的話,您已經使用 Lambda 事件來源對應成功將 Lambda 函數連接到 Amazon MSK。

    日誌事件,其中 CloudWatch 包含與所提供代碼所提取的事件信息相對應的消息。