

# 结合 Amazon MSK 使用 Lambda
<a name="with-msk"></a>

[Amazon Managed Streaming for Apache Kafka（Amazon MSK）](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html)是一项完全托管式服务，可用于构建并运行使用 Apache Kafka 来处理流数据的应用程序。Amazon MSK 简化了 Kafka 集群的设置、扩展和管理。Amazon MSK 还可以更轻松地配置您的应用程序以适用于多个可用区和保证 AWS Identity and Access Management (IAM) 的安全性。

本章说明如何将 Amazon MSK 集群用作 Lambda 函数的事件源。将 Amazon MSK 与 Lambda 集成的一般过程包括以下步骤：

1. **[集群和网络设置](with-msk-cluster-network.md)**：首先，设置您的 [Amazon MSK 集群](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html)。这包括允许 Lambda 访问集群的正确联网配置。

1. **[事件源映射设置](with-msk-configure.md)**：然后，创建 Lambda 需要的[事件源映射](invocation-eventsourcemapping.md)资源，以将 Amazon MSK 集群安全地连接到您的函数。

1. **[函数和权限设置](with-msk-permissions.md)**：最后，请确保您的函数设置正确，并且在其[执行角色](lambda-intro-execution-role.md)中具有必要的权限。

**注意**  
现在，您可以直接使用 Lambda 或 Amazon MSK 控制台来创建和管理您的 Amazon MSK 事件源映射。这两个控制台都提供了自动处理必要 Lambda 执行角色权限的设置功能，从而能够实现更简便的配置流程。

