

# Using Lambda with self-managed Apache Kafka
<a name="with-kafka"></a>

This topic describes how to use Lambda with a self-managed Kafka cluster. In AWS terminology, a self-managed cluster includes non-AWS hosted Kafka clusters. For example, you can host your Kafka cluster with a cloud provider such as [Confluent Cloud](https://www.confluent.io/confluent-cloud/) or [Redpanda](https://www.redpanda.com/).

This chapter explains how to use a self-managed Apache Kafka cluster as an event source for your Lambda function. The general process for integrating self-managed Apache Kafka with Lambda involves the following steps:

1. **[Cluster and network setup](with-kafka-cluster-network.md)** – First, set up your self-managed Apache Kafka cluster with the correct networking configuration to allow Lambda to access your cluster.

1. **[Event source mapping setup](with-kafka-configure.md)** – Then, create the [event source mapping](invocation-eventsourcemapping.md) resource that Lambda needs to securely connect your Apache Kafka cluster to your function.

1. **[Function and permissions setup](with-kafka-permissions.md)** – Finally, ensure that your function is correctly set up, and has the necessary permissions in its [execution role](lambda-intro-execution-role.md).

Apache Kafka as an event source operates similarly to using Amazon Simple Queue Service (Amazon SQS) or Amazon Kinesis. Lambda internally polls for new messages from the event source and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides these to your function as an event payload. The maximum batch size is configurable (the default is 100 messages). For more information, see [Batching behavior](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching).

To optimize the throughput of your self-managed Apache Kafka event source mapping, configure provisioned mode. In provisioned mode, you can define the minimum and maximum number of event pollers allocated to your event source mapping. This can improve the ability of your event source mapping to handle unexpected message spikes. For more information, see [provisioned mode](kafka-scaling-modes.md#kafka-provisioned-mode).

**Warning**  
Lambda event source mappings process each event at least once, and duplicate processing of records can occur. To avoid potential issues related to duplicate events, we strongly recommend that you make your function code idempotent. To learn more, see [How do I make my Lambda function idempotent](https://repost.aws/knowledge-center/lambda-function-idempotent) in the AWS Knowledge Center.
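One common approach, sketched below for illustration rather than as the recommended implementation, is to deduplicate records using the combination of topic, partition, and offset, which uniquely identifies a Kafka record. The in-memory set is illustrative only; a production function would track processed keys in a durable store such as DynamoDB so that state survives across invocations.

```python
# Idempotency sketch: deduplicate Kafka records by (topic, partition, offset).
# The in-memory set is for illustration; a real function would use a durable
# store (for example, DynamoDB) that persists across invocations.
processed_keys = set()

def process_record(record):
    key = (record["topic"], record["partition"], record["offset"])
    if key in processed_keys:
        return False  # duplicate delivery; skip side effects
    # ... perform the record's side effects here ...
    processed_keys.add(key)
    return True
```

Calling `process_record` twice with the same record performs the work only once, so a redelivered batch has no additional effect.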

For Kafka-based event sources, Lambda supports processing control parameters, such as batching windows and batch size. For more information, see [Batching behavior](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching).

For an example of how to use self-managed Kafka as an event source, see [Using self-hosted Apache Kafka as an event source for AWS Lambda](https://aws.amazon.com/blogs/compute/using-self-hosted-apache-kafka-as-an-event-source-for-aws-lambda/) on the AWS Compute Blog.

**Topics**
+ [Example event](#smaa-sample-event)
+ [Configuring your self-managed Apache Kafka cluster and network for Lambda](with-kafka-cluster-network.md)
+ [Configuring Lambda execution role permissions](with-kafka-permissions.md)
+ [Configuring self-managed Apache Kafka event sources for Lambda](with-kafka-configure.md)

## Example event
<a name="smaa-sample-event"></a>

Lambda sends the batch of messages in the event parameter when it invokes your Lambda function. The event payload contains an array of messages. Each array item contains details of the Kafka topic and Kafka partition identifier, together with a timestamp and a base64-encoded message.

```
{
   "eventSource": "SelfManagedKafka",
   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
   "records":{
      "mytopic-0":[
         {
            "topic":"mytopic",
            "partition":0,
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==",
            "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
            "headers":[
               {
                  "headerKey":[
                     104,
                     101,
                     97,
                     100,
                     101,
                     114,
                     86,
                     97,
                     108,
                     117,
                     101
                  ]
               }
            ]
         }
      ]
   }
}
```
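The `value` field (and each header value, delivered as an array of byte values) must be decoded before use. The following sketch shows one way to unpack such a payload in Python; the `decode_records` helper name is illustrative:

```python
import base64

def decode_records(event):
    """Decode base64 values and byte-array headers from a Kafka event payload."""
    out = []
    for partition_key, messages in event["records"].items():
        for msg in messages:
            out.append({
                "topic": msg["topic"],
                "partition": msg["partition"],
                "offset": msg["offset"],
                # The message value is base64-encoded
                "value": base64.b64decode(msg["value"]).decode("utf-8"),
                # Each header entry maps a key to a list of byte values
                "headers": {k: bytes(v).decode("utf-8")
                            for h in msg.get("headers", [])
                            for k, v in h.items()},
            })
    return out
```

Applied to the example event above, the record's `value` decodes to `Hello, this is a test.` and its `headerKey` header to `headerValue`.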

# Configuring your self-managed Apache Kafka cluster and network for Lambda
<a name="with-kafka-cluster-network"></a>

To connect your Lambda function to your self-managed Apache Kafka cluster, you need to correctly configure your cluster and the network it resides in. This page describes how to configure your cluster and network. If your cluster and network are already configured properly, see [Configuring self-managed Apache Kafka event sources for Lambda](with-kafka-configure.md) to configure the event source mapping.

**Topics**
+ [Self-managed Apache Kafka cluster setup](#kafka-cluster-setup)
+ [Configure network security](#services-kafka-vpc-config)

## Self-managed Apache Kafka cluster setup
<a name="kafka-cluster-setup"></a>

You can host your self-managed Apache Kafka cluster with cloud providers such as [Confluent Cloud](https://www.confluent.io/confluent-cloud/) or [Redpanda](https://www.redpanda.com/), or run it on your own infrastructure. Ensure that your cluster is properly configured and accessible from the network where your Lambda event source mapping will connect.

## Configure network security
<a name="services-kafka-vpc-config"></a>

To give Lambda full access to self-managed Apache Kafka through your event source mapping, either your cluster must use a public endpoint (public IP address), or you must provide access to the Amazon VPC you created the cluster in.

When you use self-managed Apache Kafka with Lambda, create [AWS PrivateLink VPC endpoints](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html) that provide your function access to the resources in your Amazon VPC.

**Note**  
AWS PrivateLink VPC endpoints are required for functions with event source mappings that use the default (on-demand) mode for event pollers. If your event source mapping uses [provisioned mode](invocation-eventsourcemapping.md#invocation-eventsourcemapping-provisioned-mode), you don't need to configure AWS PrivateLink VPC endpoints.

Create an endpoint to provide access to the following resources:
+ Lambda — Create an endpoint for the Lambda service principal.
+ AWS STS — Create an endpoint for AWS STS so that the service principal can assume a role on your behalf.
+ Secrets Manager — If your cluster uses Secrets Manager to store credentials, create an endpoint for Secrets Manager.

Alternatively, configure a NAT gateway on each public subnet in the Amazon VPC. For more information, see [Enable internet access for VPC-connected Lambda functions](configuration-vpc-internet.md).

When you create an event source mapping for self-managed Apache Kafka, Lambda checks whether Elastic Network Interfaces (ENIs) are already present for the subnets and security groups configured for your Amazon VPC. If Lambda finds existing ENIs, it attempts to re-use them. Otherwise, Lambda creates new ENIs to connect to the event source and invoke your function.

**Note**  
Lambda functions always run inside VPCs owned by the Lambda service. Your function's VPC configuration does not affect the event source mapping. Only the networking configuration of the event source determines how Lambda connects to your cluster.

Configure the security groups for the Amazon VPC containing your cluster. By default, self-managed Apache Kafka brokers use port `9092`.
+ Inbound rules – Allow all traffic on the default broker port for the security group associated with your event source. Alternatively, you can use a self-referencing security group rule to allow access from instances within the same security group.
+ Outbound rules – Allow all traffic on port `443` for external destinations if your function needs to communicate with AWS services. Alternatively, you can use a self-referencing security group rule to limit access to the broker if you don't need to communicate with other AWS services.
+ Amazon VPC endpoint inbound rules — If you are using an Amazon VPC endpoint, the security group associated with your Amazon VPC endpoint must allow inbound traffic on port `443` from the cluster security group.

If your cluster uses authentication, you can also restrict the endpoint policy for the Secrets Manager endpoint. To call the Secrets Manager API, Lambda uses your function role, not the Lambda service principal.

**Example VPC endpoint policy — Secrets Manager endpoint**  

```
{
    "Statement": [
        {
            "Action": "secretsmanager:GetSecretValue",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::123456789012:role/my-role"
                ]
            },
            "Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-secret"
        }
    ]
}
```

When you use Amazon VPC endpoints, AWS routes your API calls to invoke your function using the endpoint's Elastic Network Interface (ENI). The Lambda service principal needs to call `lambda:InvokeFunction` on any roles and functions that use those ENIs.

By default, Amazon VPC endpoints have open IAM policies that allow broad access to resources. Best practice is to restrict these policies to only the actions needed through that endpoint. However, to ensure that your event source mapping can invoke your Lambda function, the VPC endpoint policy must allow the Lambda service principal to call `sts:AssumeRole` and `lambda:InvokeFunction`. Restricting your VPC endpoint policies to allow only API calls that originate within your organization prevents the event source mapping from functioning properly, so `"Resource": "*"` is required in these policies.

The following example VPC endpoint policies show how to grant the required access to the Lambda service principal for the AWS STS and Lambda endpoints.

**Example VPC Endpoint policy — AWS STS endpoint**  

```
{
    "Statement": [
        {
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "lambda.amazonaws.com"
                ]
            },
            "Resource": "*"
        }
    ]
}
```

**Example VPC Endpoint policy — Lambda endpoint**  

```
{
    "Statement": [
        {
            "Action": "lambda:InvokeFunction",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "lambda.amazonaws.com"
                ]
            },
            "Resource": "*"
        }
    ]
}
```

# Configuring Lambda execution role permissions
<a name="with-kafka-permissions"></a>

In addition to [accessing your self-managed Kafka cluster](kafka-cluster-auth.md), your Lambda function needs permissions to perform various API actions. You add these permissions to the function's [execution role](lambda-intro-execution-role.md). If your users need access to any API actions, add the required permissions to the identity policy for the AWS Identity and Access Management (IAM) user or role.

**Topics**
+ [Required Lambda function permissions](#smaa-api-actions-required)
+ [Optional Lambda function permissions](#smaa-api-actions-optional)
+ [Adding permissions to your execution role](#smaa-permissions-add-policy)
+ [Granting users access with an IAM policy](#smaa-permissions-add-users)

## Required Lambda function permissions
<a name="smaa-api-actions-required"></a>

To create and store logs in a log group in Amazon CloudWatch Logs, your Lambda function must have the following permissions in its execution role:
+ [logs:CreateLogGroup](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html)
+ [logs:CreateLogStream](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogStream.html)
+ [logs:PutLogEvents](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html)
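A minimal identity-based policy statement granting these permissions might look like the following sketch. AWS also provides the managed `AWSLambdaBasicExecutionRole` policy for this purpose; where possible, scope `Resource` down to your function's log group instead of using a wildcard.

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}
```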

## Optional Lambda function permissions
<a name="smaa-api-actions-optional"></a>

Your Lambda function might also need permissions to:
+ Describe your Secrets Manager secret.
+ Access your AWS Key Management Service (AWS KMS) customer managed key.
+ Access your Amazon VPC.
+ Send records of failed invocations to a destination.

### Secrets Manager and AWS KMS permissions
<a name="smaa-api-actions-secrets"></a>

Depending on the type of access control that you're configuring for your Kafka brokers, your Lambda function might need permission to access your Secrets Manager secret or to decrypt your AWS KMS customer managed key. To access these resources, your function's execution role must have the following permissions:
+ [secretsmanager:GetSecretValue](https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html)
+ [kms:Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html)

### VPC permissions
<a name="smaa-api-actions-vpc"></a>

If only users within a VPC can access your self-managed Apache Kafka cluster, your Lambda function must have permission to access your Amazon VPC resources. These resources include your VPC, subnets, security groups, and network interfaces. To access these resources, your function's execution role must have the following permissions:
+ [ec2:CreateNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateNetworkInterface.html)
+ [ec2:DescribeNetworkInterfaces](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeNetworkInterfaces.html)
+ [ec2:DescribeVpcs](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeVpcs.html)
+ [ec2:DeleteNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DeleteNetworkInterface.html)
+ [ec2:DescribeSubnets](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSubnets.html)
+ [ec2:DescribeSecurityGroups](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSecurityGroups.html)

## Adding permissions to your execution role
<a name="smaa-permissions-add-policy"></a>

To access other AWS services that your self-managed Apache Kafka cluster uses, Lambda uses the permissions policies that you define in your Lambda function's [execution role](lambda-intro-execution-role.md).

By default, Lambda is not permitted to perform the required or optional actions for a self-managed Apache Kafka cluster. You must define these actions in an [IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_update-role-trust-policy.html) attached to your execution role. The following example shows how you might create a policy that allows Lambda to access your Amazon VPC resources.


```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:CreateNetworkInterface",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeVpcs",
                "ec2:DeleteNetworkInterface",
                "ec2:DescribeSubnets",
                "ec2:DescribeSecurityGroups"
            ],
            "Resource": "*"
        }
    ]
}
```


## Granting users access with an IAM policy
<a name="smaa-permissions-add-users"></a>

By default, users and roles don't have permission to perform [event source API operations](invocation-eventsourcemapping.md#event-source-mapping-api). To grant access to users in your organization or account, you create or update an identity-based policy. For more information, see [Controlling access to AWS resources using policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_controlling.html) in the *IAM User Guide*.

For troubleshooting authentication and authorization errors, see [Troubleshooting Kafka event source mapping errors](with-kafka-troubleshoot.md).

# Configuring self-managed Apache Kafka event sources for Lambda
<a name="with-kafka-configure"></a>

To use a self-managed Apache Kafka cluster as an event source for your Lambda function, you create an [event source mapping](invocation-eventsourcemapping.md) that connects the two resources. This page describes how to create an event source mapping for self-managed Apache Kafka.

This page assumes that you've already properly configured your Kafka cluster and the network it resides in. If you need to set up your cluster or network, see [Configuring your self-managed Apache Kafka cluster and network for Lambda](with-kafka-cluster-network.md).

**Topics**
+ [Using a self-managed Apache Kafka cluster as an event source](#kafka-esm-overview)
+ [Configuring cluster authentication methods in Lambda](kafka-cluster-auth.md)
+ [Creating a Lambda event source mapping for a self-managed Apache Kafka event source](kafka-esm-create.md)
+ [All self-managed Apache Kafka event source configuration parameters in Lambda](kafka-esm-parameters.md)

## Using a self-managed Apache Kafka cluster as an event source
<a name="kafka-esm-overview"></a>

When you add your self-managed Apache Kafka cluster as a trigger for your Lambda function, the cluster is used as an [event source](invocation-eventsourcemapping.md).

Lambda reads event data from the Kafka topics that you specify as `Topics` in a [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) request, based on the [starting position](kafka-starting-positions.md) that you specify. After successful processing, Lambda commits the offsets of the processed messages to your Kafka cluster.

Lambda reads messages sequentially for each Kafka topic partition. A single Lambda payload can contain messages from multiple partitions. When more records are available, Lambda continues processing records in batches, based on the `BatchSize` value that you specify in a [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) request, until your function catches up with the topic.

After Lambda processes each batch, it commits the offsets of the messages in that batch. If your function returns an error for any of the messages in a batch, Lambda retries the whole batch of messages until processing succeeds or the messages expire. You can send records that fail all retry attempts to an on-failure destination for later processing.
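Because a single failing record causes Lambda to retry the entire batch, it can help to catch per-record errors inside the handler and route bad records aside instead of raising. The following sketch illustrates the idea; the `handle_batch` name and the `send_to_dlq` callback are hypothetical, standing in for whatever dead-letter mechanism you use.

```python
def handle_batch(records, process, send_to_dlq):
    """Process records one at a time so a single bad record doesn't fail the batch."""
    failed_offsets = []
    for record in records:
        try:
            process(record)
        except Exception as exc:
            # Routing the failing record aside instead of re-raising avoids
            # Lambda retrying the entire batch for one bad record.
            send_to_dlq(record, exc)
            failed_offsets.append(record["offset"])
    return failed_offsets
```

With this pattern, the handler completes successfully even when individual records fail, and Lambda commits the batch's offsets rather than retrying it.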

**Note**  
While Lambda functions typically have a maximum timeout limit of 15 minutes, event source mappings for Amazon MSK, self-managed Apache Kafka, Amazon DocumentDB, and Amazon MQ for ActiveMQ and RabbitMQ only support functions with maximum timeout limits of 14 minutes.

# Configuring cluster authentication methods in Lambda
<a name="kafka-cluster-auth"></a>

Lambda supports several methods to authenticate with your self-managed Apache Kafka cluster. Make sure that you configure the Kafka cluster to use one of these supported authentication methods. For more information about Kafka security, see the [Security](http://kafka.apache.org/documentation.html#security) section of the Kafka documentation.

## SASL/SCRAM authentication
<a name="smaa-auth-sasl"></a>

Lambda supports Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) authentication with Transport Layer Security (TLS) encryption (`SASL_SSL`). Lambda sends the encrypted credentials to authenticate with the cluster. Lambda doesn't support SASL/SCRAM with plaintext (`SASL_PLAINTEXT`). For more information about SASL/SCRAM authentication, see [RFC 5802](https://tools.ietf.org/html/rfc5802).

Lambda also supports SASL/PLAIN authentication. Because this mechanism uses clear text credentials, the connection to the server must use TLS encryption to ensure that the credentials are protected.

For SASL authentication, you store the sign-in credentials as a secret in AWS Secrets Manager. For more information about using Secrets Manager, see [Create an AWS Secrets Manager secret](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_secret.html) in the *AWS Secrets Manager User Guide*.

**Important**  
To use Secrets Manager for authentication, secrets must be stored in the same AWS region as your Lambda function.

## Mutual TLS authentication
<a name="smaa-auth-mtls"></a>

Mutual TLS (mTLS) provides two-way authentication between the client and server. The client sends a certificate to the server for the server to verify the client, and the server sends a certificate to the client for the client to verify the server. 

In self-managed Apache Kafka, Lambda acts as the client. You configure a client certificate (as a secret in Secrets Manager) to authenticate Lambda with your Kafka brokers. The client certificate must be signed by a CA in the server's trust store.

The Kafka cluster sends a server certificate to Lambda to authenticate the Kafka brokers with Lambda. The server certificate can be a public CA certificate or a private CA/self-signed certificate. The public CA certificate must be signed by a certificate authority (CA) that's in the Lambda trust store. For a private CA/self-signed certificate, you configure the server root CA certificate (as a secret in Secrets Manager). Lambda uses the root certificate to verify the Kafka brokers.

For more information about mTLS, see [Introducing mutual TLS authentication for Amazon MSK as an event source](https://aws.amazon.com/blogs/compute/introducing-mutual-tls-authentication-for-amazon-msk-as-an-event-source).

## Configuring the client certificate secret
<a name="smaa-auth-secret"></a>

The `CLIENT_CERTIFICATE_TLS_AUTH` secret requires a certificate field and a private key field. For an encrypted private key, the secret also requires the private key password. Both the certificate and private key must be in PEM format.

**Note**  
Lambda supports the [PBES1](https://datatracker.ietf.org/doc/html/rfc2898/#section-6.1) private key encryption algorithms (but not PBES2).

The certificate field must contain a list of certificates, beginning with the client certificate, followed by any intermediate certificates, and ending with the root certificate. Each certificate must start on a new line with the following structure:

```
-----BEGIN CERTIFICATE-----  
            <certificate contents>
-----END CERTIFICATE-----
```

Secrets Manager supports secrets up to 65,536 bytes, which is enough space for long certificate chains.

The private key must be in [PKCS #8](https://datatracker.ietf.org/doc/html/rfc5208) format, with the following structure:

```
-----BEGIN PRIVATE KEY-----  
             <private key contents>
-----END PRIVATE KEY-----
```

For an encrypted private key, use the following structure:

```
-----BEGIN ENCRYPTED PRIVATE KEY-----  
              <private key contents>
-----END ENCRYPTED PRIVATE KEY-----
```

The following example shows the contents of a secret for mTLS authentication using an encrypted private key. For an encrypted private key, include the private key password in the secret.

```
{"privateKeyPassword":"testpassword",
"certificate":"-----BEGIN CERTIFICATE-----
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
...
j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk
cmUuiAii9R0=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
...
rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
-----END CERTIFICATE-----",
"privateKey":"-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp
...
QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
-----END ENCRYPTED PRIVATE KEY-----"
}
```
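If you assemble this secret programmatically, the PEM contents can be serialized into the expected JSON shape. The following sketch uses an illustrative `build_mtls_secret` helper name; you would pass the result as the secret value when creating the secret in Secrets Manager.

```python
import json

def build_mtls_secret(cert_chain_pem, private_key_pem, key_password=None):
    """Assemble the client-certificate secret payload for Secrets Manager."""
    secret = {
        "certificate": cert_chain_pem,  # full chain: client, intermediates, root
        "privateKey": private_key_pem,  # PKCS #8 PEM, optionally encrypted
    }
    if key_password:
        # Only required when the private key is encrypted
        secret["privateKeyPassword"] = key_password
    return json.dumps(secret)
```

The returned JSON string matches the structure of the example above, omitting `privateKeyPassword` when the key is unencrypted.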

## Configuring the server root CA certificate secret
<a name="smaa-auth-ca-cert"></a>

You create this secret if your Kafka brokers use TLS encryption with certificates signed by a private CA. You can use TLS encryption for VPC, SASL/SCRAM, SASL/PLAIN, or mTLS authentication.

The server root CA certificate secret requires a field that contains the Kafka broker's root CA certificate in PEM format. The following example shows the structure of the secret.

```
{"certificate":"-----BEGIN CERTIFICATE-----
MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx
EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT
HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs
ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG...
-----END CERTIFICATE-----"
}
```

# Creating a Lambda event source mapping for a self-managed Apache Kafka event source
<a name="kafka-esm-create"></a>

To create an event source mapping, you can use the Lambda console, the [AWS Command Line Interface (CLI)](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html), or an [AWS SDK](https://aws.amazon.com/getting-started/tools-sdks/).

The following console steps add a self-managed Apache Kafka cluster as a trigger for your Lambda function. Under the hood, this creates an event source mapping resource.

## Prerequisites
<a name="kafka-esm-prereqs"></a>
+ A self-managed Apache Kafka cluster. Lambda supports Apache Kafka version 0.10.1.0 and later.
+ An [execution role](lambda-intro-execution-role.md) with permission to access the AWS resources that your self-managed Kafka cluster uses.

## Adding a self-managed Kafka cluster (console)
<a name="kafka-esm-console"></a>

Follow these steps to add your self-managed Apache Kafka cluster and a Kafka topic as a trigger for your Lambda function.

**To add an Apache Kafka trigger to your Lambda function (console)**

1. Open the [Functions page](https://console.aws.amazon.com/lambda/home#/functions) of the Lambda console.

1. Choose the name of your Lambda function.

1. Under **Function overview**, choose **Add trigger**.

1. Under **Trigger configuration**, do the following:

   1. Choose the **Apache Kafka** trigger type.

   1. For **Bootstrap servers**, enter the host and port pair of a Kafka broker in your cluster, and then choose **Add**. Repeat for each Kafka broker in the cluster.

   1. For **Topic name**, enter the name of the Kafka topic used to store records in the cluster.

   1. If you configure provisioned mode, enter values for **Minimum event pollers** and **Maximum event pollers**. Optionally, enter a value for `PollerGroupName` to group multiple event source mappings within the same event source VPC.

   1. (Optional) For **Batch size**, enter the maximum number of records to receive in a single batch.

   1. For **Batch window**, enter the maximum amount of time, in seconds, that Lambda spends gathering records before invoking the function.

   1. (Optional) For **Consumer group ID**, enter the ID of a Kafka consumer group to join.

   1. (Optional) For **Starting position**, choose **Latest** to start reading the stream from the latest record, **Trim horizon** to start at the earliest available record, or **At timestamp** to specify a timestamp to start reading from.

   1. (Optional) For **VPC**, choose the Amazon VPC for your Kafka cluster. Then, choose the **VPC subnets** and **VPC security groups**.

      This setting is required if only users within your VPC access your brokers.

   1. (Optional) For **Authentication**, choose **Add**, and then do the following:

      1. Choose the access or authentication protocol of the Kafka brokers in your cluster.
         + If your Kafka broker uses SASL/PLAIN authentication, choose **BASIC_AUTH**.
         + If your broker uses SASL/SCRAM authentication, choose one of the **SASL_SCRAM** protocols.
         + If you're configuring mTLS authentication, choose the **CLIENT_CERTIFICATE_TLS_AUTH** protocol.

      1. For SASL/SCRAM or mTLS authentication, choose the Secrets Manager secret key that contains the credentials for your Kafka cluster.

   1. (Optional) For **Encryption**, choose the Secrets Manager secret containing the root CA certificate that your Kafka brokers use for TLS encryption, if your Kafka brokers use certificates signed by a private CA.

      This setting applies to TLS encryption for SASL/SCRAM or SASL/PLAIN, and to mTLS authentication.

   1. To create the trigger in a disabled state for testing (recommended), clear **Enable trigger**. Or, to enable the trigger immediately, select **Enable trigger**.

1. To create the trigger, choose **Add**.

## Adding a self-managed Kafka cluster (AWS CLI)
<a name="kafka-esm-cli"></a>

Use the following example AWS CLI commands to create and view a self-managed Apache Kafka trigger for your Lambda function.

### Using SASL/SCRAM
<a name="kafka-esm-cli-create"></a>

If Kafka users access your Kafka brokers over the internet, specify the Secrets Manager secret that you created for SASL/SCRAM authentication. The following example uses the [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) AWS CLI command to map a Lambda function named `my-kafka-function` to a Kafka topic named `AWSKafkaTopic`.

```
aws lambda create-event-source-mapping \ 
  --topics AWSKafkaTopic \
  --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \
  --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
```

### Using a VPC
<a name="kafka-esm-cli-create-vpc"></a>

If only Kafka users within your VPC access your Kafka brokers, you must specify your VPC, subnets, and VPC security group. The following example uses the [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) AWS CLI command to map a Lambda function named `my-kafka-function` to a Kafka topic named `AWSKafkaTopic`.

```
aws lambda create-event-source-mapping \ 
  --topics AWSKafkaTopic \
  --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \
  --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
```

### Viewing the status using the AWS CLI
<a name="kafka-esm-cli-view"></a>

The following example uses the [get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html) AWS CLI command to describe the status of the event source mapping that you created.

```
aws lambda get-event-source-mapping \
  --uuid dh38738e-992b-343a-1077-3478934hjkfd7
```

# All self-managed Apache Kafka event source configuration parameters in Lambda
<a name="kafka-esm-parameters"></a>

All Lambda event source types share the same [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) and [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) API operations. However, only some of the parameters apply to self-managed Apache Kafka, as shown in the following table.


| Parameter | Required | Default | Notes | 
| --- | --- | --- | --- | 
|  BatchSize  |  N  |  100  |  Maximum: 10,000  | 
|  DestinationConfig  |  N  |  N/A  |  [Capturing discarded batches for Amazon MSK and self-managed Apache Kafka event sources](kafka-on-failure.md)  | 
|  Enabled  |  N  |  True  |  | 
|  FilterCriteria  |  N  |  N/A  |  [Control which events Lambda sends to your function](invocation-eventfiltering.md)  | 
|  FunctionName  |  Y  |  N/A  |    | 
|  KMSKeyArn  |  N  |  N/A  |  [Encryption of filter criteria](invocation-eventfiltering.md#filter-criteria-encryption)  | 
|  MaximumBatchingWindowInSeconds  |  N  |  500 ms  |  [Batching behavior](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)  | 
|  ProvisionedPollersConfig  |  N  |  `MinimumPollers`: defaults to 1 if not specified. `MaximumPollers`: defaults to 200 if not specified. `PollerGroupName`: N/A  |  [Provisioned mode](kafka-scaling-modes.md#kafka-provisioned-mode)  | 
|  SelfManagedEventSource  |  Y  |  N/A  |  List of Kafka brokers. Can set only on Create  | 
|  SelfManagedKafkaEventSourceConfig  |  N  |  Contains the ConsumerGroupId field, which defaults to a unique value.  |  Can set only on Create  | 
|  SourceAccessConfigurations  |  N  |  No credentials  |  VPC information or authentication credentials for the cluster. For SASL_PLAIN, set to BASIC_AUTH  | 
|  StartingPosition  |  Y  |  N/A  |  AT_TIMESTAMP, TRIM_HORIZON, or LATEST. Can set only on Create  | 
|  StartingPositionTimestamp  |  N  |  N/A  |  Required if StartingPosition is set to AT_TIMESTAMP  | 
|  Tags  |  N  |  N/A  |  [Using tags on event source mappings](tags-esm.md)  | 
|  Topics  |  Y  |  N/A  |  Topic name Can set only on Create  | 

**Note**  
When you specify a `PollerGroupName`, multiple ESMs within the same Amazon VPC can share Event Poller Unit (EPU) capacity. You can use this option to optimize provisioned mode costs for your ESMs. ESM grouping has the following requirements:  
+ ESMs must be within the same Amazon VPC.
+ A poller group can contain a maximum of 100 ESMs.
+ The aggregate maximum pollers across all ESMs in a group cannot exceed 2,000.

You can update the `PollerGroupName` to move an ESM to a different group, or remove an ESM from a group by setting `PollerGroupName` to an empty string ("").