

# 为自托管式 Apache Kafka 事件源创建 Lambda 事件源映射
<a name="kafka-esm-create"></a>

要创建事件源映射，您可以使用 Lambda 控制台、[AWS Command Line Interface (CLI)](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) 或 [AWS SDK](https://aws.amazon.com/getting-started/tools-sdks/)。

以下控制台步骤添加自托管式 Apache Kafka 集群作为 Lambda 函数的触发器。这将在后台创建一个事件源映射资源。

## 先决条件
<a name="kafka-esm-prereqs"></a>
+ 自行管理的 Apache Kafka 集群。Lambda 支持 Apache Kafka 版本 0.10.1.0 及更高版本。
+ 一个有权访问您自行管理的 Kafka 集群所用 AWS 资源的[执行角色](lambda-intro-execution-role.md)。

## 添加自行管理的 Kafka 集群（控制台）
<a name="kafka-esm-console"></a>

按照以下步骤将自行管理的 Apache Kafka 集群和 Kafka 主题添加为 Lambda 函数的触发器。

**将 Apache Kafka 触发器添加到 Lambda 函数（控制台）**

1. 打开 Lamba 控制台的 [Functions（函数）页面。](https://console.aws.amazon.com/lambda/home#/functions)

1. 选择 Lambda 函数的名称。

1. 在 **Function overview**（函数概览）下，选择 **Add trigger**（添加触发器）。

1. 在 **Trigger configuration**（触发配置）下，执行以下操作：

   1. 选择 **Apache Kafka** 触发类型。

   1. 对于 **Bootstrap servers**（Bootstrap 引导服务器），输入集群中 Kafka 代理的主机和端口对地址，然后选择 **Add**（添加）。对集群中的每个 Kafka 代理重复此操作。

   1. 对于 **Topic name**（主题名称），输入用于在集群中存储记录的 Kafka 主题的名称。

   1. 如果您配置了预置模式，输入**最少事件轮询器**的值、**最多事件轮询器**的值以及 PollerGroupName 的可选值，以指定同一个事件源 VPC 中的多个 ESM 的分组。

   1. （可选）对于 **Batch size**（批处理大小），输入要在单个批次中接收的最大记录数。

   1. 对于 **Batch window（批处理时段）**，输入 Lambda 在调用函数之前收集记录所花费的最大秒数。

   1. （可选）对于 **Consumer group ID（使用者组 ID）**，输入要加入的 Kafka 使用者组的 ID。

   1. （可选）对于**起始位置**，选择**最新**即可从最新记录开始读取流，选择**最早**即可从最早的可用记录开始读取流，选择**在时间戳处**即可从指定的时间戳开始读取流。

   1. （可选）对于 **VPC**，请为您的 Kafka 集群选择 Amazon VPC。然后，选择 **VPC subnets**（VPC 子网）和 **VPC security groups**（VPC 安全组）。

      如果仅 VPC 内部的用户访问代理，则需要此设置。

      

   1. （可选）对于 **Authentication**（身份验证），请选择 **Add**（添加），然后执行以下操作：

      1. 选择集群中 Kafka 代理的访问权限或身份验证协议。
         + 如果 Kafka 代理使用 SASL/PLAIN 身份验证，请选择 **BASIC\$1AUTH**。
         + 如果代理使用 SASL/SCRAM 身份验证，请选择其中一个 **SASL\$1SCRAM** 协议。
         + 如果要配置 mTLS 身份验证，请选择 **CLIENT\$1CERTIFICATE\$1TLS\$1AUTH** 协议。

      1. 对于 SASL/SCRAM 或 mTLS 身份验证，请选择包含 Kafka 集群凭据的 Secrets Manager 私有密钥。

   1. （可选）对于 **Encryption**（加密），如果您的 Kafka 代理使用由私有 CA 签名的证书，请选择包含根 CA 证书的 Secrets Manager 密钥，Kafka 代理使用该证进行 TLS 加密。

      此设置适用于 SASL/SCRAM 或 SASL/PLAIN 的 TLS 加密，以及 mTLS 身份验证。

   1. 要在禁用状态下创建触发器以进行测试（推荐），请清除 **Enable trigger**（启用触发器）。或者，要立即启用该触发器，请选择 **Enable trigger**（启用触发器）。

1. 要创建触发器，请选择 **Add**（添加）。

## 添加自行管理的 Kafka 集群 (AWS CLI)
<a name="kafka-esm-cli"></a>

使用以下示例 AWS CLI 命令为 Lambda 函数创建和查看自行管理的 Apache Kafka 触发器。

### 使用 SASL/SCRAM
<a name="kafka-esm-cli-create"></a>

如果 Kafka 用户通过互联网访问您的 Kafka 代理，则需要指定为 SASL/SCRAM 身份验证创建的 Secrets Manager。以下示例使用 [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) AWS CLI 命令将名为 `my-kafka-function` 的 Lambda 函数映射至名为 `AWSKafkaTopic` 的 Kafka 主题。

```
aws lambda create-event-source-mapping \ 
  --topics AWSKafkaTopic \
  --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \
  --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
```

### 使用 VPC
<a name="kafka-esm-cli-create-vpc"></a>

如果仅 VPC 中的 Kafka 用户访问 Kafka 代理，则必须指定 VPC、子网和 VPC 安全组。以下示例使用 [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) AWS CLI 命令将名为 `my-kafka-function` 的 Lambda 函数映射至名为 `AWSKafkaTopic` 的 Kafka 主题。

```
aws lambda create-event-source-mapping \ 
  --topics AWSKafkaTopic \
  --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \
  --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
```

### 使用 AWS CLI 查看状态
<a name="kafka-esm-cli-view"></a>

以下示例使用 [get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html) AWS CLI 命令来描述您创建的事件源映射的状态。

```
aws lambda get-event-source-mapping
              --uuid dh38738e-992b-343a-1077-3478934hjkfd7
```