Configuring Amazon MSK event sources for Lambda
Before you create an event source mapping for your Amazon MSK cluster, you need to ensure that your cluster and the VPC it resides in are correctly configured. You also need to make sure that your Lambda function's execution role has the necessary IAM permissions.
Follow the instructions in the following sections to configure your Amazon MSK cluster, VPC, and Lambda function. To learn how to create the event source mapping, see Adding Amazon MSK as an event source.
MSK cluster authentication
Lambda needs permission to access the Amazon MSK cluster, retrieve records, and perform other tasks. Amazon MSK supports several options for controlling client access to the MSK cluster.
Cluster access options
Unauthenticated access
If no clients access the cluster over the internet, you can use unauthenticated access.
SASL/SCRAM authentication
Amazon MSK supports Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) authentication with Transport Layer Security (TLS) encryption. For Lambda to connect to the cluster, you store the authentication credentials (user name and password) in an AWS Secrets Manager secret.
For more information about using Secrets Manager, see User name and password authentication with AWS Secrets Manager in the Amazon Managed Streaming for Apache Kafka Developer Guide.
Amazon MSK doesn't support SASL/PLAIN authentication.
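As a rough illustration (not an official procedure), the following boto3 sketch stores SASL/SCRAM credentials as a Secrets Manager secret and references it when creating the event source mapping. The cluster ARN, function name, topic, and secret name are placeholder assumptions.

import json

import boto3

secrets = boto3.client("secretsmanager")
lambda_client = boto3.client("lambda")

# Store the SASL/SCRAM user name and password as a secret.
# Amazon MSK expects SASL/SCRAM secret names to start with the AmazonMSK_ prefix.
secret = secrets.create_secret(
    Name="AmazonMSK_lambda_consumer",  # hypothetical secret name
    SecretString=json.dumps({"username": "lambda-consumer", "password": "example-password"}),
)

# Reference the secret when creating the event source mapping.
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/abc-123",  # placeholder
    FunctionName="my-kafka-function",  # placeholder
    Topics=["my-topic"],
    StartingPosition="LATEST",
    SourceAccessConfigurations=[
        {"Type": "SASL_SCRAM_512_AUTH", "URI": secret["ARN"]},
    ],
)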
IAM role-based authentication
You can use IAM to authenticate the identity of clients that connect to the MSK cluster. If IAM auth is active on your MSK cluster, and you don't provide a secret for auth, Lambda automatically defaults to using IAM auth. To create and deploy user or role-based policies, use the IAM console or API. For more information, see IAM access control in the Amazon Managed Streaming for Apache Kafka Developer Guide.
To allow Lambda to connect to the MSK cluster, read records, and perform other required actions, add the following permissions to your function's execution role.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:
region
:account-id
:cluster/cluster-name
/cluster-uuid
", "arn:aws:kafka:region
:account-id
:topic/cluster-name
/cluster-uuid
/topic-name
", "arn:aws:kafka:region
:account-id
:group/cluster-name
/cluster-uuid
/consumer-group-id
" ] } ] }
You can scope these permissions to a specific cluster, topic, and group. For more information, see Amazon MSK Kafka actions in the Amazon Managed Streaming for Apache Kafka Developer Guide.
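If you manage the execution role with code, a minimal boto3 sketch of attaching these permissions as an inline policy follows; the role name, policy name, and ARNs are placeholder assumptions.

import json

import boto3

iam = boto3.client("iam")

cluster_arn = "arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/abc-123"             # placeholder
topic_arn = "arn:aws:kafka:us-east-1:111122223333:topic/my-cluster/abc-123/my-topic"        # placeholder
group_arn = "arn:aws:kafka:us-east-1:111122223333:group/my-cluster/abc-123/my-esm-uuid"     # placeholder

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeGroup",
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeClusterDynamicConfiguration",
            ],
            "Resource": [cluster_arn, topic_arn, group_arn],
        }
    ],
}

# Attach the policy inline to the function's execution role.
iam.put_role_policy(
    RoleName="my-function-execution-role",
    PolicyName="msk-cluster-access",
    PolicyDocument=json.dumps(policy),
)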
Mutual TLS authentication
Mutual TLS (mTLS) provides two-way authentication between the client and server. The client sends a certificate to the server for the server to verify the client, and the server sends a certificate to the client for the client to verify the server.
For Amazon MSK, Lambda acts as the client. You configure a client certificate (as a secret in Secrets Manager) to authenticate Lambda with the brokers in your MSK cluster. The client certificate must be signed by a CA in the server's trust store. The MSK cluster sends a server certificate to Lambda to authenticate the brokers with Lambda. The server certificate must be signed by a certificate authority (CA) that's in the AWS trust store.
For instructions on how to generate a client certificate, see Introducing mutual TLS authentication for Amazon MSK as an event source.
Amazon MSK doesn't support self-signed server certificates, because all brokers in Amazon MSK use public certificates signed by Amazon Trust Services CAs.
For more information about mTLS for Amazon MSK, see Mutual TLS Authentication in the Amazon Managed Streaming for Apache Kafka Developer Guide.
Configuring the mTLS secret
The CLIENT_CERTIFICATE_TLS_AUTH secret requires a certificate field and a private key field. For an encrypted private key, the secret requires a private key password. Both the certificate and private key must be in PEM format.
Note
Lambda supports the PBES1 (but not PBES2) private key encryption algorithms.
The certificate field must contain a list of certificates, beginning with the client certificate, followed by any intermediate certificates, and ending with the root certificate. Each certificate must start on a new line with the following structure:
-----BEGIN CERTIFICATE-----
<certificate contents>
-----END CERTIFICATE-----
Secrets Manager supports secrets up to 65,536 bytes, which is enough space for long certificate chains.
The private key must be in PKCS #8 format, with the following structure:
-----BEGIN PRIVATE KEY-----
<private key contents>
-----END PRIVATE KEY-----
For an encrypted private key, use the following structure:
-----BEGIN ENCRYPTED PRIVATE KEY-----
<private key contents>
-----END ENCRYPTED PRIVATE KEY-----
The following example shows the contents of a secret for mTLS authentication using an encrypted private key. For an encrypted private key, you include the private key password in the secret.
{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }
How Lambda chooses a bootstrap broker
Lambda chooses a bootstrap broker based on the authentication methods available on your cluster, and whether you provide a secret for authentication. If you provide a secret for mTLS or SASL/SCRAM, Lambda automatically chooses that auth method. If you don't provide a secret, Lambda selects the strongest auth method that's active on your cluster. The following is the order of priority in which Lambda selects a broker, from strongest to weakest auth:
mTLS (secret provided for mTLS)
SASL/SCRAM (secret provided for SASL/SCRAM)
SASL IAM (no secret provided, and IAM auth active)
Unauthenticated TLS (no secret provided, and IAM auth not active)
Plaintext (no secret provided, and both IAM auth and unauthenticated TLS are not active)
Note
If Lambda can't connect to the most secure broker type, Lambda doesn't attempt to connect to a different (weaker) broker type. If you want Lambda to choose a weaker broker type, deactivate all stronger auth methods on your cluster.
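To see which broker endpoints your cluster actually exposes for each authentication type, you can call the Amazon MSK GetBootstrapBrokers API. A minimal sketch, assuming a placeholder cluster ARN:

import boto3

kafka = boto3.client("kafka")

# Cluster ARN is a placeholder.
brokers = kafka.get_bootstrap_brokers(
    ClusterArn="arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/abc-123"
)

# Each key is present only if the corresponding access method is enabled on the cluster.
for key in (
    "BootstrapBrokerStringTls",        # TLS (port 9094)
    "BootstrapBrokerStringSaslScram",  # SASL/SCRAM (port 9096)
    "BootstrapBrokerStringSaslIam",    # IAM (port 9098)
    "BootstrapBrokerString",           # plaintext (port 9092)
):
    print(key, brokers.get(key, "not enabled"))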
Managing API access and permissions
In addition to accessing the Amazon MSK cluster, your function needs permissions to perform various Amazon MSK API actions. You add these permissions to the function's execution role. If your users need access to any of the Amazon MSK API actions, add the required permissions to the identity policy for the user or role.
You can add each of the following permissions to your execution role manually. Alternatively, you can attach the AWS managed policy AWSLambdaMSKExecutionRole to your execution role. The AWSLambdaMSKExecutionRole policy contains all of the required API actions and VPC permissions listed below.
Required Lambda function execution role permissions
To create and store logs in a log group in Amazon CloudWatch Logs, your Lambda function must have the following permissions in its execution role:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
For Lambda to access your Amazon MSK cluster on your behalf, your Lambda function must have the following permissions in its execution role:
- kafka:DescribeCluster
- kafka:DescribeClusterV2
- kafka:GetBootstrapBrokers
- kafka:DescribeVpcConnection: Only required for cross-account event source mappings.
- kafka:ListVpcConnections: Not required in the execution role, but required for an IAM principal that is creating a cross-account event source mapping.
You only need to add one of either kafka:DescribeCluster or kafka:DescribeClusterV2. For provisioned MSK clusters, either permission works. For serverless MSK clusters, you must use kafka:DescribeClusterV2.
Note
Lambda eventually plans to remove the kafka:DescribeCluster permission from the associated AWSLambdaMSKExecutionRole managed policy. If you use this policy, you should migrate any applications using kafka:DescribeCluster to use kafka:DescribeClusterV2 instead.
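If you're migrating your own tooling from kafka:DescribeCluster, the equivalent call against the newer API looks like the following sketch (the cluster ARN is a placeholder):

import boto3

kafka = boto3.client("kafka")

# Requires kafka:DescribeClusterV2 on the cluster; works for both provisioned and serverless clusters.
cluster = kafka.describe_cluster_v2(
    ClusterArn="arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/abc-123"
)
print(cluster["ClusterInfo"]["ClusterType"], cluster["ClusterInfo"]["State"])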
VPC permissions
If only users within a VPC can access your Amazon MSK cluster, your Lambda function must have permission to access your Amazon VPC resources. These resources include your VPC, subnets, security groups, and network interfaces. To access these resources, your function's execution role must have the following permissions. These permissions are included in the AWSLambdaMSKExecutionRole AWS managed policy.
- ec2:CreateNetworkInterface
- ec2:DescribeNetworkInterfaces
- ec2:DescribeVpcs
- ec2:DeleteNetworkInterface
- ec2:DescribeSubnets
- ec2:DescribeSecurityGroups
Optional Lambda function permissions
Your Lambda function might also need permissions to:
- Access your SCRAM secret, if using SASL/SCRAM authentication.
- Describe your Secrets Manager secret.
- Access your AWS Key Management Service (AWS KMS) customer managed key.
- Send records of failed invocations to a destination.
Secrets Manager and AWS KMS permissions
Depending on the type of access control that you're configuring for your Amazon MSK brokers, your Lambda function might need permission to access your SCRAM secret (if using SASL/SCRAM authentication) and to use the AWS KMS customer managed key that encrypts your Secrets Manager secret. To access these resources, your function's execution role must have the following permissions:
- secretsmanager:GetSecretValue
- kms:Decrypt
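As one possible sketch, these permissions can be added as an inline policy with boto3; the secret ARN, key ARN, and role name are placeholder assumptions.

import json

import boto3

iam = boto3.client("iam")

# Secret ARN, KMS key ARN, and role name are placeholders.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "secretsmanager:GetSecretValue",
            "Resource": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-msk-secret",
        },
        {
            "Effect": "Allow",
            "Action": "kms:Decrypt",
            "Resource": "arn:aws:kms:us-east-1:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab",
        },
    ],
}

iam.put_role_policy(
    RoleName="my-function-execution-role",
    PolicyName="msk-secret-access",
    PolicyDocument=json.dumps(policy),
)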
Adding permissions to your execution role
Follow these steps to add the AWS managed policy AWSLambdaMSKExecutionRole to your execution role using the IAM console.
To add an AWS managed policy
- Open the Policies page of the IAM console.
- In the search box, enter the policy name (AWSLambdaMSKExecutionRole).
- Select the policy from the list, and then choose Policy actions, Attach.
- On the Attach policy page, select your execution role from the list, and then choose Attach policy.
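If you prefer to script this step instead of using the console, the same attachment can be done with boto3; the role name is a placeholder.

import boto3

iam = boto3.client("iam")

# Attach the AWS managed policy to the function's execution role.
iam.attach_role_policy(
    RoleName="my-function-execution-role",  # placeholder
    PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole",
)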
Granting users access with an IAM policy
By default, users and roles don't have permission to perform Amazon MSK API operations. To grant access to users in your organization or account, you can add or update an identity-based policy. For more information, see Amazon MSK Identity-Based Policy Examples in the Amazon Managed Streaming for Apache Kafka Developer Guide.
Authentication and authorization errors
If any of the permissions required to consume data from the Amazon MSK cluster are missing, Lambda displays one of the following error messages in the event source mapping under LastProcessingResult.
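One way to check the LastProcessingResult value programmatically is the GetEventSourceMapping API; a minimal sketch, assuming a placeholder mapping UUID:

import boto3

lambda_client = boto3.client("lambda")

# The UUID is a placeholder; Lambda returns it when you create the event source mapping.
mapping = lambda_client.get_event_source_mapping(UUID="14e0db71-xmpl-4eb5-b481-8945cf9d10c2")
print(mapping["State"], mapping.get("LastProcessingResult"))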
Error messages
Cluster failed to authorize Lambda
For SASL/SCRAM or mTLS, this error indicates that the provided user doesn't have all of the following required Kafka access control list (ACL) permissions:
- DescribeConfigs on the Cluster resource
- Describe on the Group resource
- Read on the Group resource
- Describe on the Topic resource
- Read on the Topic resource
For IAM access control, your function's execution role is missing one or more of the permissions required to access the group or topic. Review the list of required permissions in IAM role-based authentication.
When you create either Kafka ACLs or an IAM policy with the required Kafka cluster permissions, specify the topic and group as resources. The topic name must match the topic in the event source mapping. The group name must match the event source mapping's UUID.
After you add the required permissions to the execution role, it might take several minutes for the changes to take effect.
SASL authentication failed
For SASL/SCRAM, this error indicates that the provided user name and password aren't valid.
For IAM access control, the execution role is missing the kafka-cluster:Connect permission for the MSK cluster. Add this permission to the role and specify the cluster's Amazon Resource Name (ARN) as a resource.
You might see this error occurring intermittently. The cluster rejects connections after the number of TCP connections exceeds the Amazon MSK service quota. Lambda backs off and retries until a connection is successful. After Lambda connects to the cluster and polls for records, the last processing result changes to OK.
Server failed to authenticate Lambda
This error indicates that the Amazon MSK Kafka brokers failed to authenticate with Lambda. This can occur for any of the following reasons:
You didn't provide a client certificate for mTLS authentication.
You provided a client certificate, but the brokers aren't configured to use mTLS.
A client certificate isn't trusted by the brokers.
Provided certificate or private key is invalid
This error indicates that the Amazon MSK consumer couldn't use the provided certificate or private key. Make sure that the certificate and key use PEM format, and that the private key encryption uses a PBES1 algorithm.
Configure network security
To give Lambda full access to Amazon MSK through your event source mapping, either your cluster must use a public endpoint (public IP address), or you must provide access to the Amazon VPC you created the cluster in.
When you use Amazon MSK with Lambda, create AWS PrivateLink VPC endpoints that provide your function access to the resources in your Amazon VPC.
Note
AWS PrivateLink VPC endpoints are required for functions with event source mappings that use the default (on-demand) mode for event pollers. If your event source mapping uses provisioned mode, you don't need to configure AWS PrivateLink VPC endpoints.
Create an endpoint to provide access to the following resources:
- Lambda – Create an endpoint for the Lambda service principal.
- AWS STS – Create an endpoint for AWS STS so that a service principal can assume a role on your behalf.
- Secrets Manager – If your cluster uses Secrets Manager to store credentials, create an endpoint for Secrets Manager.
Alternatively, configure a NAT gateway on each public subnet in the Amazon VPC. For more information, see Enable internet access for VPC-connected Lambda functions.
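A minimal boto3 sketch of creating the interface endpoints; the Region, VPC ID, subnet IDs, and security group ID are placeholder assumptions.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

vpc_id = "vpc-0123456789abcdef0"               # placeholder
subnet_ids = ["subnet-0123456789abcdef0"]      # placeholder
security_group_ids = ["sg-0123456789abcdef0"]  # placeholder

# Interface endpoints for Lambda, AWS STS, and (if you use it for credentials) Secrets Manager.
for service in ("lambda", "sts", "secretsmanager"):
    ec2.create_vpc_endpoint(
        VpcEndpointType="Interface",
        VpcId=vpc_id,
        ServiceName=f"com.amazonaws.us-east-1.{service}",
        SubnetIds=subnet_ids,
        SecurityGroupIds=security_group_ids,
        PrivateDnsEnabled=True,
    )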
When you create an event source mapping for Amazon MSK, Lambda checks whether Elastic Network Interfaces (ENIs) are already present for the subnets and security groups configured for your Amazon VPC. If Lambda finds existing ENIs, it attempts to re-use them. Otherwise, Lambda creates new ENIs to connect to the event source and invoke your function.
Note
Lambda functions always run inside VPCs owned by the Lambda service. Your function's VPC configuration does not affect the event source mapping. Only the networking configuration of the event source's VPC determines how Lambda connects to your event source.
Configure the security groups for the Amazon VPC containing your cluster. By default, Amazon MSK uses the following ports: 9092 for plaintext, 9094 for TLS, 9096 for SASL, and 9098 for IAM.
- Inbound rules – Allow all traffic on the default cluster port for the security group associated with your event source.
- Outbound rules – Allow all traffic on port 443 for all destinations. Allow all traffic on the default cluster port for the security group associated with your event source.
- Amazon VPC endpoint inbound rules – If you are using an Amazon VPC endpoint, the security group associated with your Amazon VPC endpoint must allow inbound traffic on port 443 from the cluster security group.
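As one possible sketch of the inbound rules described above, assuming IAM authentication on port 9098 and placeholder security group IDs (the default outbound rules of a new security group already allow all traffic):

import boto3

ec2 = boto3.client("ec2")

cluster_sg = "sg-0123456789abcdef0"   # security group associated with the MSK cluster (placeholder)
endpoint_sg = "sg-0fedcba9876543210"  # security group associated with the VPC endpoints (placeholder)

# Inbound: allow traffic on the cluster port (9098 for IAM auth) from the cluster security group itself.
ec2.authorize_security_group_ingress(
    GroupId=cluster_sg,
    IpPermissions=[
        {
            "IpProtocol": "tcp",
            "FromPort": 9098,
            "ToPort": 9098,
            "UserIdGroupPairs": [{"GroupId": cluster_sg}],
        }
    ],
)

# VPC endpoint inbound: allow HTTPS (443) from the cluster security group.
ec2.authorize_security_group_ingress(
    GroupId=endpoint_sg,
    IpPermissions=[
        {
            "IpProtocol": "tcp",
            "FromPort": 443,
            "ToPort": 443,
            "UserIdGroupPairs": [{"GroupId": cluster_sg}],
        }
    ],
)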
If your cluster uses authentication, you can also restrict the endpoint policy for the Secrets Manager endpoint. To call the Secrets Manager API, Lambda uses your function role, not the Lambda service principal.
Example VPC endpoint policy — Secrets Manager endpoint
{ "Statement": [ { "Action": "secretsmanager:GetSecretValue", "Effect": "Allow", "Principal": { "AWS": [ "arn:aws::iam::123456789012:role/
my-role
" ] }, "Resource": "arn:aws::secretsmanager:us-west-2
:123456789012:secret:my-secret
" } ] }
When you use Amazon VPC endpoints, AWS routes your API calls to invoke your function using the endpoint's Elastic Network Interface (ENI). The Lambda service principal needs to call lambda:InvokeFunction on any roles and functions that use those ENIs.
By default, Amazon VPC endpoints have open IAM policies that allow broad access to resources. Best practice is to restrict these policies to perform the needed actions using that endpoint. To ensure that your event source mapping is able to invoke your Lambda function, the VPC endpoint policy must allow the Lambda service principal to call sts:AssumeRole and lambda:InvokeFunction. Restricting your VPC endpoint policies to allow only API calls originating within your organization prevents the event source mapping from functioning properly, so "Resource": "*" is required in these policies.
The following example VPC endpoint policies show how to grant the required access to the Lambda service principal for the AWS STS and Lambda endpoints.
Example VPC Endpoint policy — AWS STS endpoint
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
Example VPC Endpoint policy — Lambda endpoint
{ "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }