

# 将 Lambda 与自行管理的 Apache Kafka 结合使用
<a name="with-kafka"></a>

本主题介绍了如何将 Lambda 与自行管理的 Kafka 集群结合使用。在AWS术语中，自行管理的群集包括非AWS托管 Kafka 集群。例如，可以通过 [Confluent Cloud](https://www.confluent.io/confluent-cloud/) 或 [Redpanda](https://www.redpanda.com/) 等云提供程序来托管 Kafka 集群。

本章说明如何将自托管式 Apache Kafka 集群用作 Lambda 函数的事件源。将自托管式 Apache Kafka 与 Lambda 集成的一般过程包括以下步骤：

1. **[集群和网络设置](with-kafka-cluster-network.md)** — 首先，使用正确的网络配置设置自托管式 Apache Kafka 集群，以允许 Lambda 访问集群。

1. **[事件源映射设置](with-kafka-configure.md)**：然后，创建 Lambda 需要的[事件源映射](invocation-eventsourcemapping.md)资源，以将 Apache Kafka 集群安全地连接到您的函数。

1. **[函数和权限设置](with-kafka-permissions.md)**：最后，请确保您的函数设置正确，并且在其[执行角色](lambda-intro-execution-role.md)中具有必要的权限。

Apache Kafka 作为事件源，运行方式与使用 Amazon Simple Queue Service (Amazon SQS) 或 Amazon Kinesis 相似。Lambda 在内部轮询来自事件源的新消息，然后同步调用目标 Lambda 函数。Lambda 批量读取消息，并将这些消息作为事件有效负载提供给您的函数。最大批处理大小是可配置的（默认值为 100 条消息）。有关更多信息，请参阅 [批处理行为](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)。

要优化自我管理的 Apache Kafka 事件源映射的吞吐量，请配置预调配模式。在预调配模式下，您可以定义分配给事件源映射的事件轮询器的最小和最大数量。这可以提高事件源映射处理意外消息激增的能力。有关更多信息，请参阅[预调配模式](kafka-scaling-modes.md#kafka-provisioned-mode)。

**警告**  
Lambda 事件源映射至少处理每个事件一次，有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题，我们强烈建议您将函数代码设为幂等性。要了解更多信息，请参阅 AWS 知识中心的[如何让我的 Lambda 函数保持幂等性](https://repost.aws/knowledge-center/lambda-function-idempotent)。

对于基于 Kafka 的事件源，Lambda 支持处理控制参数，例如批处理时段和批处理大小。有关更多信息，请参阅 [批处理行为](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)。

有关如何使用自行管理的 Kafka 作为事件源的示例，请参阅AWS计算博客上的[使用自行托管的 Apache Kafka 作为 AWS Lambda 事件源](https://aws.amazon.com/blogs/compute/using-self-hosted-apache-kafka-as-an-event-source-for-aws-lambda/)。

**Topics**
+ [事件示例](#smaa-sample-event)
+ [为 Lambda 配置自托管式 Apache Kafka 集群和网络](with-kafka-cluster-network.md)
+ [配置 Lambda 执行角色权限](with-kafka-permissions.md)
+ [为 Lambda 配置自托管式 Apache Kafka 事件源](with-kafka-configure.md)

## 事件示例
<a name="smaa-sample-event"></a>

当 Lambda 调用 Lambda 函数时，它会在事件参数中发送一批消息。事件负载包含一个消息数组。每个数组项目都包含 Kafka 主题和 Kafka 分区标识符的详细信息，以及时间戳和 base64 编码的消息。

```
{
   "eventSource": "SelfManagedKafka",
   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
   "records":{
      "mytopic-0":[
         {
            "topic":"mytopic",
            "partition":0,
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==",
            "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
            "headers":[
               {
                  "headerKey":[
                     104,
                     101,
                     97,
                     100,
                     101,
                     114,
                     86,
                     97,
                     108,
                     117,
                     101
                  ]
               }
            ]
         }
      ]
   }
}
```