

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 搭配使用 Lambda 與 Amazon MSK
<a name="with-msk"></a>

[Amazon Managed Streaming for Apache Kafka (Amazon MSK)](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html) 是一項全受管服務，可讓您建立並執行使用 Apache Kafka 處理串流資料的應用程式。Amazon MSK 簡化了 Kafka 叢集的設定、擴展與管理流程。Amazon MSK 也可讓您更輕鬆地為多個可用區域設定應用程式，並使用 AWS Identity and Access Management (IAM) 確保安全。

本章說明了如何將 Amazon MSK 叢集用作 Lambda 函式的事件來源。將 Amazon MSK 與 Lambda 整合的一般流程包含以下步驟：

1. **[叢集與網路設定](with-msk-cluster-network.md)**：首先，設定 [Amazon MSK 叢集](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html)。這包括設定允許 Lambda 存取叢集所需的正確聯網組態。

1. **[事件來源映射設定](with-msk-configure.md)**：接著，建立 Lambda 所需的[事件來源映射](invocation-eventsourcemapping.md)資源，將 Amazon MSK 叢集安全地連線至函式。

1. **[函式與許可設定](with-msk-permissions.md)**：最後，確保函式已正確設定，並且其[執行角色](lambda-intro-execution-role.md)具有必要許可。

**注意**  
您現在可以直接從 Lambda 或 Amazon MSK 主控台建立和管理 Amazon MSK 事件來源映射。兩個主控台都提供選項，可自動處理設定必要的 Lambda 執行角色許可，以便更簡化的組態程序。