有关如何设置与 Amazon MSK 集群的 Lambda 集成的示例，请参阅[教程：使用 Amazon MSK 事件源映射调用 Lambda 函数](services-msk-tutorial.md)、AWS Compute Blog 上的 [Using Amazon MSK as an event source for AWS Lambda](https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/) 和 Amazon MSK Labs 中的 [Amazon MSK Lambda Integration](https://amazonmsk-labs.workshop.aws/en/msklambda.html)。

**Topics**
+ [事件示例](#msk-sample-event)
+ [为 Lambda 配置 Amazon MSK 集群和 Amazon VPC 网络](with-msk-cluster-network.md)
+ [为 Amazon MSK 事件源映射配置 Lambda 权限](with-msk-permissions.md)
+ [为 Lambda 配置 Amazon MSK 事件源](with-msk-configure.md)
+ [教程：使用 Amazon MSK 事件源映射调用 Lambda 函数](services-msk-tutorial.md)

## 事件示例
<a name="msk-sample-event"></a>

Lambda 调用函数时会在事件参数中发送一批消息。事件负载包含一个消息数组。每个数组项目都包含 Amazon MSK 主题和分区标识符的详细信息，以及时间戳和 base64 编码的消息。

```
{
   "eventSource":"aws:kafka",
   "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
   "records":{
      "mytopic-0":[
         {
            "topic":"mytopic",
            "partition":0,
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==",
            "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
            "headers":[
               {
                  "headerKey":[
                     104,
                     101,
                     97,
                     100,
                     101,
                     114,
                     86,
                     97,
                     108,
                     117,
                     101
                  ]
               }
            ]
         }
      ]
   }
}
```

# 为 Lambda 配置 Amazon MSK 集群和 Amazon VPC 网络
<a name="with-msk-cluster-network"></a>

要将 AWS Lambda 函数连接到 Amazon MSK 集群，您需要正确配置集群及其所在的 [Amazon Virtual Private Cloud（VPC）](https://docs.aws.amazon.com/vpc/latest/userguide/what-is-amazon-vpc.html)。此页面介绍如何配置集群和 VPC。如果您的集群和 VPC 已正确配置，请参阅[为 Lambda 配置 Amazon MSK 事件源](with-msk-configure.md)配置事件源映射。

**Topics**
+ [Lambda 和 MSK 集成的网络配置要求概述](#msk-network-requirements)
+ [为 MSK 事件源配置 NAT 网关](#msk-nat-gateway)
+ [为 MSK 事件源配置 AWS PrivateLink 端点](#msk-vpc-privatelink)

## Lambda 和 MSK 集成的网络配置要求概述
<a name="msk-network-requirements"></a>

Lambda 和 MSK 集成所需的联网配置取决于应用程序的网络架构。此集成涉及三种主要资源：Amazon MSK 集群、Lambda 函数和 Lambda 事件源映射。每种资源都位于不同的 VPC 中：
+ 您的 Amazon MSK 集群通常位于您管理的 VPC 的私有子网中。
+ 您的 Lambda 函数位于 Lambda 拥有的 AWS 托管 VPC 中。
+ 您的 Lambda 事件源映射位于 Lambda 拥有的另一个 AWS 托管 VPC 中，与包含函数的 VPC 分开。

[事件源映射](invocation-eventsourcemapping.md)是 MSK 集群和 Lambda 函数之间的中间资源。事件源映射有两个主要任务。首先，它会轮询您的 MSK 集群以获取新消息。然后，它将使用这些消息调用 Lambda 函数。由于这三种资源位于不同的 VPC 中，因此轮询和调用操作都需要跨 VPC 网络调用。

事件源映射的网络配置要求取决于其使用的是[预置模式](invocation-eventsourcemapping.md#invocation-eventsourcemapping-provisioned-mode)还是按需模式，如下图所示：

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/MSK-esm-network-overview.png)


在这两种模式下，Lambda 事件源映射以相同的方式轮询您的 MSK 集群以获取新消息。为了在事件源映射和 MSK 集群之间建立连接，Lambda 会在您的私有子网中创建 [Hyperplane ENI](configuration-vpc.md#configuration-vpc-enis)（或重复使用现有的 Hyperplane ENI，如果可用）来建立安全连接。如图所示，此 Hyperplane ENI 使用 MSK 集群的子网和安全组配置，而不是 Lambda 函数。

轮询集群的消息后，Lambda 在每种模式下以不同的方式调用函数：
+ 在预置模式下，Lambda 会自动处理事件源映射 VPC 和函数 VPC 之间的连接。因此，您不需要任何其他联网组件即可成功调用函数。
+ 在按需模式下，您的 Lambda 事件源映射通过客户托管 VPC 的路径调用函数。因此，您需要在 VPC 的公有子网中配置 [NAT 网关](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)，或者在 VPC 的私有子网中配置 [AWS PrivateLink](https://docs.aws.amazon.com/vpc/latest/privatelink/what-is-privatelink.html) 端点，以提供对 Lambda、[AWS Security Token Service（STS）](https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html)以及可选 [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) 的访问权限。正确配置其中任一选项都可以在您的 VPC 和 Lambda 托管的运行时 VPC 之间建立连接，这是调用您的函数所必需的。

NAT 网关允许私有子网中的资源访问公共互联网。使用此配置意味着您的流量在调用 Lambda 函数之前会遍历互联网。AWS PrivateLink 端点允许私有子网安全地连接到 AWS 服务或其他私有 VPC 资源，而无需通过公共互联网。有关如何配置这些资源的详细信息，请参阅[为 MSK 事件源配置 NAT 网关](#msk-nat-gateway)或[为 MSK 事件源配置 AWS PrivateLink 端点](#msk-vpc-privatelink)。

到目前为止，我们假设您的 MSK 集群位于 VPC 内的私有子网中，这种情况更常见。但是，即使您的 MSK 集群位于 VPC 内的公有子网中，您也必须配置 AWS PrivateLink 端点以启用安全连接。下表根据您配置 MSK 集群和 Lambda 事件源映射的方式总结了联网配置要求：


| MSK 集群位置（位于客户托管的 VPC 中） | Lambda 事件源映射扩展模式 | 所需的联网配置 | 
| --- | --- | --- | 
|  私有子网  |  按需模式  |  NAT 网关（位于 VPC 的公有子网中），或 AWS PrivateLink 端点（位于 VPC 的私有子网中），以启用对 Lambda、AWS STS 以及可选的 Secrets Manager 的访问。  | 
|  公有子网  |  按需模式  |  AWS PrivateLink 端点（位于 VPC 的公有子网中），以启用对 Lambda、AWS STS 以及可选的 Secrets Manager 的访问。  | 
|  私有子网  |  预调配模式  |  无  | 
|  公有子网  |  预调配模式  |  无  | 

此外，与您的 MSK 集群关联的安全组必须允许通过正确端口的流量。请确保已配置以下安全组规则：
+ **入站规则**：允许默认代理端口的所有流量。MSK 使用的端口取决于集群上的身份验证类型：`9098` 用于 IAM 身份验证、`9096` 用于 SASL/SCRAM 和 `9094` 用于 TLS。或者，您可以使用自引用安全组规则允许来自同一安全组内的实例进行访问。
+ **出站规则**：如果您的函数需要与其他 AWS 服务进行通信，则允许端口 `443` 上的所有流量向外部目标传输。或者，如果您不需要与其他 AWS 服务通信，则可以使用自引用的安全组规则来限制对代理的访问权限。
+ **Amazon VPC 端点入站规则**：如果您正在使用 Amazon VPC 端点，则与该端点关联的安全组必须允许来自集群安全组的端口 `443` 上的入站流量。

## 为 MSK 事件源配置 NAT 网关
<a name="msk-nat-gateway"></a>

您可以配置 NAT 网关，以允许事件源映射轮询来自集群的消息，并通过 VPC 的路径调用该函数。仅当您的事件源映射使用按需模式，并且您的集群位于 VPC 的私有子网中时，才需要这样做。如果您的集群位于 VPC 的公有子网中，或者您的事件源映射使用预置模式，则无需配置 NAT 网关。

[NAT 网关](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)允许私有子网中的资源访问公共互联网。如果您需要与 Lambda 的私有连接，请改为参阅[为 MSK 事件源配置 AWS PrivateLink 端点](#msk-vpc-privatelink)。

配置 NAT 网关后，必须配置相应的路由表。这允许来自私有子网的流量通过 NAT 网关路由到公共互联网。

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/MSK-NAT-Gateway.png)


以下步骤将指导您使用控制台配置 NAT 网关。根据需要对每个可用区（AZ）重复这些步骤。

**配置 NAT 网关和正确的路由（控制台）**

1. 按照[创建 NAT 网关](https://docs.aws.amazon.com/vpc/latest/userguide/nat-gateway-working-with.html)中的步骤进行操作，注意以下几点：
   + NAT 网关应始终位于公有子网中。创建具有[公有连接](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)的 NAT 网关。
   + 如果您的 MSK 集群跨多个可用区复制，则请为每个可用区创建一个 NAT 网关。例如，在每个可用区中，您的 VPC 应有一个包含集群的私有子网和一个包含 NAT 网关的公有子网。对于具有三个可用区的设置，您将拥有三个私有子网、三个公有子网和三个 NAT 网关。

1. 创建 NAT 网关后，打开 [Amazon VPC 控制台](https://console.aws.amazon.com/vpc/)并在左侧菜单中选择**路由表**。

1. 选择**创建路由表**。

1. 将此路由表与包含 MSK 集群的 VPC 关联。（可选）输入路由表的名称。

1. 选择**创建路由表**。

1. 选择刚刚创建的路由表。

1. 在**子网关联**选项卡下，选择**编辑子网关联**。
   + 将此路由表与包含 MSK 集群的私有子网关联。

1. 选择 **Edit routes (编辑路由)**。

1. 选择**添加路由**：

   1. 对于 **Destination (目标)**，选择 `0.0.0.0/0`。

   1. 对于**目标**，选择 **NAT 网关**。

   1. 在搜索框中，选择已在步骤 1 中创建的 NAT 网关。这应该是与包含您的 MSK 集群的私有子网（您在步骤 6 中与此路由表关联的私有子网）位于同一可用区的 NAT 网关。

1. 选择**保存更改**。

## 为 MSK 事件源配置 AWS PrivateLink 端点
<a name="msk-vpc-privatelink"></a>

您可以配置 AWS PrivateLink 端点来轮询来自集群的消息，并通过 VPC 的路径调用该函数。这些端点应允许 MSK 集群访问以下各项：
+ Lambda 服务
+ [AWS Security Token Service（STS）](https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html)
+ （可选）[AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) 服务。如果集群身份验证所需的密钥存储在 Secrets Manager 中，则需要此项。

仅当事件源映射使用按需模式时，才需要配置 PrivateLink 端点。如果事件源映射使用预置模式，则 Lambda 会为您建立所需的连接。

PrivateLink 端点允许通过 AWS PrivateLink 安全、私密地访问 AWS 服务。或者，要配置 NAT 网关以允许您的 MSK 集群访问公共互联网，请参阅[为 MSK 事件源配置 NAT 网关](#msk-nat-gateway)。

配置 VPC 端点后，您的 MSK 集群应该可以直接和私有地访问 Lambda、STS 以及可选的 Secrets Manager。

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/MSK-PrivateLink-Endpoints.png)


以下步骤将指导您使用控制台配置 PrivateLink 端点。根据需要对每个端点（Lambda、STS、Secrets Manager）重复这些步骤。

**配置 VPC PrivateLink 端点（控制台）**

1. 打开 [Amazon VPC 控制台](https://console.aws.amazon.com/vpc/)，然后在左侧菜单中选择**端点**。

1. 选择**创建端点**。

1. （可选）输入端点的名称。

1. 对于**类型**，选择 **AWS 服务**。

1. 在**服务**下，开始键入服务的名称。例如，要创建连接到 Lambda 的端点，请在搜索框中键入 `lambda`。

1. 在结果中，您应该看到当前区域中的服务端点。例如，在美国东部（弗吉尼亚州北部）区域中，应该看到 `com.amazonaws.us-east-2.lambda`。选择此服务。

1. 在**网络设置**下，选择包含 MSK 集群的 VPC。

1. 在**子网**下，选择 MSK 集群所在的可用区。
   + 对于每个可用区，在**子网 ID** 下，选择包含 MSK 集群的私有子网。

1. 在**安全组**下，请选择与 MSK 集群关联的安全组。

1. 选择**创建端点**。

默认情况下，Amazon VPC 端点具有开放的 IAM 策略，允许对资源进行广泛访问。最佳实践是，将这些策略限制为使用该端点执行所需的操作。例如，对于 Secrets Manager 端点，您可以修改其策略，使其仅允许函数的执行角色访问密钥。

**Example VPC 端点策略 – Secrets Manager 端点**  

```
{
    "Statement": [
        {
            "Action": "secretsmanager:GetSecretValue",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws::iam::123456789012:role/my-role"
                ]
            },
            "Resource": "arn:aws::secretsmanager:us-west-2:123456789012:secret:my-secret"
        }
    ]
}
```

对于 AWS STS 和 Lambda 端点，您可以将调用主体限制为 Lambda 服务主体。但是，请务必在这些政策中使用 `"Resource": "*"`。

**Example VPC 端点策略 – AWS STS 端点**  

```
{
    "Statement": [
        {
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "lambda.amazonaws.com"
                ]
            },
            "Resource": "*"
        }
    ]
}
```

**Example VPC 端点策略 – Lambda 端点**  

```
{
    "Statement": [
        {
            "Action": "lambda:InvokeFunction",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "lambda.amazonaws.com"
                ]
            },
            "Resource": "*"
        }
    ]
}
```

# 为 Amazon MSK 事件源映射配置 Lambda 权限
<a name="with-msk-permissions"></a>

要访问 Amazon MSK 集群，您的函数和事件源映射需要具有执行各种 Amazon MSK API 操作的权限。为函数的[执行角色](lambda-intro-execution-role.md)添加这些权限。如果您的用户需要访问权限，请将所需权限添加到用户或角色的身份策略中。

[AWSLambdaMSKExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaMSKExecutionRole.html) 托管策略包含 Amazon MSK Lambda 事件源映射所需的最低权限。要简化权限流程，您可以执行以下操作：
+ 将 [AWSLambdaMSKExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaMSKExecutionRole.html) 托管策略附加到您的执行角色。
+ 让 Lambda 控制台为您生成权限。当您在[控制台中创建 Amazon MSK 事件源映射](msk-esm-create.md#msk-console)时，Lambda 会评估您的执行角色，并在发现任何权限缺失的情况时向您发出提醒。选择**生成权限**以自动更新您的执行角色。如果您手动创建或修改了执行角色的策略，或者这些策略被附加到多个角色上，则此方法将不起作用。请注意，在使用高级功能（例如[失败时的目标](kafka-on-failure.md)或 [AWS Glue 架构注册表](services-consume-kafka-events.md)）时，您的执行角色可能仍需要额外的权限。

**Topics**
+ [所需的权限](#msk-required-permissions)
+ [可选权限](#msk-optional-permissions)

## 所需的权限
<a name="msk-required-permissions"></a>

您的 Lambda 函数执行角色必须具备以下所需的 Amazon MSK 事件源映射权限。这些权限包含在 [AWSLambdaMSKExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaMSKExecutionRole.html) 托管策略中。

### CloudWatch Logs 权限
<a name="msk-basic-permissions"></a>

以下权限使 Lambda 能够在 Amazon CloudWatch Logs 中创建和存储日志。
+ [logs:CreateLogGroup](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html)
+ [logs:CreateLogStream](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogStream.html)
+ [logs:PutLogEvents](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html)

### MSK 集群权限
<a name="msk-cluster-permissions"></a>

以下权限使 Lambda 能够代表您访问 Amazon MSK 集群：
+ [kafka:DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html)
+ [kafka:DescribeClusterV2](https://docs.aws.amazon.com/MSK/2.0/APIReference/v2-clusters-clusterarn.html)
+ [kafka:GetBootstrapBrokers](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-bootstrap-brokers.html)

我们建议使用 [kafka:DescribeClusterV2](https://docs.aws.amazon.com/MSK/2.0/APIReference/v2-clusters-clusterarn.html) 而不是 [kafka:DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html)。v2 权限适用于预置集群和无服务器 Amazon MSK 集群。您的策略中只需其中一项权限即可。

### VPC 权限
<a name="msk-vpc-permissions"></a>

以下权限使 Lambda 能够在连接到您的 Amazon MSK 集群时创建和管理网络接口：
+ [ec2:CreateNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateNetworkInterface.html)
+ [ec2:DescribeNetworkInterfaces](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeNetworkInterfaces.html)
+ [ ec2:DescribeVpcs](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeVpcs.html)
+ [ ec2:DeleteNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DeleteNetworkInterface.html)
+ [ ec2:DescribeSubnets](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSubnets.html)
+ [ ec2:DescribeSecurityGroups](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSecurityGroups.html)

## 可选权限
<a name="msk-optional-permissions"></a>

 您的 Lambda 函数还可能需要权限来：
+ 访问跨账户的 Amazon MSK 集群。要进行跨账户事件源映射，您在执行角色中需要 [kafka:DescribeVpcConnection](https://docs.aws.amazon.com/msk/1.0/apireference/vpc-connection-arn.html)。创建跨账户事件源映射的 IAM 主体需要 [kafka:ListVpcConnections](https://docs.aws.amazon.com/msk/1.0/apireference/vpc-connections.html)。
+ 访问您的 SCRAM 密钥（如果使用 [SASL/SCRAM 身份验证](msk-cluster-auth.md#msk-sasl-scram)）。这样，您的函数就可以通过用户名和密码来连接到 Kafka。
+ 如果您使用 SASL/SCRAM 或 [mTLS](msk-cluster-auth.md#msk-mtls) 身份验证，则请描述您的 Secrets Manager 密钥。这样，您的函数就可以检索安全连接所需的凭证或证书。
+ 如果您的 AWS Secrets Manager 密钥使用 AWS KMS 客户托管密钥加密，请访问您的 AWS KMS 客户托管密钥。
+ 如果您使用的是带身份验证的架构注册表，请访问您的架构注册表密钥：
  + 对于 AWS Glue 架构注册表：您的函数需要 `glue:GetRegistry` 和 `glue:GetSchemaVersion` 权限。这样，您的函数就可以查找和使用存储在 AWS Glue 中的消息格式规则。
  + 对于带有 `BASIC_AUTH` 或 `CLIENT_CERTIFICATE_TLS_AUTH` 的 [Confluent 架构注册表](https://docs.confluent.io/platform/current/schema-registry/security/index.html)：您的函数需要访问密钥（包含身份验证凭证）的 `secretsmanager:GetSecretValue` 权限。这样，您的函数就可以检索访问 Confluent 架构注册表所需的用户名/密码或证书。
  + 对于私有 CA 证书：您的函数需要访问密钥（包含证书）的 secretsmanager:GetSecretValue 权限。这样，您的函数就可以验证使用自定义证书的架构注册表的身份。
+ 如果您对事件源映射使用 IAM 身份验证，则可以访问 Kafka 集群的使用者组，并从指定的主题中轮询消息。

 这些密钥对应以下所需权限：
+ [kafka:ListScramSecrets](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-scram-secrets.html) - 让您可以列出用于 Kafka 身份验证的 SCRAM 密钥
+ [secretsmanager:GetSecretValue](https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html) - 让您可以从 Secrets Manager 中检索密钥
+ [kms:Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) - 让您可以使用 AWS KMS 对加密数据进行解密
+ [glue:GetRegistry](https://docs.aws.amazon.com/glue/latest/webapi/API_GetRegistry.html) - 让您可以访问 AWS Glue 架构注册表
+ [glue:GetSchemaVersion](https://docs.aws.amazon.com/glue/latest/webapi/API_GetSchemaVersion.html) - 让您可以从 AWS Glue 架构注册表中检索特定的架构版本
+ [kafka-cluster:Connect](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html)：授予连接和验证集群的权限
+ [kafka-cluster:AlterGroup](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html)：授予加入集群上群组的权限，相当于 Apache Kafka 的 READ GROUP ACL
+ [kafka-cluster:DescribeGroup](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html)：授予描述集群上的群组的权限，相当于 Apache Kafka 的 DESCRIBE GROUP ACL
+ [kafka-cluster:DescribeTopic](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html)：授予描述集群上的主题的权限，相当于 Apache Kafka 的 DESCRIBE TOPIC ACL
+ [kafka-cluster:ReadData](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html)：授予从集群上的主题中读取数据的权限，相当于 Apache Kafka 的 READ TOPIC ACL

 此外，如果您想将失败调用的记录发送到失败时的目标，则需要根据目标类型获得以下权限：
+ 对于 Amazon SQS 目标：[sqs:SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html) - 让您可以向 Amazon SQS 队列发送消息
+ 对于 Amazon SNS 目标：[sns:Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html) - 让您可以向Amazon SNS 主题发布消息
+ 对于 Amazon S3 存储桶目标：[s3:PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) 和 [s3:ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListBucket.html) - 让您可以在 Amazon S3 存储桶中写入和列出对象

有关身份验证和授权错误的故障排除，请参阅[Kafka 事件源映射错误的故障排除](with-kafka-troubleshoot.md)

# 为 Lambda 配置 Amazon MSK 事件源
<a name="with-msk-configure"></a>

要使用 Amazon MSK 集群作为 Lambda 函数的事件源，您需要创建连接这两个资源的[事件源映射](invocation-eventsourcemapping.md)。此页面介绍如何为 Amazon MSK 创建事件源映射。

此页面假设您已经正确配置 MSK 集群及其所在的 [Amazon Virtual Private Cloud（VPC）](https://docs.aws.amazon.com/vpc/latest/userguide/what-is-amazon-vpc.html)。如果您需要设置集群或 VPC，请参阅[为 Lambda 配置 Amazon MSK 集群和 Amazon VPC 网络](with-msk-cluster-network.md)。要配置错误处理的重试行为，请参阅 [为 Kafka 事件源配置错误处理控件](kafka-retry-configurations.md)。

**Topics**
+ [将 Amazon MSK 集群用作事件源](#msk-esm-overview)
+ [在 Lambda 中配置 Amazon MSK 集群身份验证方法](msk-cluster-auth.md)
+ [为 Amazon MSK 事件源创建 Lambda 事件源映射](msk-esm-create.md)
+ [在 Lambda 中创建跨账户事件源映射](msk-cross-account.md)
+ [Lambda 中的所有 Amazon MSK 事件源配置参数](msk-esm-parameters.md)

## 将 Amazon MSK 集群用作事件源
<a name="msk-esm-overview"></a>

当您添加 Apache Kafka 或 Amazon MSK 集群作为 Lambda 函数的触发器时，该集群将用作[事件源](invocation-eventsourcemapping.md)。

Lambda 根据您指定的[起始位置](kafka-starting-positions.md)，从您在 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 请求中指定为 `Topics` 的 Kafka 主题读取事件数据。成功进行处理后，会将 Kafka 主题提交给 Kafka 集群。

Lambda 按顺序读取每个 Kafka 主题分区的消息。单个 Lambda 负载可以包含来自多个分区的消息。当有更多记录可用时，Lambda 根据您在 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 请求中指定的 BatchSize 值，继续对记录进行批处理，直到函数赶上主题的速度。

Lambda 处理各个批次后，会提交该批次中消息的偏移量。如果函数为批处理中的任何消息返回错误，Lambda 将重试整批消息，直到处理成功或消息过期为止。您可以将所有重试都失败的记录发送到失败时的目标，以供日后处理。

**注意**  
尽管 Lambda 函数的最大超时限制通常为 15 分钟，但 Amazon MSK、自行管理的 Apache Kafka、Amazon DocumentDB、Amazon MQ for ActiveMQ 和 RabbitMQ 的事件源映射，仅支持最大超时限制为 14 分钟的函数。

# 在 Lambda 中配置 Amazon MSK 集群身份验证方法
<a name="msk-cluster-auth"></a>

Lambda 需要访问 Amazon MSK 集群、检索记录和执行其他任务的权限。Amazon MSK 支持多种方法来使用 MSK 集群进行身份验证。

**Topics**
+ [未经身份验证的访问](#msk-unauthenticated)
+ [SASL/SCRAM 身份验证](#msk-sasl-scram)
+ [双向 TLS 身份验证](#msk-mtls)
+ [IAM 身份验证](#msk-iam-auth)
+ [Lambda 如何选择引导代理](#msk-bootstrap-brokers)

## 未经身份验证的访问
<a name="msk-unauthenticated"></a>

如果没有客户端会通过互联网访问集群，则可以使用未经身份验证访问。

## SASL/SCRAM 身份验证
<a name="msk-sasl-scram"></a>

Lambda 支持使用 SHA-512 哈希函数和传输层安全性协议（TLS）加密进行[简单身份验证和安全层/加盐质疑应答身份验证机制（SASL/SCRAM）](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password-tutorial.html)身份验证。为使 Lambda 连接到集群，请将身份验证凭证（用户名和密码）存储在 Secrets Manager 密钥中，并在配置事件源映射时引用此密钥。

有关使用 Secrets Manager 的更多信息，请参阅《Amazon Managed Streaming for Apache Kafka Developer Guide》**中的 [Sign-in credentials authentication with Secrets Manager](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html)。

**注意**  
Amazon MSK 不支持 SASL/PLAIN 身份验证。

## 双向 TLS 身份验证
<a name="msk-mtls"></a>

双向 TLS（mTLS）在客户端和服务器之间提供双向身份验证。客户端将证书发送给服务器，以便服务器验证客户端。服务器还将服务器证书发送给客户端，以便客户端验证服务器。

对于 Amazon MSK 与 Lambda 的集成，MSK 集群充当服务器，而 Lambda 充当客户端。
+ 要让 Lambda 验证 MSK 集群，您可以在 Secrets Manager 中将客户端证书配置为密钥，并在事件源映射配置中引用该证书。客户端证书必须由服务器信任存储中的证书颁发机构 (CA) 签名。
+ MSK 集群还会向 Lambda 发送服务器证书。服务器证书必须由 AWS 信任存储中的证书颁发机构（CA）签名。

Amazon MSK 不支持自签名服务器证书。Amazon MSK 中的所有代理都使用由 [Amazon Trust Services CA](https://www.amazontrust.com/repository/) 签名的[公有证书](https://docs.aws.amazon.com/msk/latest/developerguide/msk-encryption.html)，默认情况下 Lambda 信任这些证书。

### 配置 mTLS 密钥
<a name="mtls-auth-secret"></a>

CLIENT\$1CERTIFICATE\$1TLS\$1AUTH 密钥需要证书字段和私有密钥字段。对于加密的私有密钥，密钥需要私有密钥密码。证书和私有密钥必须采用 PEM 格式。

**注意**  
Lambda 支持 [PBES1](https://datatracker.ietf.org/doc/html/rfc2898/#section-6.1)（而不是 PBES2）私有密钥加密算法。

证书字段必须包含证书列表，首先是客户端证书，然后是任何中间证书，最后是根证书。每个证书都必须按照以下结构在新行中启动：

```
-----BEGIN CERTIFICATE-----  
        <certificate contents>
-----END CERTIFICATE-----
```

Secrets Manager 支持最多包含 65536 字节的密钥，这为长证书链提供了充足的空间。

私有密钥必须采用 [PKCS \$18](https://datatracker.ietf.org/doc/html/rfc5208) 格式，并具有以下结构：

```
-----BEGIN PRIVATE KEY-----  
         <private key contents>
-----END PRIVATE KEY-----
```

对于加密的私有密钥，请使用以下结构：

```
-----BEGIN ENCRYPTED PRIVATE KEY-----  
          <private key contents>
-----END ENCRYPTED PRIVATE KEY-----
```

以下示例显示使用加密私有密钥进行 mTLS 身份验证的密钥内容。对于加密的私有密钥，您可以在密钥中包含私有密钥密码。

```
{
 "privateKeyPassword": "testpassword",
 "certificate": "-----BEGIN CERTIFICATE-----
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
...
j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk
cmUuiAii9R0=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
...
rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
-----END CERTIFICATE-----",
 "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp
...
QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
-----END ENCRYPTED PRIVATE KEY-----"
}
```

有关适用于 Amazon MSK 的 mTLS 的更多信息，请参阅《Amazon Managed Streaming for Apache Kafka Developer Guide》**中的 [Mutual TLS client authentication for Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html)。

## IAM 身份验证
<a name="msk-iam-auth"></a>

您可以使用 AWS Identity and Access Management（IAM）来验证连接到 MSK 集群的客户端的身份。使用 IAM 身份验证时，Lambda 依靠函数[执行角色](lambda-intro-execution-role.md)中的权限连接到集群、检索记录以及执行其他所需操作。有关包含必要权限的策略示例，请参阅《Amazon Managed Streaming for Apache Kafka Developer Guide》**中的 [Create authorization policies for the IAM role](https://docs.aws.amazon.com/msk/latest/developerguide/create-iam-access-control-policies.html)。

如果 IAM 身份验证在您的 MSK 集群上处于活动状态，并且您没有提供密钥，则 Lambda 自动默认使用 IAM 身份验证。

有关 Amazon MSK 中的 IAM 身份验证的更多信息，请参阅 [IAM access control](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html)。

## Lambda 如何选择引导代理
<a name="msk-bootstrap-brokers"></a>

Lambda 根据集群上可用的身份验证方法以及您是否提供用于身份验证的密钥来选择[引导代理](https://docs.aws.amazon.com/msk/latest/developerguide/msk-get-bootstrap-brokers.html)。如果您为 mTLS 或 SASL/SCRAM 提供了密钥，Lambda 将自动选择该身份验证方法。如果不提供密钥，Lambda 会选择在集群上处于活动状态的最强身份验证方法。下面是 Lambda 选择代理的优先级顺序，从最强到最弱的身份验证：
+ mTLS（为 mTLS 提供的密钥）
+ SASL/SCRAM（为 SASL/SCRAM 提供的密钥）
+ SASL IAM（未提供密钥，IAM 身份验证处于活动状态）
+ 未经身份验证的 TLS（未提供密钥，IAM 身份验证未处于活动状态）
+ 纯文本（未提供密钥，IAM 身份验证和未经身份验证的 TLS 均未处于活动状态）

**注意**  
如果 Lambda 无法连接到最安全的代理类型，则 Lambda 不会尝试连接到其他（较弱）代理类型。如果希望 Lambda 选择较弱的代理类型，请停用集群上所有更强的身份验证方法。

# 为 Amazon MSK 事件源创建 Lambda 事件源映射
<a name="msk-esm-create"></a>

要创建事件源映射，您可以使用 Lambda 控制台、[AWS Command Line Interface (CLI)](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) 或 [AWS SDK](https://aws.amazon.com/getting-started/tools-sdks/)。

**注意**  
创建事件源映射时，Lambda 会在包含 MSK 集群的私有子网中创建 [Hyperplane ENI](configuration-vpc.md#configuration-vpc-enis)，从而允许 Lambda 建立安全连接。此 Hyperplane ENI 使用 MSK 集群的子网和安全组配置，而不是 Lambda 函数。

以下控制台步骤添加 Amazon MSK 集群作为 Lambda 函数的触发器。这将在后台创建一个事件源映射资源。

**将 Amazon MSK 触发器添加到 Lambda 函数（控制台）**

1. 打开 Lambda 控制台的[函数页面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 选择您要为其添加 Amazon MSK 触发器的 Lambda 函数的名称。

1. 在 **Function overview**（函数概览）下，选择 **Add trigger**（添加触发器）。

1. 在**触发器配置**下，选择 **MSK**。

1. 要指定 Kafka 集群详细信息，请执行以下操作：

   1. 对于 **MSK cluster**（MSK 集群），选择您的集群。

   1. 对于**主题名称**，输入要从中使用消息的 Kafka 主题的名称。

   1. 对于**使用者组 ID**，输入要加入的 Kafka 使用者组的 ID（如适用）。有关更多信息，请参阅 [Lambda 中可自定义的使用者组 ID](kafka-consumer-group-id.md)。

1. 对于**集群身份验证**，请进行必要的配置。有关集群身份验证的更多信息，请参阅[在 Lambda 中配置 Amazon MSK 集群身份验证方法](msk-cluster-auth.md)。
   + 如果您希望 Lambda 在建立连接时对 MSK 集群执行身份验证，请开启**使用身份验证**。建议进行身份验证。
   + 如果使用身份验证，对于**身份验证方法**，选择要使用的身份验证方法。
   + 如果您使用身份验证，对于 **Secrets Manager 密钥**，选择包含访问集群所需身份验证凭证的 Secrets Manager 密钥。

1. 在**事件轮询器配置**下，进行必要的配置。
   + 选择**激活触发器**以在创建后立即启用该触发器。
   + 选择是否要为事件源映射**配置预置模式**。有关更多信息，请参阅 [Lambda 中的 Apache Kafka 事件轮询器扩展模式](kafka-scaling-modes.md)。
     + 如果您配置了预置模式，输入**最少事件轮询器**的值、**最多事件轮询器**的值以及 PollerGroupName 的可选值，以指定同一个事件源 VPC 中的多个 ESM 的分组。
   + 对于**起始位置**，选择您希望 Lambda 如何开始从流中读取。有关更多信息，请参阅 [Lambda 中 Apache Kafka 轮询和流的起始位置](kafka-starting-positions.md)。

1. 在**批处理**下，进行必要的配置。有关批处理的更多信息，请参阅[批处理行为](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)。

   1. 对于 **Batch size**（批处理大小），输入要在单个批次中接收的最大消息数。

   1. 对于**批处理时段**，输入 Lambda 在调用函数之前收集记录所花费的最大秒数。

1. 在**筛选**下，进行必要的配置。有关筛选的更多信息，请参阅 [从 Amazon MSK 和自托管式 Apache Kafka 事件源中筛选事件](kafka-filtering.md)。
   + 在**筛选条件**中，添加筛选条件定义以确定是否处理事件。

1. 在**故障处理**下，进行必要的配置。有关故障处理的更多信息，请参阅[捕获 Amazon MSK 和自托管式 Apache Kafka 事件源的丢弃批次](kafka-on-failure.md)。
   + 对于**故障目标**，指定故障时目标的 ARN。

1. 对于**标签**，输入要与此事件源映射关联的标签。

1. 要创建触发器，请选择 **Add**（添加）。

您还可以使用 AWS CLI 中的 [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) 命令创建事件源映射。以下示例创建事件源映射，将 Lambda 函数 `my-msk-function` 映射到 `AWSKafkaTopic` 主题，从 `LATEST` 消息开始。此命令还使用 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) 对象指示 Lambda 在连接到集群时使用 [SASL/SCRAM](msk-cluster-auth.md#msk-sasl-scram) 身份验证。

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
  --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
```

如果集群使用 [mTLS 身份验证](msk-cluster-auth.md#msk-mtls)，请包含指定 `CLIENT_CERTIFICATE_TLS_AUTH` 的 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) 对象以及 Secrets Manager 密钥 ARN。如下面的命令所示：

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
  --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
```

当集群使用 [IAM 身份验证](msk-cluster-auth.md#msk-iam-auth)时，您不需要 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) 对象。如下面的命令所示：

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
```

# 在 Lambda 中创建跨账户事件源映射
<a name="msk-cross-account"></a>

您可以使用[多 VPC 私有连接](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html)将 Lambda 函数连接到不同 AWS 账户 中的预置 MSK 集群。多 VPC 连接使用 AWS PrivateLink，可将所有流量保持在 AWS 网络内。

**注意**  
您无法为无服务器 MSK 集群创建跨账户事件源映射。

要创建跨账户事件源映射，必须先[为 MSK 集群配置多 VPC 连接](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-turn-on)。创建事件源映射时，请使用托管 VPC 连接 ARN 而非集群 ARN，如以下示例所示。[CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 操作也因 MSK 集群使用的身份验证类型而异。

**Example — 为使用 IAM 身份验证的集群创建跨账户事件源映射**  
当集群使用[基于 IAM 角色的身份验证](msk-cluster-auth.md#msk-iam-auth)时，您不需要 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) 对象。示例：  

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
```

**Example — 为使用 SASL/SCRAM 身份验证的集群创建跨账户事件源映射**  
如果集群使用 [SASL/SCRAM 身份验证](msk-cluster-auth.md#msk-sasl-scram)，则必须包含指定 `SASL_SCRAM_512_AUTH` 的 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) 对象以及 Secrets Manager 密钥 ARN。  
有两种方法可以通过 SASL/SCRAM 身份验证将密钥用于跨账户 Amazon MSK 事件源映射：  
+ 在 Lambda 函数账户中创建密钥并将其与集群密钥同步。[创建轮换](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html)以使两个密钥保持同步。此选项允许您控制来自函数账户的密钥。
+ 使用与 MSK 集群关联的密钥。此密钥必须可用于跨账户存取 Lambda 函数账户。有关更多信息，请参阅[不同账户中用户对 AWS Secrets Manager 密钥的访问权限](https://docs.aws.amazon.com/secretsmanager/latest/userguide/auth-and-access_examples_cross.html)。

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
```

**Example — 为使用 mTLS 身份验证的集群创建跨账户事件源映射**  
如果集群使用 [mTLS 身份验证](msk-cluster-auth.md#msk-mtls)，则必须包含指定 `CLIENT_CERTIFICATE_TLS_AUTH` 的 [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) 对象以及 Secrets Manager 密钥 ARN。密钥可以存储在集群账户或 Lambda 函数账户中。  

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
```

# Lambda 中的所有 Amazon MSK 事件源配置参数
<a name="msk-esm-parameters"></a>

所有 Lambda 事件源类型共享相同的 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 和 [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) API 操作。但是，只有部分参数适用于 Amazon MSK，如下表中所示。


| 参数 | 必需 | 默认值 | 备注 | 
| --- | --- | --- | --- | 
|  AmazonManagedKafkaEventSourceConfig  |  N  |  包含 ConsumerGroupId 字段，该字段默认为唯一值。  |  只能在 Create（创建）设置  | 
|  BatchSize  |  否  |  100  |  最大值：10000  | 
|  DestinationConfig  |  N  |  不适用  |  [捕获 Amazon MSK 和自托管式 Apache Kafka 事件源的丢弃批次](kafka-on-failure.md)  | 
|  启用  |  N  |  True  |    | 
|  BisectBatchOnFunctionError  |  N  |  False  |  [为 Kafka 事件源配置错误处理控件](kafka-retry-configurations.md)  | 
|  FunctionResponseTypes  |  N  |  不适用  |  [为 Kafka 事件源配置错误处理控件](kafka-retry-configurations.md)  | 
|  MaximumRecordAgeInSeconds  |  N  |  -1（不限次数）  |  [为 Kafka 事件源配置错误处理控件](kafka-retry-configurations.md)  | 
|  MaximumRetryAttempts  |  N  |  -1（不限次数）  |  [为 Kafka 事件源配置错误处理控件](kafka-retry-configurations.md)  | 
|  EventSourceArn  |  Y  | 不适用 |  只能在 Create（创建）设置  | 
|  FilterCriteria  |  N  |  不适用  |  [控制 Lambda 向您的函数发送的事件](invocation-eventfiltering.md)  | 
|  FunctionName  |  是  |  不适用  |    | 
|  KMSKeyArn  |  N  |  不适用  |  [筛选条件的加密](invocation-eventfiltering.md#filter-criteria-encryption)  | 
|  MaximumBatchingWindowInSeconds  |  N  |  500 毫秒  |  [批处理行为](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)  | 
|  ProvisionedPollersConfig  |  N  |  `MinimumPollers`：如果未指定，则默认值为 1 `MaximumPollers`：如果未指定，则默认值为 200 `PollerGroupName`：不适用  |  [预置模式](kafka-scaling-modes.md#kafka-provisioned-mode)  | 
|  SourceAccessConfigurations  |  否  |  无凭证  |  事件源的 SASL/SCRAM 或 CLIENT\$1CERTIFICATE\$1TLS\$1AUTH (MutualTLS) 身份验证凭证  | 
|  StartingPosition  |  Y  | 不适用 |  AT\$1TIMESTAMP、TRIM\$1HORIZON 或 LATEST 只能在 Create（创建）设置  | 
|  StartingPositionTimestamp  |  N  |  不适用  |  当 StartingPosition 设置为 AT\$1TIMESTAMP 时，为必需项  | 
|  标签  |  N  |  不适用  |  [在事件源映射上使用标签](tags-esm.md)  | 
|  主题  |  Y  | 不适用 |  Kafka 主题名称 只能在 Create（创建）设置  | 

**注意**  
当您指定 `PollerGroupName` 时，同一 Amazon VPC 中的多个 ESM 可以共享事件轮询器单元（EPU）容量。您可以使用此选项来优化 ESM 的预置模式成本。ESM 分组的要求：  
ESM 必须位于同一 Amazon VPC 中
每个轮询器组最多 100 个 ESM
一个组中所有 ESM 的总轮询器数量上限不能超过 2000
您可以更新 `PollerGroupName` 以将 ESM 移动到其他组，也可以通过将 `PollerGroupName` 设置为空字符串（""）从某个组中移除 ESM。

# 教程：使用 Amazon MSK 事件源映射调用 Lambda 函数
<a name="services-msk-tutorial"></a>

在本教程中，您将执行以下操作：
+ 在与现有 Amazon MSK 集群相同的 AWS 账户中创建 Lambda 函数。
+ 为 Lambda 配置联网和身份验证，以便与 Amazon MSK 通信。
+ 设置 Lambda Amazon MSK 事件源映射，其在主题中出现事件时运行您的 Lambda 函数。

完成这些步骤后，当事件发送到 Amazon MSK 时，您将能够设置 Lambda 函数，以使用自己的自定义 Lambda 代码自动处理这些事件。

 **您可以用这项功能做什么？** 

**示例解决方案：使用 MSK 事件源映射向您的客户提供实时比分。**

请考虑以下应用场景：您的公司托管了一个 Web 应用程序，您的客户可以在其中查看有关直播活动（例如体育比赛）的信息。比赛的信息更新将通过 Amazon MSK 上的 Kafka 主题提供给您的团队。您想设计一个解决方案，使用来自 MSK 主题的更新在您开发的应用程序中向客户提供直播活动的更新视图。您已决定采用以下设计方法：您的客户端应用程序将与在 AWS 中托管的无服务器后端进行通信。客户端将使用 Amazon API Gateway WebSocket API 通过 websocket 会话进行连接。

在此解决方案中，您需要一个组件来读取 MSK 事件，执行一些自定义逻辑为应用程序层准备这些事件，然后将该信息转发到 API Gateway API。您可以使用 AWS Lambda 实现此组件，方法是在 Lambda 函数中提供自定义逻辑，然后使用 AWS Lambda Amazon MSK 事件源映射对其进行调用。

有关使用 Amazon API Gateway WebSocket API 实施解决方案的更多信息，请参阅 API Gateway 文档中的 [WebSocket API 教程](https://docs.aws.amazon.com/apigateway/latest/developerguide/websocket-api-chat-app.html)。

## 先决条件
<a name="w2aad101c23c15c35c19"></a>

具有以下预配置资源的 AWS 账户：

**要满足这些先决条件，建议按照 Amazon MSK 文档中的 [Getting started using Amazon MSK](https://docs.aws.amazon.com//msk/latest/developerguide/getting-started.html) 进行操作。**
+ Amazon MSK 集群。请参阅 *Getting started using Amazon MSK* 中的 [Create an Amazon MSK cluster](https://docs.aws.amazon.com//msk/latest/developerguide/create-cluster.html)。
+ 以下配置：
  + 确保在集群安全设置中**启用****基于 IAM 角色的身份验证**。这会将您的 Lambda 函数限制为仅访问所需的 Amazon MSK 资源，从而提高安全性。默认情况下会对新的 Amazon MSK 集群启用此设置。
  + 确保集群网络设置中的**公有访问**已关闭。通过限制处理数据的中介数量，限制 Amazon MSK 集群对互联网的访问，以提高您的安全性。默认情况下会对新的 Amazon MSK 集群启用此设置。
+ 您的 Amazon MSK 集群中用于此解决方案的 Kafka 主题。请参阅 *Getting started using Amazon MSK* 中的 [Create a topic](https://docs.aws.amazon.com//msk/latest/developerguide/create-topic.html)。
+ 设置 Kafka 管理主机以从您的 Kafka 集群检索信息并将 Kafka 事件发送到您的主题进行测试，例如安装了 Kafka 管理 CLI 和 Amazon MSK IAM 库的 Amazon EC2 实例。请参阅 *Getting started using Amazon MSK* 中的 [Create a client machine](https://docs.aws.amazon.com//msk/latest/developerguide/create-client-machine.html)。

设置完这些资源后，请从您的 AWS 账户中收集以下信息，以确认您已准备好继续。
+ Amazon MSK 集群的名称。您可以在 Amazon MSK 控制台中找到这些信息。
+ 集群 UUID，您的 Amazon MSK 集群 ARN 的一部分，您可以在 Amazon MSK 控制台中找到它。按照 Amazon MSK 文档中 [Listing clusters](https://docs.aws.amazon.com/msk/latest/developerguide/msk-list-clusters.html) 中的步骤查找此信息。
+ 与您的 Amazon MSK 集群关联的安全组。您可以在 Amazon MSK 控制台中找到这些信息。在以下步骤中，这些安全组称为 *clusterSecurityGroups*。
+ 包含 Amazon MSK 集群的 Amazon VPC 的 ID。您可以通过在 Amazon MSK 控制台中识别与您的 Amazon MSK 集群关联的子网，然后在 Amazon VPC 控制台中识别与该子网关联的 Amazon VPC 来找到此信息。
+ 解决方案中使用的 Kafka 主题名称。您可以通过从 Kafka 管理主机使用 Kafka `topics` CLI 调用您的 Amazon MSK 集群来找到此信息。有关主题 CLI 的更多信息，请参阅 Kafka 文档中的 [Adding and removing topics](https://kafka.apache.org/documentation/#basic_ops_add_topic)。
+ 您的 Kafka 主题使用者组的名称，适合您的 Lambda 函数使用。Lambda 可以自动创建该组，因此您无需使用 Kafka CLI 创建该组。如果您确实需要管理使用者组，了解有关使用者组 CLI 的更多信息，则请参阅 Kafka 文档中的 [Managing Consumer Groups](https://kafka.apache.org/documentation/#basic_ops_consumer_group)。

您 AWS 账户中有以下权限：
+ 创建和管理 Lambda 函数的权限。
+ 创建 IAM 策略并将其与您的 Lambda 函数关联的权限。
+ 在托管您的 Amazon MSK 集群的 Amazon VPC 中创建 Amazon VPC 端点和更改网络配置的权限。

### 安装 AWS Command Line Interface
<a name="install_aws_cli"></a>

如果您尚未安装 AWS Command Line Interface，请按照[安装或更新最新版本的 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) 中的步骤进行安装。

本教程需要命令行终端或 Shell 来运行命令。在 Linux 和 macOS 中，可使用您首选的 Shell 和程序包管理器。

**注意**  
在 Windows 中，操作系统的内置终端不支持您经常与 Lambda 一起使用的某些 Bash CLI 命令（例如 `zip`）。[安装 Windows Subsystem for Linux](https://docs.microsoft.com/en-us/windows/wsl/install-win10)，获取 Ubuntu 和 Bash 与 Windows 集成的版本。

## 为 Lambda 配置网络连接以与 Amazon MSK 通信
<a name="w2aad101c23c15c35c21"></a>

 使用 AWS PrivateLink 连接 Lambda 和 Amazon MSK。您可以通过在 Amazon VPC 控制台中创建接口 Amazon VPC 端点来实现此目的。有关联网配置的更多信息，请参阅 [为 Lambda 配置 Amazon MSK 集群和 Amazon VPC 网络](with-msk-cluster-network.md)。

当 Amazon MSK 事件源映射代表 Lambda 函数运行时，它会担任 Lambda 函数的执行角色。此 IAM 角色授权映射访问受 IAM 保护的资源，例如您的 Amazon MSK 集群。尽管这些组件共享执行角色，但 Amazon MSK 映射和您的 Lambda 函数对各自的任务有不同的连接要求，如下图所示。

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/msk_tut_network.png)


您的事件源映射属于 Amazon MSK 集群安全组。在此联网步骤中，从您的 Amazon MSK 集群 VPC 创建 Amazon VPC 端点，将事件源映射连接到 Lambda 和 STS 服务。保护这些端点，以接受来自您的 Amazon MSK 集群安全组的流量。然后，调整 Amazon MSK 集群安全组，以允许事件源映射与 Amazon MSK 集群进行通信。

 您可以使用 AWS 管理控制台 配置以下步骤。

**配置接口 Amazon VPC 端点以连接 Lambda 和 Amazon MSK**

1. 为您的接口 Amazon VPC 端点创建一个安全组 *endpointSecurityGroup*，以允许来自 *clusterSecurityGroups* 端口 443 的入站 TCP 流量。按照 Amazon EC2 文档中[创建安全组](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#creating-security-group)的步骤创建安全组。然后，按照 Amazon EC2 文档中[向安全组添加规则](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#adding-security-group-rule)的步骤添加相应的规则。

   **使用以下信息创建安全组：**

   添加入站规则时，为 *clusterSecurityGroups* 中的每个安全组创建一条规则。对于每条规则：
   + 对于**类型**，选择 **HTTPS**。
   + 对于**源**，选择其中一个 *clusterSecurityGroups*。

1.  创建一个端点，将 Lambda 服务连接到包含 Amazon MSK 集群的 Amazon VPC。按照[创建接口端点](https://docs.aws.amazon.com//vpc/latest/privatelink/create-interface-endpoint.html)中的步骤进行操作。

   **使用以下信息创建接口端点：**
   + 对于**服务名称**，选择 `com.amazonaws.regionName.lambda`，其中 *regionName* 托管您的 Lambda 函数。
   + 对于 **VPC**，选择包含您的 Amazon MSK 集群的 Amazon VPC。
   + 对于**安全组**，选择您之前创建的 *endpointSecurityGroup*。
   + 对于**子网**，选择托管您的 Amazon MSK 集群的子网。
   + 对于**策略**，请提供以下策略文档，其保护端点以供 Lambda 服务主体用于执行 `lambda:InvokeFunction` 操作。

     ```
     {
         "Statement": [
             {
                 "Action": "lambda:InvokeFunction",
                 "Effect": "Allow",
                 "Principal": {
                     "Service": [
                         "lambda.amazonaws.com"
                     ]
                 },
                 "Resource": "*"
             }
         ]
     }
     ```
   + 确保**启用 DNS 名称**保持设置状态。

1.  创建一个端点，将 AWS STS 服务连接到包含 Amazon MSK 集群的 Amazon VPC。按照[创建接口端点](https://docs.aws.amazon.com//vpc/latest/privatelink/create-interface-endpoint.html)中的步骤进行操作。

   **使用以下信息创建接口端点：**
   + 对于**服务名称**，选择 AWS STS。
   + 对于 **VPC**，选择包含您的 Amazon MSK 集群的 Amazon VPC。
   + 对于**安全组**，选择 *endpointSecurityGroup*。
   + 对于**子网**，选择托管您的 Amazon MSK 集群的子网。
   + 对于**策略**，请提供以下策略文档，其保护端点以供 Lambda 服务主体用于执行 `sts:AssumeRole` 操作。

     ```
     {
         "Statement": [
             {
                 "Action": "sts:AssumeRole",
                 "Effect": "Allow",
                 "Principal": {
                     "Service": [
                         "lambda.amazonaws.com"
                     ]
                 },
                 "Resource": "*"
             }
         ]
     }
     ```
   + 确保**启用 DNS 名称**保持设置状态。

1. 对于与您的 Amazon MSK 集群关联的每个安全组（即 *clusterSecurityGroups*），允许执行以下操作：
   + 允许端口 9098 上到所有 *clusterSecurityGroups*（包括其自身内部）的所有入站和出站 TCP 流量。
   + 允许端口 443 上的所有出站 TCP 流量。

   默认安全组规则允许部分流量，因此，如果您的集群连接到单个安全组，并且该组有默认规则，则不需要其他规则。要调整安全组规则，请按照 Amazon EC2 文档中[向安全组添加规则](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#adding-security-group-rule)的步骤进行操作。

   **使用以下信息向您的安全组添加规则：**
   + 对于端口 9098 的每条入站规则或出站规则，请提供
     + 对于 **Type (类型)**，选择 **Custom TCP (自定义 TCP)**。
     + 对于**端口范围**，请提供 9098。
     + 对于**源**，提供其中一个 *clusterSecurityGroups*。
   + 对于端口 443 的每条入站规则的**类型**，选择 **HTTPS**。

## 为 Lambda 创建 IAM 角色，以从您的 Amazon MSK 主题中读取
<a name="w2aad101c23c15c35c23"></a>

确定 Lambda 从 Amazon MSK 主题读取的身份验证要求，然后在策略中进行定义。创建一个角色 *lambdaAuthRole*，授权 Lambda 使用这些权限。使用 `kafka-cluster` IAM 操作在您的 Amazon MSK 集群上授权操作。然后，授权 Lambda 执行发现和连接到您的 Amazon MSK 集群所需的 Amazon MSK `kafka` 和 Amazon EC2 操作以及 CloudWatch 操作，这样 Lambda 便可记录其所执行的操作。

**描述 Lambda 从 Amazon MSK 读取的身份验证要求**

1. 编写一个 IAM 策略文档（JSON 文档）*clusterAuthPolicy*，允许 Lambda 使用您的 Kafka 使用者组从 Amazon MSK 集群中的 Kafka 主题进行读取。Lambda 要求在读取时设置一个 Kafka 使用者组。

   修改以下模板以符合您的先决条件：

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "kafka-cluster:Connect",
                   "kafka-cluster:DescribeGroup",
                   "kafka-cluster:AlterGroup",
                   "kafka-cluster:DescribeTopic",
                   "kafka-cluster:ReadData",
                   "kafka-cluster:DescribeClusterDynamicConfiguration"
               ],
               "Resource": [
                   "arn:aws:kafka:us-east-1:111122223333:cluster/mskClusterName/cluster-uuid",
                   "arn:aws:kafka:us-east-1:111122223333:topic/mskClusterName/cluster-uuid/mskTopicName",
                   "arn:aws:kafka:us-east-1:111122223333:group/mskClusterName/cluster-uuid/mskGroupName"
               ]
           }
       ]
   }
   ```

------

   有关更多信息，请参阅 [为 Amazon MSK 事件源映射配置 Lambda 权限](with-msk-permissions.md)。编写策略时：
   + 将 *us-east-1* 和 *111122223333* 替换为 Amazon MSK 集群的 AWS 区域 和 AWS 账户。
   + 对于 *mskClusterName*，提供您的 Amazon MSK 集群的名称。
   + 对于 *cluster-uuid*，提供您的 Amazon MSK 集群的 ARN 中 UUID。
   + 对于 *mskTopicName*，提供您的 Kafka 主题的名称。
   + 对于 *mskGroupName*，提供您的 Kafka 使用者组的名称。

1. 确定 Lambda 发现和连接您的 Amazon MSK 集群所需的 Amazon MSK、Amazon EC2 和 CloudWatch 权限，并记录这些事件。

   `AWSLambdaMSKExecutionRole` 托管策略宽松地定义所需的权限。在以下步骤中使用该策略。

   在生产环境中，评测 `AWSLambdaMSKExecutionRole` 以根据最低权限原则限制您的执行角色策略，然后为您的角色编写一个策略来取代此托管策略。

有关 IAM 策略语言的详细信息，请参阅 [IAM 文档](https://docs.aws.amazon.com//iam/)。

现在，您已经编写了策略文档，请创建一个 IAM 策略，这样便可将其附加到您的角色中。您可以使用控制台按以下步骤完成此操作。

**从策略文档创建 IAM 策略**

1. 登录 AWS 管理控制台，然后通过以下网址打开 IAM 控制台：[https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)。

1. 在左侧的导航窗格中，选择**策略**。

1. 选择**创建策略**。

1. 在**策略编辑器**部分，选择 **JSON** 选项。

1. 粘贴 *clusterAuthPolicy*。

1. 向策略添加完权限后，选择**下一步**。

1. 在**查看和创建**页面上，为创建的策略键入**策略名称**和**描述**（可选）。查看**此策略中定义的权限**以查看策略授予的权限。

1. 选择**创建策略**可保存新策略。

有关更多信息，请参阅 IAM 文档中的[创建 IAM 策略](https://docs.aws.amazon.com//IAM/latest/UserGuide/access_policies_create.html)。

既然您已经有了适当的 IAM 策略，请创建一个角色并将其附加到该角色。您可以使用控制台按以下步骤完成此操作。

**在 IAM 控制台中创建执行角色**

1. 在 IAM 控制台中，打开 [Roles](https://console.aws.amazon.com/iam/home#/roles)（角色）页面。

1. 请选择 **Create role**（创建角色）。

1. 在**可信实体类型**下，选择 **AWS 服务**。

1. 在 **Use case（使用案例）**下，选择 **Lambda**。

1. 选择**下一步**。

1. 选择以下策略：
   + *clusterAuthPolicy*
   + `AWSLambdaMSKExecutionRole`

1. 选择**下一步**。

1. 对于**角色名称**，输入 *lambdaAuthRole*，然后选择**创建角色**。

有关更多信息，请参阅 [使用执行角色定义 Lambda 函数权限](lambda-intro-execution-role.md)。

## 创建 Lambda 函数以从您的 Amazon MSK 主题中读取
<a name="w2aad101c23c15c35c25"></a>

创建配置为使用您的 IAM 角色的 Lambda 函数。您可以使用控制台创建您的 Lambda 函数。

**使用您的身份验证配置创建 Lambda 函数**

1.  打开 Lambda 控制台并从标题中选择**创建函数**。

1. 选择**从头开始编写**。

1. 对于**函数名称**，请提供您选择的相应名称。

1. 对于**运行时**，选择**最新支持**版本的 `Node.js` 以使用本教程中提供的代码。

1. 选择**更改默认执行角色**。

1. 选择**使用现有角色**。

1. 对于**现有角色**，选择 *lambdaAuthRole*。

在生产环境中，您通常需要向 Lambda 函数的执行角色添加更多策略，以便有效地处理您的 Amazon MSK 事件。有关向角色添加策略的更多信息，请参阅 IAM 文档中的[添加或删除身份权限](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html#add-policies-console)。

## 创建到 Lambda 函数的事件源映射
<a name="w2aad101c23c15c35c27"></a>

您的 Amazon MSK 事件源映射为 Lambda 服务提供了必要的信息，以在发生相应 Amazon MSK 事件时调用您的 Lambda。您可以使用控制台创建 Amazon MSK 映射。创建 Lambda 触发器，然后事件源映射会自动设置。

**创建 Lambda 触发器（和事件源映射）**

1. 导航到您的 Lambda 函数的概述页面。

1. 在函数概述部分中，选择左下角的**添加触发器**。

1. 在**选择源**下拉列表中，选择 **Amazon MSK**。

1. 请勿设置**身份验证**。

1. 对于 **MSK 集群**，选择集群的名称。

1. 对于**批次大小**，输入 1。此步骤使该功能更易于测试，但并非生产中的理想值。

1. 对于**主题名称**，输入 Kafka 主题名称。

1. 对于**使用者组 ID**，请提供您的 Kafka 使用者组的 ID。

## 更新您的 Lambda 函数以读取流数据
<a name="w2aad101c23c15c35c29"></a>

 Lambda 通过事件方法参数提供有关 Kafka 事件的信息。有关 Amazon MSK 事件的示例结构，请参阅 [事件示例](with-msk.md#msk-sample-event)。在您了解如何解读 Lambda 转发的 Amazon MSK 事件后，您可以修改您的 Lambda 函数代码以使用其提供的信息。

 向您的 Lambda 函数提供以下代码，用于记录 Lambda Amazon MSK 事件的内容以进行测试：

------
#### [ .NET ]

**适用于 .NET 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 .NET 将 Amazon MSK 事件与 Lambda 结合使用。  

```
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KafkaEvents;


// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace MSKLambda;

public class Function
{
    
    
    /// <param name="input">The event for the Lambda function handler to process.</param>
    /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param>
    /// <returns></returns>
    public void FunctionHandler(KafkaEvent evnt, ILambdaContext context)
    {

        foreach (var record in evnt.Records)
        {
            Console.WriteLine("Key:" + record.Key); 
            foreach (var eventRecord in record.Value)
            {
                var valueBytes = eventRecord.Value.ToArray();    
                var valueText = Encoding.UTF8.GetString(valueBytes);
                
                Console.WriteLine("Message:" + valueText);
            }
        }
    }
    

}
```

------
#### [ Go ]

**适用于 Go 的 SDK V2**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 Go 将 Amazon MSK 事件与 Lambda 结合使用。  

```
package main

import (
	"encoding/base64"
	"fmt"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(event events.KafkaEvent) {
	for key, records := range event.Records {
		fmt.Println("Key:", key)

		for _, record := range records {
			fmt.Println("Record:", record)

			decodedValue, _ := base64.StdEncoding.DecodeString(record.Value)
			message := string(decodedValue)
			fmt.Println("Message:", message)
		}
	}
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**适用于 Java 的 SDK 2.x**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 Java 将 Amazon MSK 事件与 Lambda 结合使用。  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord;

import java.util.Base64;
import java.util.Map;

public class Example implements RequestHandler<KafkaEvent, Void> {

    @Override
    public Void handleRequest(KafkaEvent event, Context context) {
        for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) {
            String key = entry.getKey();
            System.out.println("Key: " + key);

            for (KafkaEventRecord record : entry.getValue()) {
                System.out.println("Record: " + record);

                byte[] value = Base64.getDecoder().decode(record.getValue());
                String message = new String(value);
                System.out.println("Message: " + message);
            }
        }

        return null;
    }
}
```

------
#### [ JavaScript ]

**SDK for JavaScript（v3）**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 JavaScript 将 Amazon MSK 事件与 Lambda 结合使用。  

```
exports.handler = async (event) => {
    // Iterate through keys
    for (let key in event.records) {
      console.log('Key: ', key)
      // Iterate through records
      event.records[key].map((record) => {
        console.log('Record: ', record)
        // Decode base64
        const msg = Buffer.from(record.value, 'base64').toString()
        console.log('Message:', msg)
      }) 
    }
}
```
通过 TypeScript 将 Amazon MSK 事件与 Lambda 结合使用。  

```
import { MSKEvent, Context } from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "msk-handler-sample",
});

export const handler = async (
  event: MSKEvent,
  context: Context
): Promise<void> => {
  for (const [topic, topicRecords] of Object.entries(event.records)) {
    logger.info(`Processing key: ${topic}`);

    // Process each record in the partition
    for (const record of topicRecords) {
      try {
        // Decode the message value from base64
        const decodedMessage = Buffer.from(record.value, 'base64').toString();

        logger.info({
          message: decodedMessage
        });
      }
      catch (error) {
        logger.error('Error processing event', { error });
        throw error;
      }
    };
  }
}
```

------
#### [ PHP ]

**适用于 PHP 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 PHP 将 Amazon MSK 事件与 Lambda 结合使用。  

```
<?php
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kafka\KafkaEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): void
    {
        $kafkaEvent = new KafkaEvent($event);
        $this->logger->info("Processing records");
        $records = $kafkaEvent->getRecords();

        foreach ($records as $record) {
            try {
                $key = $record->getKey();
                $this->logger->info("Key: $key");

                $values = $record->getValue();
                $this->logger->info(json_encode($values));

                foreach ($values as $value) {
                    $this->logger->info("Value: $value");
                }
                
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 Python 将 Amazon MSK 事件与 Lambda 结合使用。  

```
import base64

def lambda_handler(event, context):
    # Iterate through keys
    for key in event['records']:
        print('Key:', key)
        # Iterate through records
        for record in event['records'][key]:
            print('Record:', record)
            # Decode base64
            msg = base64.b64decode(record['value']).decode('utf-8')
            print('Message:', msg)
```

------
#### [ Ruby ]

**适用于 Ruby 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 Ruby 将 Amazon MSK 事件与 Lambda 结合使用。  

```
require 'base64'

def lambda_handler(event:, context:)
  # Iterate through keys
  event['records'].each do |key, records|
    puts "Key: #{key}"

    # Iterate through records
    records.each do |record|
      puts "Record: #{record}"

      # Decode base64
      msg = Base64.decode64(record['value'])
      puts "Message: #{msg}"
    end
  end
end
```

------
#### [ Rust ]

**适用于 Rust 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 Rust 将 Amazon MSK 事件与 Lambda 结合使用。  

```
use aws_lambda_events::event::kafka::KafkaEvent;
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use base64::prelude::*;
use serde_json::{Value};
use tracing::{info};

/// Pre-Requisites:
/// 1. Install Cargo Lambda - see https://www.cargo-lambda.info/guide/getting-started.html
/// 2. Add packages tracing, tracing-subscriber, serde_json, base64
///
/// This is the main body for the function.
/// Write your code inside it.
/// There are some code example in the following URLs:
/// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples
/// - https://github.com/aws-samples/serverless-rust-demo/

async fn function_handler(event: LambdaEvent<KafkaEvent>) -> Result<Value, Error> {

    let payload = event.payload.records;

    for (_name, records) in payload.iter() {

        for record in records {

         let record_text = record.value.as_ref().ok_or("Value is None")?;
         info!("Record: {}", &record_text);

         // perform Base64 decoding
         let record_bytes = BASE64_STANDARD.decode(record_text)?;
         let message = std::str::from_utf8(&record_bytes)?;
         
         info!("Message: {}", message);
        }

    }

    Ok(().into())
}

#[tokio::main]
async fn main() -> Result<(), Error> {

    // required to enable CloudWatch error logging by the runtime
    tracing::init_default_subscriber();
    info!("Setup CW subscriber!");

    run(service_fn(function_handler)).await
}
```

------

您可以使用控制台向 Lambda 提供函数代码。

**要使用控制台代码编辑器更新函数代码。**

1. 打开 Lambda 控制台的[“函数”页面](https://console.aws.amazon.com/lambda/home#/functions)，然后选择函数。

1. 选择**代码**选项卡。

1. 在**代码源**窗格中，选择源代码文件并在集成的代码编辑器中对其进行编辑。

1. 在**部署**部分，选择**部署**以更新函数的代码：  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/getting-started-tutorial/deploy-console.png)

## 测试您的 Lambda 函数以验证其是否连接到您的 Amazon MSK 主题
<a name="w2aad101c23c15c35c31"></a>

现在，您可以通过查看 CloudWatch 事件日志来验证事件源是否正在调用您的 Lambda。

**验证是否正在调用您的 Lambda 函数**

1. 使用您的 Kafka 管理主机通过 CLI `kafka-console-producer` 生成 Kafka 事件。有关更多信息，请参阅 Kafka 文档中的 [Write some events into the topic](https://kafka.apache.org/documentation/#quickstart_send)。发送足够的事件以填充上一步中定义的事件源映射批次大小所定义的批次，否则 Lambda 将等待更多信息来调用。

1. 如果您的函数运行，则 Lambda 会将发生的事件写入 CloudWatch。在控制台中，导航到您的 Lambda 函数的详细信息页面。

1. 选择 **Configuration（配置）**选项卡。

1. 在侧栏中，选择**监控和操作工具**。

1. 在**日志记录配置**下确定 **CloudWatch 日志组**。日志组应以 `/aws/lambda` 开头。选择日志组的链接。

1. 在 CloudWatch 控制台中，检查**日志事件**，查看 Lambda 已发送到日志流的日志事件。确定是否存在包含来自 Kafka 事件消息的日志事件，如下图所示。如果有，则表示您已成功使用 Lambda 事件源映射将 Lambda 函数连接到 Amazon MSK。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/msk_tut_log.png)