如需如何設定 Lambda 與 Amazon MSK 叢集整合的範例，請參閱 AWS 《運算部落格》中的[教學課程：使用 Amazon MSK 事件來源映射來調用 Lambda 函數](services-msk-tutorial.md)[使用 Amazon MSK 做為 的事件來源 AWS Lambda](https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/)，以及[《Amazon MSK 實驗室》中的 Amazon MSK Lambda 整合](https://amazonmsk-labs.workshop.aws/en/msklambda.html)。

**Topics**
+ [範例事件](#msk-sample-event)
+ [為 Lambda 設定 Amazon MSK 叢集與 Amazon VPC 網路](with-msk-cluster-network.md)
+ [設定 Amazon MSK 事件來源映射的 Lambda 許可](with-msk-permissions.md)
+ [設定 Lambda 的 Amazon MSK 事件來源](with-msk-configure.md)
+ [教學課程：使用 Amazon MSK 事件來源映射來調用 Lambda 函數](services-msk-tutorial.md)

## 範例事件
<a name="msk-sample-event"></a>

Lambda 會在調用函數時，在事件參數中傳送訊息批次。事件酬載包含訊息陣列。陣列中的每個項目包含 Amazon MSK 主題和分割區識別符的詳細資訊，以及時間戳記和 base64 編碼的訊息。

```
{
   "eventSource":"aws:kafka",
   "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
   "records":{
      "mytopic-0":[
         {
            "topic":"mytopic",
            "partition":0,
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==",
            "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
            "headers":[
               {
                  "headerKey":[
                     104,
                     101,
                     97,
                     100,
                     101,
                     114,
                     86,
                     97,
                     108,
                     117,
                     101
                  ]
               }
            ]
         }
      ]
   }
}
```

# 為 Lambda 設定 Amazon MSK 叢集與 Amazon VPC 網路
<a name="with-msk-cluster-network"></a>

若要將 AWS Lambda 函數連線至 Amazon MSK 叢集，您需要正確設定叢集及其所在的 [Amazon Virtual Private Cloud (VPC)](https://docs.aws.amazon.com/vpc/latest/userguide/what-is-amazon-vpc.html)。本頁介紹了如何設定叢集與 VPC。如果叢集與 VPC 已正確設定，請參閱[設定 Lambda 的 Amazon MSK 事件來源](with-msk-configure.md)，設定事件來源映射。

**Topics**
+ [Lambda 與 MSK 整合的網路組態需求概觀](#msk-network-requirements)
+ [為 MSK 事件來源設定 NAT 閘道](#msk-nat-gateway)
+ [設定 MSK 事件來源的 AWS PrivateLink 端點](#msk-vpc-privatelink)

## Lambda 與 MSK 整合的網路組態需求概觀
<a name="msk-network-requirements"></a>

Lambda 與 MSK 整合所需的聯網組態取決於應用程式的網路架構。此整合涉及三種主要資源：Amazon MSK 叢集、Lambda 函式、Lambda 事件來源映射。這些資源各自位於不同的 VPC 內：
+ Amazon MSK 叢集通常位於您管理的 VPC 的私有子網路內。
+ 您的 Lambda 函數位於 Lambda AWS擁有的受管 VPC 中。
+ 您的 Lambda 事件來源映射位於 Lambda AWS擁有的另一個受管 VPC 中，與包含函數的 VPC 分開。

[事件來源映射](invocation-eventsourcemapping.md)是 MSK 叢集與 Lambda 函式之間的媒介資源。事件來源映射有兩個主要任務。首先，其會輪詢 MSK 叢集來取得新訊息。然後，其會使用這些訊息來調用 Lambda 函式。由於這三種資源位於不同的 VPC 內，輪詢與調用操作皆須跨 VPC 網路進行呼叫。

事件來源映射所需的網路組態取決於其是使用[佈建模式](invocation-eventsourcemapping.md#invocation-eventsourcemapping-provisioned-mode)還是隨需模式，如下圖所示：

![\[\]](http://docs.aws.amazon.com/zh_tw/lambda/latest/dg/images/MSK-esm-network-overview.png)


在這兩種模式下，Lambda 事件來源映射輪詢 MSK 叢集取得新訊息的方式一致。為了在事件來源映射與 MSK 叢集之間建立連線，Lambda 會在私有子網路內建立 [Hyperplane ENI](configuration-vpc.md#configuration-vpc-enis) (或在可用時重複使用現有介面)，以此建立安全連線。如圖所示，此 Hyperplane ENI 會使用 MSK 叢集的子網路和安全群組組態，而不是 Lambda 函式。

從叢集輪詢訊息之後，Lambda 調用函式的方式在每種模式下有所不同：
+ 在佈建模式下，Lambda 會自動處理事件來源映射 VPC 與函式 VPC 之間的連線。因此，無需任何其他聯網元件，即能成功調用函式。
+ 在隨需模式下，Lambda 事件來源映射會透過客戶自管 VPC 的路徑來調用函式。因此，您需要在 VPC 的公有子網路內設定 [NAT 閘道](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)，或者在 VPC 的私有子網路內設定 [AWS PrivateLink](https://docs.aws.amazon.com/vpc/latest/privatelink/what-is-privatelink.html) 端點，這些端點需提供對 Lambda、[AWS Security Token Service (STS)](https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html) 的存取權，並可選擇性提供對 [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) 的存取權。正確設定其中一個選項，即可在 VPC 與 Lambda 受管執行時期 VPC 之間建立連線，這是調用函式的必要條件。

NAT 閘道允許私有子網路內的資源存取公有網際網路。使用此組態表示您的流量在叫用 Lambda 函數之前周遊網際網路。 AWS PrivateLink 端點可讓私有子網路安全地連線至 AWS 服務或其他私有 VPC 資源，而不會周遊公有網際網路。有關如何設定這些資源的詳細資訊，請參閱[為 MSK 事件來源設定 NAT 閘道](#msk-nat-gateway)或[設定 MSK 事件來源的 AWS PrivateLink 端點](#msk-vpc-privatelink)。

截至目前，我們皆假設 MSK 叢集位於 VPC 的私有子網路內，這也是常見情況。不過，即使 MSK 叢集位於 VPC 的公有子網路內，您仍需設定 AWS PrivateLink 端點，才能啟用安全連線。下表根據設定 MSK 叢集和 Lambda 事件來源映射的方式，摘要說明了聯網組態要求：


| MSK 叢集位置 (位於客戶自管 VPC 內) | Lambda 事件來源映射擴展模式 | 必要的聯網組態 | 
| --- | --- | --- | 
|  私有子網路  |  隨需模式  |  NAT 閘道 （位於 VPC 的公有子網路中） 或 AWS PrivateLink 端點 （位於 VPC 的私有子網路中），以啟用對 Lambda 的存取 AWS STS，以及選擇性的 Secrets Manager。  | 
|  公有子網路  |  隨需模式  |  AWS PrivateLink 端點 （在 VPC 的公有子網路中），以啟用對 Lambda 的存取 AWS STS，以及選用的 Secrets Manager。  | 
|  私有子網路  |  佈建模式  |  無  | 
|  公有子網路  |  佈建模式  |  無  | 

此外，與 MSK 叢集相關聯的安全群組必須允許正確連接埠的流量。確保您已設定下列安全群組規則：
+ **傳入規則**：允許預設代理程式連接埠上的所有流量。MSK 使用的連接埠取決於叢集上的身分驗證類型：`9098` 用於 IAM 身分驗證、`9096` 用於 SASL/SCRAM，`9094` 用於 TLS。或者，您可使用自我參照安全群組規則，允許來自同一安全群組內其他執行個體的存取。
+ **傳出規則** – 如果您的函數需要與其他 `443` AWS 服務通訊，允許外部目的地連接埠上的所有流量。或者，如果您不需要與其他 AWS 服務通訊，您可以使用自我參考安全群組規則來限制對代理程式的存取。
+ **Amazon VPC 端點傳入規則**：若您使用 Amazon VPC 端點，與該端點相關聯的安全群組必須允許來自叢集安全群組 `443` 連接埠的傳入流量。

## 為 MSK 事件來源設定 NAT 閘道
<a name="msk-nat-gateway"></a>

您可以設定 NAT 閘道，允許事件來源映射從叢集輪詢訊息，並透過 VPC 的路徑調用函式。僅在事件來源映射使用隨需模式，且叢集位於 VPC 的私有子網路內時才需要此操作。如果叢集位於 VPC 的公有子網路內，或者事件來源映射使用佈建模式，則無需設定 NAT 閘道。

[NAT 閘道](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)允許私有子網路內的資源存取公有網際網路。如需與 Lambda 建立私有連線，請改為參閱[設定 MSK 事件來源的 AWS PrivateLink 端點](#msk-vpc-privatelink)。

設定 NAT 閘道之後，必須設定正確的路由表。這將允許來自私有子網路的流量透過 NAT 閘道路由至公有網際網路。

![\[\]](http://docs.aws.amazon.com/zh_tw/lambda/latest/dg/images/MSK-NAT-Gateway.png)


下列步驟將引導您使用主控台設定 NAT 閘道。視需要針對每個可用區域 (AZ) 重複這些步驟。

**設定 NAT 閘道和正確路由 (主控台)**

1. 請遵循[建立 NAT 閘道](https://docs.aws.amazon.com/vpc/latest/userguide/nat-gateway-working-with.html)中的步驟，同時注意下列事項：
   + NAT 閘道應始終位於公有子網路內。建立具有[公有連線](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)的 NAT 閘道。
   + 如果 MSK 叢集跨多個可用區域複寫，請為每個可用區域建立一個 NAT 閘道。例如，在每個可用區域中，VPC 應該有一個包含叢集的私有子網路，以及一個包含 NAT 閘道的公有子網路。對於具有三個可用區域的設定，您將擁有三個私有子網路、三個公有子網路及三個 NAT 閘道。

1. 建立 NAT 閘道之後，開啟 [Amazon VPC 主控台](https://console.aws.amazon.com/vpc/)，然後在左側功能表中選擇**路由表**。

1. 選擇 **Create route table** (建立路由表)。

1. 將此路由表與包含 MSK 叢集的 VPC 建立關聯。或者，輸入路由表的名稱。

1. 選擇 **Create route table** (建立路由表)。

1. 選擇剛剛建立的路由表。

1. 在**子網路關聯**索引標籤中，選擇**編輯子網路關聯**。
   + 將此路由表與包含 MSK 叢集的私有子網路建立關聯。

1. 選擇 **Edit routes** (編輯路由)。

1. 選擇**新增路由**：

   1. 對於 **Destination (目的地)**，請選擇 `0.0.0.0/0`。

   1. 在**目標**欄位中，選擇 **NAT 閘道**。

   1. 在搜尋方塊中，選擇在步驟 1 中建立的 NAT 閘道。這應該是與包含 MSK 叢集的私有子網路 (您在步驟 6 中與此路由表相關聯的私有子網路) 位於同一可用區域中的 NAT 閘道。

1. 選擇**儲存變更**。

## 設定 MSK 事件來源的 AWS PrivateLink 端點
<a name="msk-vpc-privatelink"></a>

您可以將 AWS PrivateLink 端點設定為從叢集輪詢訊息，並透過 VPC 透過路徑叫用函數。這些端點應允許 MSK 叢集存取下列服務：
+ Lambda 服務
+ [AWS Security Token Service (STS)](https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html)
+ (選用) [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) 服務。若叢集身分驗證所需的秘密儲存在 Secrets Manager 中，則此為必要項目。

僅在事件來源映射使用隨需模式時，才需要設定 PrivateLink 端點。如果事件來源映射使用佈建模式，Lambda 會代為建立必要連線。

PrivateLink 端點允許安全的私有存取 AWS 服務 AWS PrivateLink。或者，若要設定 NAT 閘道以讓 MSK 叢集存取公有網際網路，請參閱[為 MSK 事件來源設定 NAT 閘道](#msk-nat-gateway)。

設定 VPC 端點之後，MSK 叢集應可直接且透過私有方式存取 Lambda、STS，以及選擇性地存取 Secrets Manager。

![\[\]](http://docs.aws.amazon.com/zh_tw/lambda/latest/dg/images/MSK-PrivateLink-Endpoints.png)


下列步驟將引導您使用主控台設定 PrivateLink 端點。視需要針對每個端點 (Lambda、STS、Secrets Manager) 重複這些步驟。

**設定 VPC PrivateLink 端點 (主控台)**

1. 開啟 [Amazon VPC 主控台](https://console.aws.amazon.com/vpc/)，然後在左側功能表中選擇**端點**。

1. 選擇**建立端點**。

1. 或者，輸入端點的名稱。

1. 在**類型**欄位中，選擇 **AWS 服務**。

1. 在**服務**欄位中，開始輸入服務的名稱。例如，若要建立連線至 Lambda 的端點，請在搜尋方塊中輸入 `lambda`。

1. 在結果中，您應能看到目前區域的服務端點。例如，在美國東部 (維吉尼亞北部) 區域，您應該會看到 `com.amazonaws.us-east-2.lambda`。選取該服務。

1. 在**網路設定**欄位中，選取包含 MSK 叢集的 VPC。

1. 在**子網路**欄位中，選取 MSK 叢集所在的可用區域。
   + 對於每個可用區域，在**子網路 ID** 欄位中，選擇包含 MSK 叢集的私有子網路。

1. 在**安全群組**欄位中，選取與 MSK 叢集相關聯的安全群組。

1. 選擇**建立端點**。

根據預設，Amazon VPC 端點具有開放的 IAM 政策，允許廣泛存取資源。最佳實務是限制這些政策，以使用該端點執行所需的動作。例如，對於 Secrets Manager 端點，您可以修改其政策，使其僅允許函式的執行角色存取秘密。

**Example VPC 端點政策：Secrets Manager 端點**  

```
{
    "Statement": [
        {
            "Action": "secretsmanager:GetSecretValue",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws::iam::123456789012:role/my-role"
                ]
            },
            "Resource": "arn:aws::secretsmanager:us-west-2:123456789012:secret:my-secret"
        }
    ]
}
```

對於 AWS STS 和 Lambda 端點，您可以將呼叫委託人限制為 Lambda 服務委託人。但是，請確定在這些政策使用 `"Resource": "*"`。

**Example VPC 端點政策 – AWS STS 端點**  

```
{
    "Statement": [
        {
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "lambda.amazonaws.com"
                ]
            },
            "Resource": "*"
        }
    ]
}
```

**Example VPC 端點政策：Lambda 端點**  

```
{
    "Statement": [
        {
            "Action": "lambda:InvokeFunction",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "lambda.amazonaws.com"
                ]
            },
            "Resource": "*"
        }
    ]
}
```

# 設定 Amazon MSK 事件來源映射的 Lambda 許可
<a name="with-msk-permissions"></a>

若要存取 Amazon MSK 叢集，函式和事件來源映射需要執行各種 Amazon MSK API 動作的許可。將這些許可新增至函式的[執行角色](lambda-intro-execution-role.md)。如果使用者需要存取權，請將必要許可新增至使用者或角色的身分政策。

[AWSLambdaMSKExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaMSKExecutionRole.html) 受管政策包含 Amazon MSK Lambda 事件來源映射所需的最低許可。若要簡化許可程序，您可以：
+ 將 [AWSLambdaMSKExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaMSKExecutionRole.html) 受管政策連接至您的執行角色。
+ 讓 Lambda 主控台為您產生許可。當您[在主控台中建立 Amazon MSK 事件來源映射](msk-esm-create.md#msk-console)時，Lambda 會評估您的執行角色，並在缺少任何許可時提醒您。選擇**產生許可**以自動更新您的執行角色。如果您手動建立或修改執行角色政策，或者政策連接到多個角色，則此功能不適用。請注意，使用[失敗時目的地](kafka-on-failure.md)或[AWS Glue 結構描述登錄](services-consume-kafka-events.md)檔等進階功能時，執行角色可能仍需要額外的許可。

**Topics**
+ [所需的許可](#msk-required-permissions)
+ [可選的許可。](#msk-optional-permissions)

## 所需的許可
<a name="msk-required-permissions"></a>

您的 Lambda 函數執行角色必須具有下列 Amazon MSK 事件來源映射的必要許可。這些許可包含在 [AWSLambdaMSKExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaMSKExecutionRole.html) 受管政策中。

### CloudWatch Logs 許可
<a name="msk-basic-permissions"></a>

下列許可允許 Lambda 在 Amazon CloudWatch Logs 中建立和存放日誌。
+ [logs:CreateLogGroup](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html)
+ [logs:CreateLogStream](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogStream.html)
+ [日誌：PutLogEvents](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html)

### MSK 叢集許可
<a name="msk-cluster-permissions"></a>

下列許可允許 Lambda 代表您存取您的 Amazon MSK 叢集：
+ [kafka:DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html)
+ [kafka:DescribeClusterV2](https://docs.aws.amazon.com/MSK/2.0/APIReference/v2-clusters-clusterarn.html)
+ [kafka:GetBootstrapBrokers](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-bootstrap-brokers.html)

我們建議您使用 [kafka：DescribeClusterV2](https://docs.aws.amazon.com/MSK/2.0/APIReference/v2-clusters-clusterarn.html)，而非 [kafka：DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html)。v2 許可適用於佈建和無伺服器 Amazon MSK 叢集。您只需要政策中的其中一個許可。

### VPC 許可
<a name="msk-vpc-permissions"></a>

下列許可允許 Lambda 在連線至 Amazon MSK 叢集時建立和管理網路介面：
+ [ec2:CreateNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateNetworkInterface.html)
+ [ec2:DescribeNetworkInterfaces](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeNetworkInterfaces.html)
+ [ec2:DescribeVpcs](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeVpcs.html)
+ [ec2:DeleteNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DeleteNetworkInterface.html)
+ [ec2:DescribeSubnets](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSubnets.html)
+ [ec2:DescribeSecurityGroups](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSecurityGroups.html)

## 可選的許可。
<a name="msk-optional-permissions"></a>

 您的 Lambda 函數可能需要許可，才能：
+ 存取跨帳戶 Amazon MSK 叢集。對於跨帳戶事件來源映射，您需要執行角色中的 [kafka：DescribeVpcConnection](https://docs.aws.amazon.com/msk/1.0/apireference/vpc-connection-arn.html)。建立跨帳戶事件來源映射的 IAM 主體需要 [kafka：ListVpcConnections](https://docs.aws.amazon.com/msk/1.0/apireference/vpc-connections.html)。
+ 存取 SCRAM 秘密 (若使用 [SASL/SCRAM 身分驗證](msk-cluster-auth.md#msk-sasl-scram))。此舉可讓函式透過使用者名稱與密碼連線至 Kafka。
+ 描述 Secrets Manager 秘密 (若使用 SASL/SCRAM 或 [mTLS 身分驗證](msk-cluster-auth.md#msk-mtls))。此舉可讓函式擷取安全連線所需的憑證或認證。
+ 如果您的 AWS Secrets Manager 秘密是使用 AWS KMS 客戶受管金鑰加密，請存取您的 AWS KMS 客戶受管金鑰。
+ 存取結構描述登錄檔秘密 (若使用的結構描述登錄檔需進行身分驗證)：
  + 對於 AWS Glue 結構描述登錄檔：您的函數需求`glue:GetRegistry`和`glue:GetSchemaVersion`許可。這些許可能夠讓函式查詢並使用儲存在 AWS Glue中的訊息格式規則。
  + 對於使用 `BASIC_AUTH` 或 `CLIENT_CERTIFICATE_TLS_AUTH` 的 [Confluent 結構描述登錄檔](https://docs.confluent.io/platform/current/schema-registry/security/index.html)：函式需要對包含身分驗證憑證的秘密具有 `secretsmanager:GetSecretValue` 許可。這可讓函式擷取存取 Confluent 結構描述登錄檔所需的使用者名稱/密碼或憑證。
  + 對於私有 CA 憑證：函式需要對包含憑證的秘密具有 secretsmanager:GetSecretValue 許可。此舉可讓函式驗證使用自訂憑證的結構描述登錄檔身分。
+ 如果您使用 IAM 身分驗證進行事件來源映射，請從主題存取 Kafka 叢集取用者群組和輪詢訊息。

 這些對應至下列必要許可：
+ [kafka:ListScramSecrets](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-scram-secrets.html) – 允許列出用於 Kafka 身分驗證的 SCRAM 秘密
+ [secretsmanager:GetSecretValue](https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html) – 允許從 Secrets Manager 擷取秘密
+ [kms:Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) - 允許使用 解密加密的資料 AWS KMS
+ [glue：GetRegistry](https://docs.aws.amazon.com/glue/latest/webapi/API_GetRegistry.html) - 允許存取 AWS Glue 結構描述登錄檔
+ [glue：GetSchemaVersion](https://docs.aws.amazon.com/glue/latest/webapi/API_GetSchemaVersion.html) - 啟用從結構描述登錄檔擷取特定 AWS Glue 結構描述版本
+ [kafka-cluster：Connect](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) - 准許連線至叢集並進行驗證
+ [kafka-cluster：AlterGroup](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) - 准許加入叢集上的群組，相當於 Apache Kafka 的 READ GROUP ACL
+ [kafka-cluster：DescribeGroup](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) - 准許描述叢集上的群組，相當於 Apache Kafka 的 DESCRIBE GROUP ACL
+ [kafka-cluster：DescribeTopic](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) - 准許描述叢集上的主題，相當於 Apache Kafka 的 DESCRIBE TOPIC ACL
+ [kafka-cluster：ReadData](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) - 准許從叢集上的主題讀取資料，相當於 Apache Kafka 的 READ TOPIC ACL

 此外，若想將失敗調用的記錄傳送至失敗時目的地，根據目的地類型，您將需要下列許可：
+ 對於 Amazon SQS 目的地：[sqs:SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html) – 允許將訊息傳送至 Amazon SQS 佇列
+ 對於 Amazon SNS 目的地：[sns:Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html) – 允許將訊息發布至 Amazon SNS 主題
+ 對於 Amazon S3 儲存貯體目的地：[s3:PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) 與 [s3：ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListBucket.html) – 允許在 Amazon S3 儲存貯體中寫入及列出物件

如需疑難排解身分驗證與授權錯誤的相關資訊，請參閱[對 Kafka 事件來源映射錯誤進行疑難排解](with-kafka-troubleshoot.md)。

# 設定 Lambda 的 Amazon MSK 事件來源
<a name="with-msk-configure"></a>

若要將 Amazon MSK 叢集用作 Lambda 函式的事件來源，需要建立將這兩項資源連線的[事件來源映射](invocation-eventsourcemapping.md)。本頁介紹了如何為 Amazon MSK 建立事件來源映射。

本頁假設您已正確設定 MSK 叢集及其所在的 [Amazon Virtual Private Cloud (VPC)](https://docs.aws.amazon.com/vpc/latest/userguide/what-is-amazon-vpc.html)。如需設定叢集或 VPC，請參閱[為 Lambda 設定 Amazon MSK 叢集與 Amazon VPC 網路](with-msk-cluster-network.md)。若要設定錯誤處理的重試行為，請參閱 [設定 Kafka 事件來源的錯誤處理控制項](kafka-retry-configurations.md)。

**Topics**
+ [使用 Amazon MSK 叢集作為事件來源](#msk-esm-overview)
+ [在 Lambda 中設定 Amazon MSK 叢集身分驗證方法](msk-cluster-auth.md)
+ [為 Amazon MSK 事件來源建立 Lambda 事件來源映射](msk-esm-create.md)
+ [在 Lambda 中建立跨帳戶事件來源映射](msk-cross-account.md)
+ [Lambda 中的所有 Amazon MSK 事件來源組態參數](msk-esm-parameters.md)

## 使用 Amazon MSK 叢集作為事件來源
<a name="msk-esm-overview"></a>

當您將 Apache Kafka 或 Amazon MSK 叢集新增為 Lambda 函數的觸發條件時，該叢集會用作[事件來源](invocation-eventsourcemapping.md)。

Lambda 會根據您指定的[起始位置](kafka-starting-positions.md)，從 Kafka 主題 (您在 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 請求中指定為 `Topics`) 讀取事件資料。處理成功後，您的 Kafka 主題將遞交給 Kafka 叢集。

Lambda 會依序讀取每個 Kafka 主題分割區的訊息。單一 Lambda 承載可以包含來自多個分割區的訊息。有更多記錄可用時，Lambda 會根據您在 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 請求中指定的 BatchSize 值，繼續以批次方式來處理記錄，直到函式追上主題的進度為止。

Lambda 處理每個批次後，會遞交該批次中訊息的偏移量。如果函數針對批次中的任何訊息傳回錯誤，Lambda 會重試整個批次的訊息，直至處理成功或訊息過期。您可以將所有重試嘗試失敗時的記錄傳送至失敗時的目的地，以便稍後處理。

**注意**  
雖然 Lambda 函數的逾時上限通常為 15 分鐘，但 Amazon MSK、自我管理的 Apache Kafka、Amazon DocumentDB 以及 Amazon MQ for ActiveMQ 和 Amazon MQ for RabbitMQ 的事件來源映射只支援 14 分鐘逾時限制上限的函數。

# 在 Lambda 中設定 Amazon MSK 叢集身分驗證方法
<a name="msk-cluster-auth"></a>

Lambda 需要許可才能存取 Amazon MSK 叢集、擷取記錄和執行其他任務。Amazon MSK 支援多種方法向 MSK 叢集進行身分驗證。

**Topics**
+ [未驗證的存取](#msk-unauthenticated)
+ [SASL/SCRAM 身分驗證](#msk-sasl-scram)
+ [交互 TLS 驗證](#msk-mtls)
+ [IAM 身分驗證](#msk-iam-auth)
+ [Lambda 選擇引導代理程式的方法](#msk-bootstrap-brokers)

## 未驗證的存取
<a name="msk-unauthenticated"></a>

如果沒有用戶端透過網際網路存取叢集，您可以使用未驗證存取。

## SASL/SCRAM 身分驗證
<a name="msk-sasl-scram"></a>

Lambda 支援 [Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM)](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password-tutorial.html) 身分驗證，並使用 SHA-512 雜湊函式和 Transport Layer Security (TLS) 加密。若要讓 Lambda 連線至叢集，請將身分驗證憑證 (使用者名稱與密碼) 儲存在 Secrets Manager 秘密中，並在設定事件來源映射時參考此秘密。

如需有關使用 Secrets Manager 的詳細資訊，請參閱 *Amazon Managed Streaming for Apache Kafka Developer Guide* 中的 [Sign-in credentials authentication with Secrets Manager](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html)。

**注意**  
Amazon MSK 不支援 SASL/PLAIN 身分驗證。

## 交互 TLS 驗證
<a name="msk-mtls"></a>

交互 TLS (mTLS) 可提供用戶端與伺服器之間的雙向身分驗證。用戶端會將憑證傳送至伺服器，供伺服器驗證用戶端。伺服器也會將憑證傳送至用戶端，供用戶端驗證伺服器。

針對 Amazon MSK 與 Lambda 的整合，MSK 叢集充當伺服器，而 Lambda 充當用戶端。
+ 若要讓 Lambda 驗證 MSK 叢集，可以將用戶端憑證設定為 Secrets Manager 中的秘密，並在事件來源映射組態中參考此憑證。客户憑證必須由伺服器信任存放區中的憑證授權機構 (CA) 簽署。
+ MSK 叢集也會將伺服器憑證傳送至 Lambda。伺服器憑證必須由 AWS 信任存放區中的憑證授權機構 (CA) 簽署。

Amazon MSK 不支援自行簽署的伺服器憑證，因為 Amazon MSK 中的所有代理程式都使用由 [Amazon Trust Services CA](https://www.amazontrust.com/repository/) (Lambda 預設信任此機構) 簽署的[公有憑證](https://docs.aws.amazon.com/msk/latest/developerguide/msk-encryption.html)。

### 設定 mTLS 機密
<a name="mtls-auth-secret"></a>

CLIENT\$1CERTIFICATE\$1TLS\$1AUTH 機密必須有憑證欄位和私有金鑰欄位。若為加密的私有金鑰，機密需要私有金鑰密碼。憑證與私有金鑰均必須為 PEM 格式。

**注意**  
Lambda 支援 [PBES1](https://datatracker.ietf.org/doc/html/rfc2898/#section-6.1) (但不支援 PBES2) 私有金鑰加密演算法。

憑證欄位必須包含憑證清單，以用戶端憑證開頭，隨後則是任何中繼憑證，並以根憑證結尾。每個憑證均必須以新的一行開始，結構如下：

```
-----BEGIN CERTIFICATE-----  
        <certificate contents>
-----END CERTIFICATE-----
```

Secrets Manager 支援高達 65,536 個位元組的機密，此空間足以容納長憑證鏈。

私有金鑰必須為 [PKCS \$18](https://datatracker.ietf.org/doc/html/rfc5208) 格式，結構如下：

```
-----BEGIN PRIVATE KEY-----  
         <private key contents>
-----END PRIVATE KEY-----
```

對於已加密的私有金鑰，請使用下列結構：

```
-----BEGIN ENCRYPTED PRIVATE KEY-----  
          <private key contents>
-----END ENCRYPTED PRIVATE KEY-----
```

下列範例顯示的是使用了已加密私有金鑰之 mTLS 身分驗證的機密內容。若為加密的私有金鑰，您可以在機密中包含私有金鑰密碼。

```
{
 "privateKeyPassword": "testpassword",
 "certificate": "-----BEGIN CERTIFICATE-----
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
...
j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk
cmUuiAii9R0=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
...
rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
-----END CERTIFICATE-----",
 "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp
...
QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
-----END ENCRYPTED PRIVATE KEY-----"
}
```

如需有關適用於 Amazon MSK 的 mTLS 詳細資訊，以及如何產生用戶端憑證的說明，請參閱 *Amazon Managed Streaming for Apache Kafka Developer Guide* 中的 [Mutual TLS client authentication for Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html)。

## IAM 身分驗證
<a name="msk-iam-auth"></a>

您可以使用 AWS Identity and Access Management (IAM) 驗證連線至 MSK 叢集之用戶端的身分。透過 IAM 身分驗證，Lambda 憑藉函式[執行角色](lambda-intro-execution-role.md)中的許可連線至叢集、擷取記錄，以及執行其他必要動作。如需包含必要許可的政策範例，請參閱 *Amazon Managed Streaming for Apache Kafka Developer Guide* 中的 [ Create authorization policies for the IAM role](https://docs.aws.amazon.com/msk/latest/developerguide/create-iam-access-control-policies.html)。

若 MSK 叢集上的 IAM 身分驗證為作用中，且您未提供秘密，則 Lambda 會自動預設使用 IAM 身分驗證。

如需有關 Amazon MSK 中 IAM 身分驗證的詳細資訊，請參閱 [IAM access control](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html)。

## Lambda 選擇引導代理程式的方法
<a name="msk-bootstrap-brokers"></a>

Lambda 會依據叢集上可用的身分驗證方法，以及您是否提供了身分驗證密碼，以此選擇[引導代理程式](https://docs.aws.amazon.com/msk/latest/developerguide/msk-get-bootstrap-brokers.html)。若您提供了 MTL 或 SASL/SCRAM 的密碼，則 Lambda 會自動選擇該身分驗證方法。若您未提供密碼，Lambda 會選取叢集上作用中的安全強度最高的身分驗證方法。以下是 Lambda 選擇代理程式的優先順序，身分驗證安全強度依次遞減：
+ mTLS (已提供 mTLS 密碼）
+ SASL/SCRAM (已提供 SASL /SCROM 密碼)
+ SASL IAM (未提供任何密碼，且 IAM 身分驗證在作用中）
+ 未驗證的 TLS (未提供任何密碼，且 IAM 身分驗證未在作用中)
+ 純文字 (未提供任何密碼，且 IAM 身分驗證和未經身分驗證的 TLS 皆未在作用中)

**注意**  
若 Lambda 無法連線至最安全的代理程式類型，Lambda 便不會嘗試連線至其他 (安全強度較弱) 的代理程式類型。若您要讓 Lambda 選擇安全強度較弱較弱的代理程式類型，請停用叢集上所有安全強度較弱更高的身分驗證方法。

# 為 Amazon MSK 事件來源建立 Lambda 事件來源映射
<a name="msk-esm-create"></a>

若要建立事件來源映射，可以使用 Lambda 主控台、[AWS Command Line Interface (CLI)](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) 或 [AWS SDK](https://aws.amazon.com/getting-started/tools-sdks/)。

**注意**  
當您建立事件來源映射時，Lambda 會在包含 MSK 叢集的私有子網路內建立 [Hyperplane ENI](configuration-vpc.md#configuration-vpc-enis)，使 Lambda 能建立安全連線。此 Hyperplane ENI 會使用 MSK 叢集的子網路和安全群組組態，而不是 Lambda 函式。

下列主控台步驟會將 Amazon MSK 叢集新增為 Lambda 函式的觸發程序。在此之下，此操作會建立事件來源映射資源。

**將 Amazon MSK 觸發條件新增至 Lambda 函數 （主控台）**

1. 開啟 Lambda 主控台中的[函數頁面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 選擇要新增 Amazon MSK 觸發程序的 Lambda 函式名稱。

1. 在**函數概觀**下，選擇**新增觸發條件**。

1. 在**觸發程序組態**欄位中，選擇 **MSK**。

1. 若要指定 Kafka 叢集詳細資訊，請執行下列動作：

   1. 對於 **MSK cluster (MSK 叢集)**，請選取您的叢集。

   1. 在**主題名稱**欄位中，輸入要從中取用訊息的 Kafka 主題名稱。

   1. (選用) 在**取用者群組 ID** 欄位中，輸入要加入的 Kafka 取用者群組 ID (如適用)。如需詳細資訊，請參閱[Lambda 中可自訂的取用者群組 ID](kafka-consumer-group-id.md)。

1. 在**叢集身分驗證**欄位中，進行必要的組態設定。如需有關叢集身分驗證的詳細資訊，請參閱[在 Lambda 中設定 Amazon MSK 叢集身分驗證方法](msk-cluster-auth.md)。
   + 如果希望 Lambda 在建立連線時對 MSK 叢集執行身分驗證，請開啟**使用身分驗證**。建議進行身分驗證。
   + 如果使用身分驗證，在**身分驗證方法**欄位中，選擇要使用的身分驗證方法。
   + 如果使用身分驗證，在 **Secrets Manager 金鑰**欄位中，選擇包含存取叢集所需身分驗證憑證的 Secrets Manager 金鑰。

1. 在**事件輪詢器組態**欄位中，進行必要的組態設定。
   + 選擇**啟用觸發程序**，在建立完成後立即啟用觸發程序。
   + 選擇是否要為事件來源映射**設定佈建模式**。如需詳細資訊，請參閱[Lambda 中的 Apache Kafka 事件輪詢器擴展模式](kafka-scaling-modes.md)。
     + 如果您設定佈建模式，請輸入**最小值事件輪詢器**的值、**最大值事件輪詢器**的值，以及 PollerGroupName 的選用值，以指定相同事件來源 VPC 內多個 ESMs 的分組。
   + 在**開始位置**欄位中，選擇希望 Lambda 開始從串流讀取的方式。如需詳細資訊，請參閱[Lambda 中的 Apache Kafka 輪詢與串流開始位置](kafka-starting-positions.md)。

1. 在**批次處理**欄位中，進行必要的組態設定。如需有關批次處理的詳細資訊，請參閱[批次處理行為](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)。

   1. 對於 **批次大小**，輸入單一批次中要擷取的訊息數量上限。

   1. 在**批次時段**欄位中，輸入 Lambda 調用函式之前收集記錄所耗費的秒數上限。

1. 在**篩選**欄位中，進行必要的組態設定。如需有關篩選的詳細資訊，請參閱 [從 Amazon MSK 和自我管理的 Apache Kafka 事件來源篩選事件](kafka-filtering.md)。
   + 在**篩選條件**欄位中，新增篩選條件定義，決定是否處理事件。

1. 在**失敗處理**欄位中，進行必要的組態設定。如需有關失敗處理的詳細資訊，請參閱[擷取 Amazon MSK 和自我管理的 Apache Kafka 事件來源的捨棄批次](kafka-on-failure.md)。
   + 在**失敗時目的地**欄位中，指定失敗時目的地的 ARN。

1. 在**標籤**欄位中，輸入要與此事件來源映射建立關聯的標籤。

1. 若要建立觸發條件，請選擇 **新增** 。

您也可以使用 CLI AWS 搭配 [ create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) 命令來建立事件來源映射。下列範例建立了事件來源映射，可將 Lambda 函式 `my-msk-function` 映射至 `AWSKafkaTopic` 主題 (從 `LATEST` 訊息開始)。此命令還使用了 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) 物件，可指示 Lambda 在連線至叢集時使用 [SASL/SCRAM](msk-cluster-auth.md#msk-sasl-scram) 身分驗證。

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
  --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
```

如果叢集使用 [mTLS 身分驗證](msk-cluster-auth.md#msk-mtls)，則需包含指定 `CLIENT_CERTIFICATE_TLS_AUTH` 和 Secrets Manager 金鑰 ARN 的 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) 物件。如下列命令所示：

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
  --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
```

如果叢集使用 [IAM 身分驗證](msk-cluster-auth.md#msk-iam-auth)，則不需要 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) 物件。如下列命令所示：

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
```

# 在 Lambda 中建立跨帳戶事件來源映射
<a name="msk-cross-account"></a>

您可以使用[多 VPC 私有連線](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html)，將 Lambda 函數連接到不同 AWS 帳戶中的佈建 MSK 叢集。多 VPC 連線使用 AWS PrivateLink，這會保留 AWS 網路中的所有流量。

**注意**  
您無法為無伺服器 MSK 叢集建立跨帳戶事件來源映射。

若要建立跨帳戶事件來源映射，您必須先[為 MSK 叢集設定多 VPC 連線](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-turn-on)。建立事件來源映射時，請使用受管理的 VPC 連線 ARN 而非叢集 ARN，如下列範例所示。根據 MSK 叢集使用的驗證類型，[CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 操作也會有所不同。

**Example — 為使用 IAM 身分驗證的叢集建立跨帳戶的事件來源映射**  
當叢集使用 [IAM 角色型身分驗證](msk-cluster-auth.md#msk-iam-auth)時，您不需要 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) 物件。範例：  

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
```

**Example — 為使用 SASL/SCRAM 身分驗證的叢集建立跨帳戶的事件來源映射**  
如果叢集使用 [SASL/SCRAM 身分驗證](msk-cluster-auth.md#msk-sasl-scram)，則您必須包含指定 `SASL_SCRAM_512_AUTH` 和 Secrets Manager 機密 ARN 的 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html)。  
透過 SASL/SCRAM 身分驗證，有兩種方式可將機密用於跨帳戶 Amazon MSK 事件來源映射：  
+ 在 Lambda 函數帳戶中建立機密，並將其與叢集機密同步。[建立一個輪換](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html)以使兩個機密保持同步。此選項允許您從函數帳戶控制機密。
+ 使用機密必須與 Amazon MSK 叢集相關聯。此機密必須允許 Lambda 函數帳戶的跨帳户存取權。如需詳細資訊，請參閱[不同帳戶中使用者 AWS Secrets Manager 機密的許可](https://docs.aws.amazon.com/secretsmanager/latest/userguide/auth-and-access_examples_cross.html)。

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
```

**Example — 為使用 mTLS 身分驗證的叢集建立跨帳戶的事件來源映射**  
如果叢集使用 [mTLS 身分驗證](msk-cluster-auth.md#msk-mtls)，則您必須包含指定 `CLIENT_CERTIFICATE_TLS_AUTH` 和 Secrets Manager 機密 ARN 的 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html)。機密可以儲存在叢集帳戶或 Lambda 函數帳戶中。  

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
```

# Lambda 中的所有 Amazon MSK 事件來源組態參數
<a name="msk-esm-parameters"></a>

所有 Lambda 事件來源類型都會共用相同的 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 和 [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) API 操作。但是，僅有部分參數適用於 Amazon MSK，如下表所示。


| 參數 | 必要 | 預設 | 備註 | 
| --- | --- | --- | --- | 
|  AmazonManagedKafkaEventSourceConfig  |  N  |  包含預設為一個唯一值的 ConsumerGroupId 欄位。  |  只能在建立時進行設定  | 
|  BatchSize  |  否  |  100  |  上限：10,000  | 
|  DestinationConfig  |  N  |  N/A  |  [擷取 Amazon MSK 和自我管理的 Apache Kafka 事件來源的捨棄批次](kafka-on-failure.md)  | 
|  已啟用  |  N  |  True  |    | 
|  BisectBatchOnFunctionError  |  N  |  False  |  [設定 Kafka 事件來源的錯誤處理控制項](kafka-retry-configurations.md)  | 
|  FunctionResponseTypes  |  N  |  N/A  |  [設定 Kafka 事件來源的錯誤處理控制項](kafka-retry-configurations.md)  | 
|  MaximumRecordAgeInSeconds  |  N  |  -1 （無限）  |  [設定 Kafka 事件來源的錯誤處理控制項](kafka-retry-configurations.md)  | 
|  MaximumRetryAttempts  |  N  |  -1 （無限）  |  [設定 Kafka 事件來源的錯誤處理控制項](kafka-retry-configurations.md)  | 
|  EventSourceArn  |  Y  | N/A |  只能在建立時進行設定  | 
|  FilterCriteria  |  N  |  N/A  |  [控制 Lambda 將哪些事件傳送至您的函數](invocation-eventfiltering.md)  | 
|  FunctionName  |  是  |  N/A  |    | 
|  KMSKeyArn  |  N  |  N/A  |  [篩選條件加密](invocation-eventfiltering.md#filter-criteria-encryption)  | 
|  MaximumBatchingWindowInSeconds  |  N  |  500 毫秒  |  [批次處理行為](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)  | 
|  ProvisionedPollersConfig  |  N  |  `MinimumPollers`：若未指定，預設值為 1。 `MaximumPollers`：若未指定，預設值為 200。 `PollerGroupName`：不適用  |  [佈建模式](kafka-scaling-modes.md#kafka-provisioned-mode)  | 
|  SourceAccessConfigurations  |  N  |  沒有憑證  |  您事件來源的 SASL/SCRAM 或 CLIENT\$1CERTIFICATE\$1TLS\$1AUTH (MutualTLS) 身分驗證憑證。  | 
|  StartingPosition  |  Y  | N/A |  AT\$1TIMESTAMP、TRIM\$1HORIZON 或 LATEST 只能在建立時進行設定  | 
|  StartingPositionTimestamp  |  N  |  N/A  |  StartingPosition 設定為 AT\$1TIMESTAMP 時需要  | 
|  Tags (標籤)  |  N  |  N/A  |  [在事件來源映射上使用標籤](tags-esm.md)  | 
|  主題  |  Y  | N/A |  Kafka 主題名稱 只能在建立時進行設定  | 

**注意**  
當您指定 時`PollerGroupName`，相同 Amazon VPC 中的多個 ESMs 可以共用事件輪詢單元 (EPU) 容量。您可以使用此選項來最佳化 ESMs 的佈建模式成本。ESM 分組的需求：  
ESMs 必須位於相同的 Amazon VPC 內
每個輪詢器群組最多 100 ESMs 
群組中所有 ESMs 的彙總輪詢器上限不能超過 2000
您可以更新 `PollerGroupName`，將 ESM 移至不同的群組，或將 `PollerGroupName`設定為空字串 ("")，從群組中移除 ESM。

# 教學課程：使用 Amazon MSK 事件來源映射來調用 Lambda 函數
<a name="services-msk-tutorial"></a>

在本教學課程中，您將執行下列操作：
+ 在與現有 Amazon MSK 叢集相同的 AWS 帳戶中建立 Lambda 函數。
+ 為 Lambda 設定聯網和身分驗證，以與 Amazon MSK 通訊。
+ 設定 Lambda Amazon MSK 事件來源映射，當主題中出現事件時，該映射會執行您的 Lambda 函數。

完成這些步驟後，當事件傳送至 Amazon MSK 時，您將能夠設定 Lambda 函數，以使用您自己的自訂 Lambda 程式碼自動處理這些事件。

 **您可以使用此功能做些什麼？** 

**範例解決方案：使用 MSK 事件來源映射為您的客戶提供即時分數。**

請考慮以下案例：您的公司託管了一個 Web 應用程式，您的客戶可以在其中檢視即時事件的相關資訊，例如運動遊戲。來自遊戲的資訊更新會透過 Amazon MSK 上的一個 Kafka 主題提供給您的團隊。您想要設計一個使用 MSK 主題更新的解決方案，以便在您所開發的應用程式內為客戶提供即時事件的更新檢視。您已決定採用下列設計方法：您的用戶端應用程式將與 AWS中託管的無伺服器後端通訊。用戶端將使用 Amazon API Gateway WebSocket API，透過 WebSocket 工作階段連線。

在此解決方案中，您需要一個元件來讀取 MSK 事件、執行一些自訂邏輯來準備應用程式層的事件，然後將該資訊轉送至 API Gateway API。您可以使用 實作此元件 AWS Lambda，方法是在 Lambda 函數中提供自訂邏輯，然後使用 AWS Lambda Amazon MSK 事件來源映射呼叫它。

如需使用 Amazon API Gateway WebSocket API 實作解決方案的詳細資訊，請參閱 API Gateway 文件中的 [WebSocket API tutorials](https://docs.aws.amazon.com/apigateway/latest/developerguide/websocket-api-chat-app.html)。

## 先決條件
<a name="w2aad101c23c15c35c19"></a>

具有下列預先設定資源 AWS 的帳戶：

**為了滿足這些先決條件，我們建議您遵循 Amazon MSK 文件中的 [Getting started using Amazon MSK](https://docs.aws.amazon.com//msk/latest/developerguide/getting-started.html)。**
+ Amazon MSK 叢集。請參閱*開始使用 Amazon MSK*中的[建立 Amazon MSK 叢集](https://docs.aws.amazon.com//msk/latest/developerguide/create-cluster.html)。
+ 下列組態：
  + 確定叢集安全設定中**已啟用** **IAM 角色型身分驗證**。這會透過限制您的 Lambda 函數僅存取所需的 Amazon MSK 資源來提高您的安全性。在新的 Amazon MSK 叢集上預設會啟用此功能。
  + 確保您的叢集聯網設定中的**公有存取**已關閉。限制 Amazon MSK 叢集對網際網路的存取，可限制有多少中介機構處理您的資料，藉此提高您的安全性。在新的 Amazon MSK 叢集上預設會啟用此功能。
+ Amazon MSK 叢集中用於此解決方案的 Kafka 主題。請參閱*開始使用 Amazon MSK* 中的[建立主題](https://docs.aws.amazon.com//msk/latest/developerguide/create-topic.html)。
+ Kafka 管理員主機，設定為從 Kafka 叢集擷取資訊，並將 Kafka 事件傳送至您的主題進行測試，例如已安裝 Kafka 管理員 CLI 和 Amazon MSK IAM 程式庫的 Amazon EC2 執行個體。請參閱*開始使用 Amazon MSK* 中的[建立用戶端機器](https://docs.aws.amazon.com//msk/latest/developerguide/create-client-machine.html)。

設定這些資源後，請從 AWS 您的帳戶收集下列資訊，以確認您已準備好繼續。
+ Amazon MSK 叢集的名稱。您可以在 Amazon MSK 主控台中找到此資訊。
+ 叢集 UUID 是 Amazon MSK 叢集 ARN 的一部分，可以在 Amazon MSK 主控台中找到。請執行 Amazon MSK 文件中 [Listing clusters](https://docs.aws.amazon.com/msk/latest/developerguide/msk-list-clusters.html) 中的程序來尋找此資訊。
+ 與您的 Amazon MSK 叢集相關聯的安全群組。您可以在 Amazon MSK 主控台中找到此資訊。在下列步驟中，將這些安全群組稱為 *clusterSecurityGroups*。
+ 包含 Amazon MSK 叢集的 Amazon VPC 的 ID。您可以在 Amazon MSK 主控台中識別與 Amazon MSK 叢集相關聯的子網路，然後在 Amazon VPC 主控台中識別與子網路相關聯的 Amazon VPC，籍此找到此資訊。
+ 解決方案中使用的 Kafka 主題的名稱。您可以從 Kafka 管理員主機使用 Kafka `topics` CLI 呼叫 Amazon MSK 叢集，籍此找到此資訊。如需關於主題 CLI 的詳細資訊，請參閱 Kafka 文件中的 [Adding and removing topics](https://kafka.apache.org/documentation/#basic_ops_add_topic)。
+ Kafka 主題的取用者群組名稱，適合供 Lambda 函數使用。此群組可由 Lambda 自動建立，因此不需要使用 Kafka CLI 建立。如果您需要管理取用者群組，以進一步了解取用者群組 CLI 的詳細資訊，則請參閱 Kafka 文件中的 [Managing Consumer Groups](https://kafka.apache.org/documentation/#basic_ops_consumer_group)。

您 AWS 帳戶中的下列許可：
+ 建立和管理 Lambda 函數的許可。
+ 建立 IAM 政策並與您的 Lambda 函數建立關聯的許可。
+ 在託管 Amazon MSK 叢集的 Amazon VPC 中建立 Amazon VPC 端點和變更聯網組態的許可。

### 安裝 AWS Command Line Interface
<a name="install_aws_cli"></a>

如果您尚未安裝 AWS Command Line Interface，請依照[安裝或更新最新版本的 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)中的步驟進行安裝。

本教學課程需使用命令列終端機或 Shell 來執行命令。在 Linux 和 macOS 中，使用您偏好的 Shell 和套件管理工具。

**注意**  
在 Windows 中，作業系統的內建終端不支援您常與 Lambda 搭配使用的某些 Bash CLI 命令 (例如 `zip`)。若要取得 Ubuntu 和 Bash 的 Windows 整合版本，請[安裝適用於 Linux 的 Windows 子系統](https://docs.microsoft.com/en-us/windows/wsl/install-win10)。

## 設定供 Lambda 與 Amazon MSK 通訊的網路連線
<a name="w2aad101c23c15c35c21"></a>

 使用 AWS PrivateLink 連接 Lambda 和 Amazon MSK。您可以在 Amazon VPC 主控台中建立介面 Amazon VPC 端點來執行此操作。如需關於聯網組態的詳細資訊，請參閱[為 Lambda 設定 Amazon MSK 叢集與 Amazon VPC 網路](with-msk-cluster-network.md)。

當 Amazon MSK 事件來源映射代表 Lambda 函數執行時，它會擔任 Lambda 函數的執行角色。此 IAM 角色會授權映射存取 IAM 保護的資源，例如您的 Amazon MSK 叢集。雖然元件共用執行角色，但 Amazon MSK 映射和您的 Lambda 函數對各自的任務有不同的連線需求，如下圖所示。

![\[\]](http://docs.aws.amazon.com/zh_tw/lambda/latest/dg/images/msk_tut_network.png)


事件來源映射屬於 Amazon MSK 叢集安全群組。在此聯網步驟中，從您的 Amazon MSK 叢集 VPC 建立 Amazon VPC 端點，將事件來源映射連線到 Lambda 和 STS 服務。保護這些端點，接受來自 Amazon MSK 叢集安全群組的流量。然後，調整 Amazon MSK 叢集安全群組，允許事件來源映射與 Amazon MSK 叢集通訊。

 您可以使用 AWS 管理主控台來設定下列步驟。

**若要設定介面 Amazon VPC 端點連線 Lambda 和 Amazon MSK**

1. 為您的介面 Amazon VPC 端點 *endpointSecurityGroup* 建立安全群組，允許 443 上來自 *clusterSecurityGroups* 的傳入 TCP 流量。請遵循 Amazon EC2 文件中的 [Create a security group](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#creating-security-group) 中的程序來建立安全群組。然後，遵循 Amazon EC2 文件中的 [Add rules to a security group](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#adding-security-group-rule) 中的程序，以新增適當的規則。

   **使用下列資訊建立一個安全群組：**

   在新增傳入規則時，為 *clusterSecurityGroups* 中的每個安全群組建立規則。對於每條規則：
   + 針對**類型**，選取 **HTTPS**。
   + 針對**來源**，選取其中一個 *clusterSecurityGroups*。

1.  建立一個端點，將 Lambda 服務連線至包含 Amazon MSK 叢集的 Amazon VPC。遵循 [Create an interface endpoint](https://docs.aws.amazon.com//vpc/latest/privatelink/create-interface-endpoint.html) 中的程序。

   **使用下列資訊建立一個介面端點：**
   + 針對**服務名稱**，選取 `com.amazonaws.regionName.lambda`，其中 *regionName* 會託管您的 Lambda 函數。
   + 針對 **VPC**，選取包含 Amazon MSK 叢集的 Amazon VPC。
   + 針對**安全群組**，選取先前建立的 *endpointSecurityGroup*。
   + 針對**子網路**，選取託管您的 Amazon MSK 叢集的子網路。
   + 針對**政策**，提供下列政策文件，這會保護端點以供 Lambda 服務主體用於 `lambda:InvokeFunction` 動作。

     ```
     {
         "Statement": [
             {
                 "Action": "lambda:InvokeFunction",
                 "Effect": "Allow",
                 "Principal": {
                     "Service": [
                         "lambda.amazonaws.com"
                     ]
                 },
                 "Resource": "*"
             }
         ]
     }
     ```
   + 確保**啟用 DNS 名稱**保持設定狀態。

1.  建立端點，將 AWS STS 服務連線至包含 Amazon MSK 叢集的 Amazon VPC。遵循 [Create an interface endpoint](https://docs.aws.amazon.com//vpc/latest/privatelink/create-interface-endpoint.html) 中的程序。

   **使用下列資訊建立一個介面端點：**
   + 針對**服務名稱**，選取 AWS STS。
   + 針對 **VPC**，選取包含 Amazon MSK 叢集的 Amazon VPC。
   + 針對**安全群組**，選取 *endpointSecurityGroup*。
   + 針對**子網路**，選取託管您的 Amazon MSK 叢集的子網路。
   + 針對**政策**，提供下列政策文件，這會保護端點以供 Lambda 服務主體用於 `sts:AssumeRole` 動作。

     ```
     {
         "Statement": [
             {
                 "Action": "sts:AssumeRole",
                 "Effect": "Allow",
                 "Principal": {
                     "Service": [
                         "lambda.amazonaws.com"
                     ]
                 },
                 "Resource": "*"
             }
         ]
     }
     ```
   + 確保**啟用 DNS 名稱**保持設定狀態。

1. 對於與您的 Amazon MSK 叢集相關聯的每個安全群組，亦即，在 *clusterSecurityGroups* 中，允許以下項目：
   + 允許 9098 上所有 *clusterSecurityGroups* 的傳入和傳出 TCP 流量，包括其本身以內的 TCP 流量。
   + 允許 443 上的所有傳出 TCP 流量。

   根據預設，安全群組規則允許其中一些流量，因此如果您的叢集連接到單一安全群組，且該群組具有預設規則，則不需要額外的規則。若要調整安全群組規則，請遵循 Amazon EC2 文件中的 [Add rules to a security group](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#adding-security-group-rule) 中的程序。

   **使用下列資訊將規則新增至您的安全群組：**
   + 針對連接埠 9098 的每個傳入或傳出規則，提供
     + 針對**類型**，選取**自訂 TCP**。
     + 對於**連接埠範圍**，提供 9098。
     + 針對**來源**，提供其中一個 *clusterSecurityGroups*。
   + 對於連接埠 443 的每條傳入規則，針對**類型**，選取 **HTTPS**。

## 為 Lambda 建立 IAM 角色，以從 Amazon MSK 主題讀取
<a name="w2aad101c23c15c35c23"></a>

識別 Lambda 從 Amazon MSK 主題讀取的身分驗證要求，然後在政策中定義這些要求。建立角色 *lambdaAuthRole*，授權 Lambda 使用這些許可。使用 `kafka-cluster` IAM 動作授權 Amazon MSK 叢集上的動作。然後，授權 Lambda 執行探索和連線至 Amazon MSK 叢集所需的 Amazon MSK `kafka` 和 Amazon EC2 動作，以及 CloudWatch 動作 (以便 Lambda 可以記錄其已完成的動作)。

**若要描述 Lambda 從 Amazon MSK 讀取的身分驗證要求**

1. 撰寫 IAM 政策文件 (JSON 文件) *clusterAuthPolicy*，允許 Lambda 使用您的 Kafka 取用者群組，從 Amazon MSK 叢集中的 Kafka 主題讀取。Lambda 要求在讀取時設定 Kafka 取用者群組。

   修改以下範本以符合您的先決條件：

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "kafka-cluster:Connect",
                   "kafka-cluster:DescribeGroup",
                   "kafka-cluster:AlterGroup",
                   "kafka-cluster:DescribeTopic",
                   "kafka-cluster:ReadData",
                   "kafka-cluster:DescribeClusterDynamicConfiguration"
               ],
               "Resource": [
                   "arn:aws:kafka:us-east-1:111122223333:cluster/mskClusterName/cluster-uuid",
                   "arn:aws:kafka:us-east-1:111122223333:topic/mskClusterName/cluster-uuid/mskTopicName",
                   "arn:aws:kafka:us-east-1:111122223333:group/mskClusterName/cluster-uuid/mskGroupName"
               ]
           }
       ]
   }
   ```

------

   如需詳細資訊，請參閱 [設定 Amazon MSK 事件來源映射的 Lambda 許可](with-msk-permissions.md)。在撰寫政策時：
   + 使用 Amazon MSK 叢集 AWS 帳戶 的 和 取代 *us-east-1* AWS 區域 和 *111122223333*。
   + 對於 *mskClusterName*，請提供 Amazon MSK 叢集的名稱。
   + 對於 *cluster-uuid*，請提供 Amazon MSK 叢集 ARN 中的 UUID。
   + 對於 *mskTopicName*，請提供 Kafka 主題的名稱。
   + 針對 *mskGroupName*，請提供 Kafka 取用者群組的名稱。

1. 識別 Lambda 探索和連線 Amazon MSK 叢集以及記錄這些事件所需的 Amazon MSK、Amazon EC2 和 CloudWatch 許可。

   `AWSLambdaMSKExecutionRole` 受管政策允許定義必要的許可。在下列步驟中使用該政策。

   在生產環境中，評估 `AWSLambdaMSKExecutionRole` 以根據最低權限原則限制執行角色政策，然後為您的角色撰寫取代此受管政策的政策。

如需有關 IAM 政策語言的詳細資訊，請參閱 [IAM 文件](https://docs.aws.amazon.com//iam/)。

現在您已撰寫政策文件，請建立 IAM 政策，以便將其連接至您的角色。您可以使用主控台執行下列程序，以完成此操作。

**若要從政策文件建立 IAM 政策**

1. 登入 AWS 管理主控台 ，並在 https：//[https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 開啟 IAM 主控台。

1. 在左側的導覽窗格中，選擇 **Policies (政策)**。

1. 選擇**建立政策**。

1. 在**政策編輯器**中，選擇 **JSON** 選項。

1. 貼上 *clusterAuthPolicy*。

1. 將許可新增至政策後，請選擇**下一步**。

1. 在**檢視與建立**頁面上，為您正在建立的政策輸入**政策名稱**與**描述** (選用)。檢視**此政策中定義的許可**，來查看您的政策所授予的許可。

1. 選擇 **Create policy** (建立政策) 儲存您的新政策。

如需詳細資訊，請參閱 IAM 文件中的 [Creating IAM policies](https://docs.aws.amazon.com//IAM/latest/UserGuide/access_policies_create.html)。

現在您擁有適當的 IAM 政策，請建立一個角色並將這些政策連接至該角色。您可以使用主控台執行下列程序，以完成此操作。

**若要在 IAM 主控台中建立執行角色**

1. 在 IAM 主控台中開啟 [角色頁面](https://console.aws.amazon.com/iam/home#/roles)。

1. 選擇建**立角色**。

1. 在**受信任的實體類型**下，選擇 **AWS  服務**。

1. 在 **使用案例** 下，選擇 **Lambda**。

1. 選擇**下一步**。

1. 選取以下政策：
   + *clusterAuthPolicy*
   + `AWSLambdaMSKExecutionRole`

1. 選擇**下一步**。

1. 針對**角色名稱**，輸入 *lambdaAuthRole*，然後選擇**建立角色**。

如需詳細資訊，請參閱[使用執行角色定義 Lambda 函數許可](lambda-intro-execution-role.md)。

## 建立 Lambda 函數以從您的 Amazon MSK 主題中讀取
<a name="w2aad101c23c15c35c25"></a>

建立設定為使用您的 IAM 角色的 Lambda 函數。您可以使用主控台建立 Lambda 函數。

**若要使用驗證組態建立 Lambda 函數**

1.  開啟 Lambda 主控台，然後從標頭選取**建立函數**。

1. 選取**從頭開始撰寫**。

1. 針對**函數名稱**，提供您選擇的適當名稱。

1. 針對**執行時期**，選擇**最新支援**的 `Node.js` 版本，以使用本教學課程中提供的程式碼。

1. 選擇**變更預設執行角色**。

1. 選取**使用現有角色**。

1. 針對**現有角色**，選取 *lambdaAuthRole*。

在生產環境中，您通常需要將更多政策新增至 Lambda 函數的執行角色，以有意義的方式處理您的 Amazon MSK 事件。如需將政策新增至角色的詳細資訊，請參閱 IAM 文件中的 [Add or remove identity permissions](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html#add-policies-console)。

## 建立 Lambda 函數的事件來源映射
<a name="w2aad101c23c15c35c27"></a>

您的 Amazon MSK 事件來源映射為 Lambda 服務提供了在發生適當的 Amazon MSK 事件時調用 Lambda 所需的資訊。您可以使用主控台建立 Amazon MSK 映射。建立 Lambda 觸發條件，然後會自動設定事件來源映射。

**若要建立 Lambda 觸發條件 (和事件來源映射)**

1. 導覽至 Lambda 函數的概觀頁面。

1. 在函數概觀區段中，選擇左下角的**新增觸發條件**。

1. 在**選取來源**下拉式清單中，選取 **Amazon MSK**。

1. 請勿設定**身分驗證**。

1. 針對 **MSK 叢集**，請選取您的叢集名稱。

1. 針對**批次大小**，請輸入 1。此步驟可讓此功能更易於測試，而且不是生產中的理想值。

1. 針對**主題名稱**，請提供 Kafka 主題名稱。

1. 針對**取用者群組 ID**，請提供 Kafka 取用者群組的 ID。

## 更新您的 Lambda 函數以讀取串流資料
<a name="w2aad101c23c15c35c29"></a>

 Lambda 透過事件方法參數提供關於 Kafka 事件的資訊。如需 Amazon MSK 事件的結構範例，請參閱[範例事件](with-msk.md#msk-sample-event)。了解如何解譯 Lambda 轉送的 Amazon MSK 事件之後，您可以變更 Lambda 函數程式碼，以使用它們提供的資訊。

 將下列程式碼提供給 Lambda 函數，以記錄 Lambda Amazon MSK 事件的內容供測試之用：

------
#### [ .NET ]

**適用於 .NET 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)儲存庫中設定和執行。
使用 .NET 搭配 Lambda 來取用 Amazon MSK 事件。  

```
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KafkaEvents;


// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace MSKLambda;

public class Function
{
    
    
    /// <param name="input">The event for the Lambda function handler to process.</param>
    /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param>
    /// <returns></returns>
    public void FunctionHandler(KafkaEvent evnt, ILambdaContext context)
    {

        foreach (var record in evnt.Records)
        {
            Console.WriteLine("Key:" + record.Key); 
            foreach (var eventRecord in record.Value)
            {
                var valueBytes = eventRecord.Value.ToArray();    
                var valueText = Encoding.UTF8.GetString(valueBytes);
                
                Console.WriteLine("Message:" + valueText);
            }
        }
    }
    

}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)儲存庫中設定和執行。
使用 Go 搭配 Lambda 來取用 Amazon MSK 事件。  

```
package main

import (
	"encoding/base64"
	"fmt"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(event events.KafkaEvent) {
	for key, records := range event.Records {
		fmt.Println("Key:", key)

		for _, record := range records {
			fmt.Println("Record:", record)

			decodedValue, _ := base64.StdEncoding.DecodeString(record.Value)
			message := string(decodedValue)
			fmt.Println("Message:", message)
		}
	}
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK for Java 2.x**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)儲存庫中設定和執行。
使用 Java 搭配 Lambda 來取用 Amazon MSK 事件。  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord;

import java.util.Base64;
import java.util.Map;

public class Example implements RequestHandler<KafkaEvent, Void> {

    @Override
    public Void handleRequest(KafkaEvent event, Context context) {
        for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) {
            String key = entry.getKey();
            System.out.println("Key: " + key);

            for (KafkaEventRecord record : entry.getValue()) {
                System.out.println("Record: " + record);

                byte[] value = Base64.getDecoder().decode(record.getValue());
                String message = new String(value);
                System.out.println("Message: " + message);
            }
        }

        return null;
    }
}
```

------
#### [ JavaScript ]

**適用於 JavaScript (v3) 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)儲存庫中設定和執行。
使用 JavaScript 搭配 Lambda 來取用 Amazon MSK 事件。  

```
exports.handler = async (event) => {
    // Iterate through keys
    for (let key in event.records) {
      console.log('Key: ', key)
      // Iterate through records
      event.records[key].map((record) => {
        console.log('Record: ', record)
        // Decode base64
        const msg = Buffer.from(record.value, 'base64').toString()
        console.log('Message:', msg)
      }) 
    }
}
```
搭配 Lambda 使用 TypeScript 來取用 Amazon MSK 事件。  

```
import { MSKEvent, Context } from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "msk-handler-sample",
});

export const handler = async (
  event: MSKEvent,
  context: Context
): Promise<void> => {
  for (const [topic, topicRecords] of Object.entries(event.records)) {
    logger.info(`Processing key: ${topic}`);

    // Process each record in the partition
    for (const record of topicRecords) {
      try {
        // Decode the message value from base64
        const decodedMessage = Buffer.from(record.value, 'base64').toString();

        logger.info({
          message: decodedMessage
        });
      }
      catch (error) {
        logger.error('Error processing event', { error });
        throw error;
      }
    };
  }
}
```

------
#### [ PHP ]

**適用於 PHP 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)儲存庫中設定和執行。
使用 PHP 搭配 Lambda 來取用 Amazon MSK 事件。  

```
<?php
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kafka\KafkaEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): void
    {
        $kafkaEvent = new KafkaEvent($event);
        $this->logger->info("Processing records");
        $records = $kafkaEvent->getRecords();

        foreach ($records as $record) {
            try {
                $key = $record->getKey();
                $this->logger->info("Key: $key");

                $values = $record->getValue();
                $this->logger->info(json_encode($values));

                foreach ($values as $value) {
                    $this->logger->info("Value: $value");
                }
                
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**適用於 Python 的 SDK (Boto3)**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)儲存庫中設定和執行。
使用 Python 搭配 Lambda 來取用 Amazon MSK 事件。  

```
import base64

def lambda_handler(event, context):
    # Iterate through keys
    for key in event['records']:
        print('Key:', key)
        # Iterate through records
        for record in event['records'][key]:
            print('Record:', record)
            # Decode base64
            msg = base64.b64decode(record['value']).decode('utf-8')
            print('Message:', msg)
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)儲存庫中設定和執行。
使用 Ruby 搭配 Lambda 來取用 Amazon MSK 事件。  

```
require 'base64'

def lambda_handler(event:, context:)
  # Iterate through keys
  event['records'].each do |key, records|
    puts "Key: #{key}"

    # Iterate through records
    records.each do |record|
      puts "Record: #{record}"

      # Decode base64
      msg = Base64.decode64(record['value'])
      puts "Message: #{msg}"
    end
  end
end
```

------
#### [ Rust ]

**適用於 Rust 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)儲存庫中設定和執行。
使用 Rust 搭配 Lambda 取用 Amazon MSK 事件。  

```
use aws_lambda_events::event::kafka::KafkaEvent;
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use base64::prelude::*;
use serde_json::{Value};
use tracing::{info};

/// Pre-Requisites:
/// 1. Install Cargo Lambda - see https://www.cargo-lambda.info/guide/getting-started.html
/// 2. Add packages tracing, tracing-subscriber, serde_json, base64
///
/// This is the main body for the function.
/// Write your code inside it.
/// There are some code example in the following URLs:
/// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples
/// - https://github.com/aws-samples/serverless-rust-demo/

async fn function_handler(event: LambdaEvent<KafkaEvent>) -> Result<Value, Error> {

    let payload = event.payload.records;

    for (_name, records) in payload.iter() {

        for record in records {

         let record_text = record.value.as_ref().ok_or("Value is None")?;
         info!("Record: {}", &record_text);

         // perform Base64 decoding
         let record_bytes = BASE64_STANDARD.decode(record_text)?;
         let message = std::str::from_utf8(&record_bytes)?;
         
         info!("Message: {}", message);
        }

    }

    Ok(().into())
}

#[tokio::main]
async fn main() -> Result<(), Error> {

    // required to enable CloudWatch error logging by the runtime
    tracing::init_default_subscriber();
    info!("Setup CW subscriber!");

    run(service_fn(function_handler)).await
}
```

------

您可以使用主控台將函數程式碼提供給 Lambda。

**若要使用主控台程式碼編輯器更新函數程式碼**

1. 開啟 Lambda 主控台的[函數頁面](https://console.aws.amazon.com/lambda/home#/functions)，然後選取您的函數。

1. 選取**程式碼**索引標籤。

1. 在**程式碼來源**窗格中，選取您的原始程式碼檔案，然後在整合式程式碼編輯器中加以編輯。

1. 在 **DEPLOY** 區段中，選擇**部署**以更新函數的程式碼：  
![\[\]](http://docs.aws.amazon.com/zh_tw/lambda/latest/dg/images/getting-started-tutorial/deploy-console.png)

## 測試您的 Lambda 函數，確認其已連線至您的 Amazon MSK 主題
<a name="w2aad101c23c15c35c31"></a>

您現在可以透過檢查 CloudWatch 事件日誌，驗證您的 Lambda 是否正在被事件來源調用。

**若要驗證您的 Lambda 函數是否正在被調用**

1. 使用您的 Kafka 管理員主機，透過 `kafka-console-producer` CLI 產生 Kafka 事件。如需詳細資訊，請參閱 Kafka 文件中的 [Write some events into the topic](https://kafka.apache.org/documentation/#quickstart_send)。傳送足夠的事件，以填滿由批次大小定義的批次，用於上一個步驟中定義的事件來源映射，否則 Lambda 會等待調用更多資訊。

1. 如果您的函數執行，Lambda 會將發生的情況寫入 CloudWatch。在主控台中，導覽至您的 Lambda 函數詳情頁面。

1. 選取 **Configuration** (組態) 索引標籤。

1. 從側邊列，選取**監控和操作工具**。

1. 在**記錄組態**下，識別 **CloudWatch 日誌群組**。日誌群組應以 `/aws/lambda` 開頭。選擇日誌群組連結。

1. 在 CloudWatch 主控台中，檢查**日誌事件**，了解 Lambda 已傳送至日誌串流的日誌事件。識別是否有日誌事件包含來自 Kafka 事件的訊息，如下圖所示。如果有，您已成功使用 Lambda 事件來源映射將 Lambda 函數連線至 Amazon MSK。  
![\[\]](http://docs.aws.amazon.com/zh_tw/lambda/latest/dg/images/msk_tut_log.png)