

# Uso de Lambda con Apache Kafka autoadministrado
<a name="with-kafka"></a>

En este tema se describe cómo utilizar Lambda con un clúster de Kafka autoadministrado. En la terminología de AWS, un clúster autoadministrado incluye clústeres Kafka alojados que no son de AWS. Por ejemplo, puede alojar su clúster de Kafka con un proveedor de servicios en la nube, como [Confluent Cloud](https://www.confluent.io/confluent-cloud/) o [Redpanda](https://www.redpanda.com/).

En este capítulo se explica cómo utilizar un clúster de Apache Kafka autoadministrado como origen de eventos para su función de Lambda. El proceso general de integración de Apache Kafka autoadministrado con Lambda implica los siguientes pasos:

1. **[Configuración de clúster y red](with-kafka-cluster-network.md)**: primero, configure su clúster Apache Kafka autoadministrado con la configuración de red correcta para permitir que Lambda acceda a su clúster.

1. **[Configuración de la asignación de orígenes de eventos](with-kafka-configure.md)**: luego, cree el recurso de [asignación de orígenes de eventos](invocation-eventsourcemapping.md) que Lambda necesita para conectar de forma segura su clúster de Apache Kafka a su función.

1. **[Configuración de la función y los permisos](with-kafka-permissions.md)**: por último, asegúrese de que su función esté configurada correctamente y de que cuente con los permisos necesarios en su [rol de ejecución](lambda-intro-execution-role.md).

Apache Kafka como origen de eventos funciona de manera similar a utilizar Amazon Simple Queue Service (Amazon SQS) o Amazon Kinesis. Lambda sondea internamente nuevos mensajes del origen de eventos y luego invoca sincrónicamente la función de Lambda objetivo. Lambda lee los mensajes en lotes y los proporciona a su función como carga de eventos. El tamaño máximo del lote se puede configurar (el valor predeterminado son 100 mensajes). Para obtener más información, consulte [Comportamiento de procesamiento por lotes](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching).

Para optimizar el rendimiento de la asignación de orígenes de eventos de Apache Kafka autoadministrado, configure el modo aprovisionado. En el modo aprovisionado, puede definir el número mínimo y máximo de sondeos de eventos asignados a su asignación de orígenes de eventos. Esto puede mejorar la capacidad de la asignación de orígenes de eventos para manejar picos de mensajes inesperados. Para obtener más información, consulte [Modo aprovisionado](kafka-scaling-modes.md#kafka-provisioned-mode).

**aviso**  
Las asignaciones de orígenes de eventos de Lambda procesan cada evento al menos una vez, y puede producirse un procesamiento duplicado de registros. Para evitar posibles problemas relacionados con la duplicación de eventos, le recomendamos encarecidamente que haga que el código de la función sea idempotente. Para obtener más información, consulte [¿Cómo puedo hacer que mi función de Lambda sea idempotente?](https://repost.aws/knowledge-center/lambda-function-idempotent) en el Centro de conocimientos de AWS.

Para los orígenes de eventos basados en Kafka, Lambda admite parámetros de control de procesamiento, como los plazos de procesamiento por lotes y el tamaño del lote. Para obtener más información, consulte [Comportamiento de procesamiento por lotes](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching).

Para ver un ejemplo de cómo utilizar Kafka autoadministrado como origen de eventos, consulte [Uso de Apache Kafka autoalojado como origen de eventos para AWS Lambda](https://aws.amazon.com/blogs/compute/using-self-hosted-apache-kafka-as-an-event-source-for-aws-lambda/) en el blog de informática de AWS.

**Topics**
+ [Evento de ejemplo](#smaa-sample-event)
+ [Configuración del clúster Apache Kafka autoadministrado y de la red para Lambda](with-kafka-cluster-network.md)
+ [Configuración de permisos del rol de ejecución de Lambda](with-kafka-permissions.md)
+ [Configuración de Apache Kafka autoadministrado como origen de eventos para Lambda](with-kafka-configure.md)

## Evento de ejemplo
<a name="smaa-sample-event"></a>

Lambda envía el lote de mensajes en el parámetro de evento cuando invoca su función de Lambda. La carga de eventos contiene una matriz de mensajes. Cada elemento de la matriz contiene detalles del tema Kafka y el identificador de partición Kafka, junto con una marca de tiempo y un mensaje codificado en base64.

```
{
   "eventSource": "SelfManagedKafka",
   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
   "records":{
      "mytopic-0":[
         {
            "topic":"mytopic",
            "partition":0,
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==",
            "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
            "headers":[
               {
                  "headerKey":[
                     104,
                     101,
                     97,
                     100,
                     101,
                     114,
                     86,
                     97,
                     108,
                     117,
                     101
                  ]
               }
            ]
         }
      ]
   }
}
```

# Configuración del clúster Apache Kafka autoadministrado y de la red para Lambda
<a name="with-kafka-cluster-network"></a>

Para conectar la función de Lambda al clúster de Apache Kafka autoagestionado, debe configurar correctamente tanto el clúster como la red en la que reside. En esta página se describe cómo configurar el clúster y la red. Si el clúster y la red ya están configurados correctamente, consulte [Configuración de Apache Kafka autoadministrado como origen de eventos para Lambda](with-kafka-configure.md) para configurar la asignación de orígenes de eventos.

**Topics**
+ [Configuración de clúster de Apache Kafka autoadministrado.](#kafka-cluster-setup)
+ [Configuración de la seguridad de la red](#services-kafka-vpc-config)

## Configuración de clúster de Apache Kafka autoadministrado.
<a name="kafka-cluster-setup"></a>

Puede alojar su clúster Apache Kafka autoadministrado con proveedores de nube como [Confluent Cloud](https://www.confluent.io/confluent-cloud/) o [Redpanda](https://www.redpanda.com/), o ejecutarlo en su propia infraestructura. Asegúrese de que el clúster esté correctamente configurado y sea accesible desde la red a la que se conectará la asignación de origen de eventos de Lambda.

## Configuración de la seguridad de la red
<a name="services-kafka-vpc-config"></a>

Para que Lambda tenga acceso completo a Apache Kafka autoadministrado a través de su asignación de orígenes de eventos, debe proporcionar acceso a la instancia de Amazon VPC en la que creó el clúster o este debe utilizar un punto de conexión público (dirección IP pública).

Cuando utilice Apache Kafka autoadministrado con Lambda, recomendamos que cree [puntos de conexión de VPC AWS PrivateLink](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html) que proporcionen a su función acceso a los recursos de su Amazon VPC.

**nota**  
Los puntos de conexión de VPC de AWS PrivateLink son necesarios para las funciones con asignaciones de orígenes de eventos que utilizan el modo predeterminado (bajo demanda) para los sondeos de eventos. Si la asignación de orígenes de eventos utiliza el [modo aprovisionado](invocation-eventsourcemapping.md#invocation-eventsourcemapping-provisioned-mode), no es necesario configurar los puntos de conexión de VPC de AWS PrivateLink.

Cree un punto de conexión para proporcionar acceso a los siguientes recursos:
+  Lambda: cree un punto de conexión para la entidad principal del servicio de Lambda. 
+  AWS STS: cree un punto de conexión para AWS STS con el objetivo de que la entidad principal del servicio asuma un rol en su nombre. 
+  Secrets Manager: si el clúster usa Secrets Manager para almacenar las credenciales, cree un punto de conexión para Secrets Manager. 

Como alternativa, configure una puerta de enlace de NAT en cada subred pública de la Amazon VPC. Para obtener más información, consulte [Habilitación del acceso a Internet para funciones de Lambda conectadas a VPC](configuration-vpc-internet.md).

Al crear una asignación de orígenes de eventos para Apache Kafka autoadministrado, Lambda comprueba si las interfaces de red elásticas (ENI) ya están presentes en las subredes y los grupos de seguridad configurados para la Amazon VPC. Si Lambda encuentra ENI existentes, intenta reutilizarlos. De lo contrario, Lambda crea nuevos ENI para conectarse al origen de eventos e invocar la función.

**nota**  
Las funciones de Lambda siempre se ejecutan dentro de VPC propiedad del servicio de Lambda. La configuración de VPC de la función no afecta la asignación de orígenes de eventos. Solo la configuración de red del origen de eventos determina cómo se conecta Lambda al origen de eventos.

Configure los grupos de seguridad para la Amazon VPC que contiene el clúster. De forma predeterminada, Apache Kafka autoadministrado utiliza los siguientes puertos: `9092`.
+ Reglas de entrada: permiten todo el tráfico en el puerto del agente predeterminado para el grupo de seguridad asociado al origen de eventos. Como alternativa, puede usar una regla de grupo de seguridad con autorreferencia para permitir el acceso desde instancias que pertenecen al mismo grupo de seguridad.
+ Reglas de salida: permiten que todo el tráfico en el puerto `443` vaya a destinos externos en caso de que su función necesite comunicarse con servicios de AWS. Como alternativa, también puede usar una regla de grupo de seguridad con autorreferencia para limitar el acceso al agente en caso de que no necesite comunicarse con otros servicios de AWS.
+ Reglas de entrada del punto de conexión de Amazon VPC: si usa un punto de conexión de Amazon VPC, el grupo de seguridad asociado al punto de conexión de Amazon VPC debe permitir el tráfico entrante en el puerto `443` desde el grupo de seguridad del clúster.

Si el clúster utiliza la autenticación, también puede restringir la política del punto de conexión para el punto de conexión de Secrets Manager. Para llamar a la API de Secrets Manager, Lambda usa su rol de función, no la entidad principal de servicio de Lambda.

**Example Política de punto de conexión de VPC: punto de conexión de Secrets Manager**  

```
{
      "Statement": [
          {
              "Action": "secretsmanager:GetSecretValue",
              "Effect": "Allow",
              "Principal": {
                  "AWS": [
                      "arn:aws::iam::123456789012:role/my-role"
                  ]
              },
              "Resource": "arn:aws::secretsmanager:us-west-2:123456789012:secret:my-secret"
          }
      ]
  }
```

Cuando utiliza los puntos de conexión de VPC de Amazon, AWS enruta las llamadas a la API para invocar una función mediante la interfaz de red elástica (ENI) del punto de conexión. La entidad principal del servicio de Lambda debe llamar a `lambda:InvokeFunction` en cualquier rol y función que utilicen esas ENI.

De forma predeterminada, los puntos de conexión de VPC de Amazon tienen políticas de IAM abiertas que permiten un amplio acceso a los recursos. La práctica recomendada es restringir estas políticas para realizar las acciones necesarias mediante ese punto de conexión. Para garantizar que la asignación de orígenes de eventos pueda invocar la función de Lambda, la política de punto de conexión de VPC debe permitir que la entidad principal del servicio de Lambda llame a `sts:AssumeRole` y `lambda:InvokeFunction`. Restringir las políticas de punto de conexión de VPC para permitir únicamente las llamadas a la API que se originen en su organización impide que la asignación de orígenes de eventos funcione correctamente, por lo que en estas políticas es necesario `"Resource": "*"`.

En el siguiente ejemplo de políticas de puntos de conexión de VPC, se muestra cómo conceder el acceso necesario a las entidades principales del servicio de Lambda para AWS STS y los puntos de conexión de Lambda.

**Example Política de punto de conexión de VPC: punto de conexión de AWS STS**  

```
{
      "Statement": [
          {
              "Action": "sts:AssumeRole",
              "Effect": "Allow",
              "Principal": {
                  "Service": [
                      "lambda.amazonaws.com"
                  ]
              },
              "Resource": "*"
          }
      ]
    }
```

**Example Política de punto de conexión de VPC: punto de conexión de Lambda**  

```
{
      "Statement": [
          {
              "Action": "lambda:InvokeFunction",
              "Effect": "Allow",
              "Principal": {
                  "Service": [
                      "lambda.amazonaws.com"
                  ]
              },
              "Resource": "*"
          }
      ]
  }
```

# Configuración de permisos del rol de ejecución de Lambda
<a name="with-kafka-permissions"></a>

Además de [acceder al clúster de Kafka autoadministrado](kafka-cluster-auth.md), la función de Lambda necesita permisos para llevar a cabo varias acciones de la API. Los permisos se agregan al [rol de ejecución](lambda-intro-execution-role.md) de la función. Si los usuarios tienen que acceder a cualquier acción de la API, agregue los permisos necesarios a la política de identidad para el usuario o rol de AWS Identity and Access Management (IAM).

**Topics**
+ [Permisos de función de Lambda necesarios](#smaa-api-actions-required)
+ [Permisos de función de Lambda opcionales](#smaa-api-actions-optional)
+ [Adición de permisos a su rol de ejecución](#smaa-permissions-add-policy)
+ [Concesión de acceso a los usuarios con una política de IAM](#smaa-permissions-add-users)

## Permisos de función de Lambda necesarios
<a name="smaa-api-actions-required"></a>

Para crear y almacenar registros en un grupo de registros en Registros de Amazon CloudWatch, la función de Lambda debe tener los siguientes permisos en su rol de ejecución:
+ [logs:CreateLogGroup](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html)
+ [logs:CreateLogStream](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogStream.html)
+ [logs:PutLogEvents](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html)

## Permisos de función de Lambda opcionales
<a name="smaa-api-actions-optional"></a>

Es posible que la función de Lambda también necesite permisos para:
+ Describir el secreto de Secrets Manager.
+ Acceder a su clave administrada por el cliente de AWS Key Management Service (AWS KMS).
+ Acceder a su Amazon VPC.
+ Envíe los registros de las invocaciones fallidas a un destino.

### Secrets Manager y permisos de AWS KMS
<a name="smaa-api-actions-secrets"></a>

En función del tipo de control de acceso que configure para los agentes de Kafka, es posible que la función de Lambda necesite permiso para acceder a su secreto de Secrets Manager o para descifrar su clave administrada por el cliente de AWS KMS. Para acceder a estos recursos, el rol de ejecución de la función debe tener los siguientes permisos:
+ [secretsmanager:GetSecretValue](https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html)
+ [kms:Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html)

### Permisos de VPC
<a name="smaa-api-actions-vpc"></a>

Si solo los usuarios de una VPC pueden acceder a su clúster de Apache Kafka autoadministrado, su función de Lambda debe tener permiso para acceder a sus recursos de Amazon VPC. Estos recursos incluyen su VPC, subredes, grupos de seguridad e interfaces de red. Para acceder a estos recursos, el rol de ejecución de la función debe tener los siguientes permisos:
+ [ec2:CreateNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateNetworkInterface.html)
+ [ec2:DescribeNetworkInterfaces](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeNetworkInterfaces.html)
+ [ec2:DescribeVpcs](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeVpcs.html)
+ [ec2:DeleteNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DeleteNetworkInterface.html)
+ [ec2:DescribeSubnets](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSubnets.html)
+ [ec2:DescribeSecurityGroups](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSecurityGroups.html)

## Adición de permisos a su rol de ejecución
<a name="smaa-permissions-add-policy"></a>

Para tener acceso a otros servicios de AWS que su clúster Apache Kafka autoadministrado utiliza, Lambda utiliza las políticas de permisos que defina en el [rol de ejecución](lambda-intro-execution-role.md) de la función de Lambda.

De forma predeterminada, Lambda no está permitido realizar las acciones necesarias u opcionales para un clúster Apache Kafka autoadministrado. Debe crear y definir estas acciones en una [política de confianza de IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_update-role-trust-policy.html) para su rol de ejecución. En este ejemplo se muestra cómo puede crear una política que permita a Lambda tener acceso a los recursos de Amazon VPC.

------
#### [ JSON ]

****  

```
{
        "Version":"2012-10-17",		 	 	 
        "Statement":[
           {
              "Effect":"Allow",
              "Action":[
                 "ec2:CreateNetworkInterface",
                 "ec2:DescribeNetworkInterfaces",
                 "ec2:DescribeVpcs",
                 "ec2:DeleteNetworkInterface",
                 "ec2:DescribeSubnets",
                 "ec2:DescribeSecurityGroups"
              ],
              "Resource":"*"
           }
        ]
     }
```

------

## Concesión de acceso a los usuarios con una política de IAM
<a name="smaa-permissions-add-users"></a>

De forma predeterminada, los usuarios y roles no tienen permiso para llevar a cabo [operaciones de API de origen de eventos](invocation-eventsourcemapping.md#event-source-mapping-api). Para conceder acceso a los usuarios de su organización o cuenta, cree o actualice la política basada en identidades. Para obtener más información, consulte [Control del acceso a los recursos de AWS mediante políticas](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_controlling.html) en la *Guía del usuario de IAM*.

Para la solución de problemas de errores de autenticación y autorización, consulte [Solución de errores de asignación de orígenes de eventos de Kafka](with-kafka-troubleshoot.md).

# Configuración de Apache Kafka autoadministrado como origen de eventos para Lambda
<a name="with-kafka-configure"></a>

Para usar un clúster de Apache Kafka autoadministrado como origen de eventos para su función de Lambda, cree una [asignación de orígenes de eventos](invocation-eventsourcemapping.md) que conecte los dos recursos. Esta página describe cómo crear una asignación de orígenes de eventos para Apache Kafka autoadministrado.

Esta página asume que ya ha configurado correctamente su clúster de Kafka y la red en la que reside. Si necesita configurar el clúster o la red, consulte [Configuración del clúster Apache Kafka autoadministrado y de la red para Lambda](with-kafka-cluster-network.md).

**Topics**
+ [Uso de un clúster de Apache Kafka autoadministrado como origen de eventos.](#kafka-esm-overview)
+ [Configuración de métodos de autenticación de clústeres en Lambda](kafka-cluster-auth.md)
+ [Creación de una asignación de orígenes de eventos de Lambda para un origen de eventos de Apache Kafka autoadministrado](kafka-esm-create.md)
+ [Todos los parámetros de configuración de orígenes de eventos de Apache Kafka autoadministrado en Lambda](kafka-esm-parameters.md)

## Uso de un clúster de Apache Kafka autoadministrado como origen de eventos.
<a name="kafka-esm-overview"></a>

Cuando agrega su clúster de Apache Kafka o Amazon MSK como desencadenador para su función de Lambda, el clúster se utiliza como [origen de eventos](invocation-eventsourcemapping.md).

Lambda lee los datos de eventos de los temas de Kafka que especifique como `Topics` en una solicitud de [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html), en función de la [posición inicial](kafka-starting-positions.md) que especifique. Después de un procesamiento exitoso, su tema de Kafka se compromete a su clúster de Kafka.

Lambda lee los mensajes secuencialmente para cada partición de tema de Kafka. Una sola carga de Lambda puede contener mensajes de varias particiones. Cuando hay más registros disponibles, Lambda continúa procesando registros en lotes, en función del valor de BatchSize que especifique en una solicitud de [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html), hasta que la función se ponga al día con el tema.

Después de que Lambda procese cada lote, confirma los desplazamientos de los mensajes en ese lote. Si su función devuelve un error para cualquiera de los mensajes de un lote, Lambda reintenta todo el lote de mensajes hasta que el procesamiento sea correcto o los mensajes caduquen. Puede enviar los registros con error en todos los reintentos a un destino en caso de error para su posterior procesamiento.

**nota**  
Si bien las funciones de Lambda suelen tener un límite de tiempo de espera máximo de 15 minutos, las asignaciones de orígenes de eventos para Amazon MSK, Apache Kafka autoadministrado, Amazon DocumentDB y Amazon MQ para ActiveMQ y RabbitMQ solo admiten funciones con límites de tiempo de espera máximos de 14 minutos.

# Configuración de métodos de autenticación de clústeres en Lambda
<a name="kafka-cluster-auth"></a>

Lambda admite varios métodos para autenticarse con su clúster de Apache Kafka autoadministrado. Asegúrese de configurar el clúster de Kafka para que utilice uno de estos métodos de autenticación admitidos: Para obtener más información acerca de la seguridad de Kafka, consulte la sección [Security](http://kafka.apache.org/documentation.html#security) (Seguridad) de la documentación de Kafka.

## Autenticación SASL/SCRAM
<a name="smaa-auth-sasl"></a>

Lambda es compatible con la autenticación simple y la autenticación de capa de seguridad/mecanismo de autenticación de respuesta por desafío saltado (SASL/SCRAM) con cifrado de seguridad de la capa de transporte (TLS) (`SASL_SSL`). Lambda envía las credenciales cifradas para autenticarse con el clúster. Lambda no es compatible con SASL/SCRAM con texto simple (`SASL_PLAINTEXT`). Para obtener más información acerca de la autenticación SASL/SCRAM, consulte [RFC 5802](https://tools.ietf.org/html/rfc5802).

Lambda también admite la autenticación SASL/PLAIN. Dado que este mecanismo utiliza credenciales en texto claro, la conexión con el servidor debe utilizar cifrado TLS para garantizar la protección de las credenciales.

Para la autenticación SASL, almacene las credenciales de inicio de sesión como secreto en AWS Secrets Manager. Para obtener más información acerca de cómo utilizar Secrets Manager, consulte [Creación de un secreto de AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_secret.html) en la *Guía del usuario de AWS Secrets Manager*.

**importante**  
Para utilizar Secrets Manager para la autenticación, los secretos deben almacenarse en la misma región de AWS que la función de Lambda.

## Autenticación TLS mutua
<a name="smaa-auth-mtls"></a>

TLS mutua (mTLS) proporciona autenticación bidireccional entre el cliente y el servidor. El cliente envía un certificado al servidor para que el servidor verifique el cliente, mientras que el servidor envía un certificado al cliente para que el cliente verifique el servidor. 

En Apache Kafka autoadministrado, Lambda actúa como cliente. Puede configurar un certificado de cliente (como secreto en Secrets Manager) para autenticar a Lambda con los agentes de Kafka. El certificado de cliente debe estar firmado por una entidad de certificación en el almacén de confianza del servidor.

El clúster de Kafka envía un certificado de servidor a Lambda para autenticar a los agentes de Kafka con Lambda. El certificado de servidor puede ser un certificado de entidad de certificación pública o un certificado autofirmado o de entidad de certificación privada. El certificado de entidad de certificación pública debe estar firmado por una entidad de certificación que esté en el almacén de confianza de Lambda. Para un certificado autofirmado o de entidad de certificación privada, configure el certificado de entidad de certificación raíz del servidor (como secreto en Secrets Manager). Lambda utiliza el certificado raíz para verificar los agentes de Kafka.

Para obtener más información acerca de mTLS, consulte [Introducing mutual TLS authentication for Amazon MSK as an event source](https://aws.amazon.com/blogs/compute/introducing-mutual-tls-authentication-for-amazon-msk-as-an-event-source) (Presentación de la autenticación de TLS mutua para Amazon MSK como origen de eventos).

## Configuración del secreto de certificado de cliente
<a name="smaa-auth-secret"></a>

El secreto CLIENT\$1CERTIFICATE\$1TLS\$1AUTH requiere un campo de certificado y un campo de clave privada. Para una clave privada cifrada, el secreto requiere una contraseña de clave privada. El certificado y la clave privada deben estar en formato PEM.

**nota**  
Lambda admite los algoritmos de cifrado de claves privadas [PBES1](https://datatracker.ietf.org/doc/html/rfc2898/#section-6.1) (pero no PBES2).

El campo de certificado debe contener una lista de certificados y debe comenzar por el certificado de cliente, seguido de cualquier certificado intermedio, y finalizar con el certificado raíz. Cada certificado debe comenzar en una nueva línea con la siguiente estructura:

```
-----BEGIN CERTIFICATE-----  
            <certificate contents>
-----END CERTIFICATE-----
```

Secrets Manager admite secretos de hasta 65 536 bytes, que supone suficiente espacio para cadenas de certificados largas.

El formato de la clave privada debe ser [PKCS \$18](https://datatracker.ietf.org/doc/html/rfc5208), con la siguiente estructura:

```
-----BEGIN PRIVATE KEY-----  
             <private key contents>
-----END PRIVATE KEY-----
```

Para una clave privada cifrada, utilice la siguiente estructura:

```
-----BEGIN ENCRYPTED PRIVATE KEY-----  
              <private key contents>
-----END ENCRYPTED PRIVATE KEY-----
```

El siguiente ejemplo muestra el contenido de un secreto para la autenticación de mTLS mediante una clave privada cifrada. Para una clave privada cifrada, incluya la contraseña de la clave privada en el secreto.

```
{"privateKeyPassword":"testpassword",
"certificate":"-----BEGIN CERTIFICATE-----
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
...
j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk
cmUuiAii9R0=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
...
rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
-----END CERTIFICATE-----",
"privateKey":"-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp
...
QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
-----END ENCRYPTED PRIVATE KEY-----"
}
```

## Configuración del secreto de certificado de entidad de certificación raíz del servidor
<a name="smaa-auth-ca-cert"></a>

Cree este secreto si sus agentes de Kafka utilizan cifrado TLS con certificados firmados por una entidad de certificación privada. Puede utilizar el cifrado TLS para autenticación VPC, SASL/SCRAM, SASL/PLAIN o mTLS.

El secreto de certificado de entidad de certificación raíz del servidor requiere un campo que contenga el certificado de entidad de certificación raíz del agente de Kafka en formato PEM. La estructura del secreto se muestra en el ejemplo siguiente.

```
{"certificate":"-----BEGIN CERTIFICATE-----
MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx
EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT
HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs
ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG...
-----END CERTIFICATE-----"
}
```

# Creación de una asignación de orígenes de eventos de Lambda para un origen de eventos de Apache Kafka autoadministrado
<a name="kafka-esm-create"></a>

Para crear una asignación de orígenes de eventos, puede usar la consola de Lambda, la [AWS Command Line Interface (CLI)](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) o un [AWS SDK](https://aws.amazon.com/getting-started/tools-sdks/).

Los siguientes pasos de la consola agregan un clúster de Apache Kafka autoadministrado como desencadenador para su función de Lambda. Internamente, esto crea un recurso de asignación de orígenes de eventos.

## Requisitos previos
<a name="kafka-esm-prereqs"></a>
+ Un clúster de Apache Kafka autoadministrado. Lambda es compatible con Apache Kafka 0.10.1.0 y versiones posteriores.
+ Un [rol de ejecución](lambda-intro-execution-role.md) con permiso para acceder a los recursos de AWS que utiliza el clúster de Kafka autoadministrado.

## Agregar un clúster de Kafka autoadministrado (consola)
<a name="kafka-esm-console"></a>

Siga estos pasos para agregar su clúster Apache Kafka autoadministrado y un tema Kafka como desencadenador de su función de Lambda.

**Para agregar un desencadenador de Apache Kafka a su función de Lambda (consola)**

1. .Abra la página de [Functions (Funciones)](https://console.aws.amazon.com/lambda/home#/functions) en la consola de Lambda.

1. Elija el nombre de su función de Lambda.

1. En **Descripción general de la función**, elija **Agregar desencadenador**.

1. En **Configuración del desencadenador**, haga lo siguiente:

   1. Elija el tipo de desencadenador **Apache Kafka**.

   1. Para los **servidores de Bootstrap**, ingrese la dirección de host y par de puertos de un broker de Kafka en su clúster y, a continuación, elija **Add (Agregar)**. Repita para cada broker de Kafka en el clúster.

   1. Para el **nombre del tema**, escriba el nombre del tema Kafka utilizado para almacenar registros en el clúster.

   1. Si configura el modo aprovisionado, ingrese un valor para **Sondeadores de eventos mínimos**, un valor para **Sondeadores de eventos máximos** y un valor opcional para PollerGroupName para especificar la agrupación de varias ESM dentro de la misma VPC de origen de eventos.

   1. (Opcional) Para **Tamaño del lote**, introduzca el número máximo de registros que se recibirán en un solo lote.

   1. Para el **periodo de lotes**, ingrese la cantidad máxima de segundos que Lambda emplea a fin de recopilar registros antes de invocar la función.

   1. (Opcional) Para el **ID del grupo de consumidores**, ingrese el ID de un grupo de consumidores de Kafka al que unirse.

   1. (Opcional) En **Posición inicial**, elija **Última** para empezar a leer el flujo desde el registro más reciente, **Horizonte de supresión** para comenzar por el registro más antiguo disponible o **En la marca de tiempo** para especificar una marca de tiempo desde la cual comenzar a leer.

   1. (Opcional) Para **VPC**, elija Amazon VPC para su clúster de Kafka. A continuación, elija **VPC subnets** (Subredes de VPC) y **VPC security groups** (Grupos de seguridad de VPC).

      Esta configuración es obligatoria si solo los usuarios de la VPC acceden a los agentes.

      

   1. (Opcional) Para **Authentication** (Autenticación), elija **Add** (Agregar) y, a continuación, haga lo siguiente:

      1. Elija el protocolo de acceso o autenticación de los agentes de Kafka en su clúster.
         + Si su agente de Kafka utiliza autenticación SASL/PLAIN, elija **BASIC\$1AUTH**.
         + Si su agente utiliza autenticación de SASL/SCRAM, elija uno de los protocolos de **SASL\$1SCRAM**.
         + Si configura la autenticación de mTLS, elija el protocolo **CLIENT\$1CERTIFICATE\$1TLS\$1AUTH**.

      1. Para la autenticación de SASL/SCRAM o mTLS, elija la clave secreta de Secrets Manager que contiene las credenciales del clúster de Kafka.

   1. (Opcional) Para **Encryption** (Cifrado), elija el secreto de Secrets Manager que contiene el certificado de entidad de certificación raíz que los agentes de Kafka utilizan para el cifrado con TLS, si los agentes de Kafka utilizan certificados firmados por una entidad de certificación privada.

      Esta configuración se aplica al cifrado con TLS para SASL/SCRAM o SASL/PLAIN y a la autenticación con mTLS.

   1. Para crear el desencadenador en un estado deshabilitado para la prueba (recomendado), desactive **Activar desencadenador**. O bien, para habilitar el desencadenador de inmediato, seleccione**Activar desencadenador**.

1. Para crear el desencadenador, elija **Add** (Añadir).

## Agregar un clúster Kafka autoadministrado (AWS CLI)
<a name="kafka-esm-cli"></a>

Utilice los siguientes AWS CLI comandos de ejemplo para crear y ver un desencadenador Apache Kafka autoadminstrado para su función de Lambda.

### Uso de SASL/SCRAM
<a name="kafka-esm-cli-create"></a>

Si los usuarios de Kafka acceden a los agentes de Kafka a través de Internet, especifique el secreto de Secrets Manager que creó para la autenticación con SASL/SCRAM. En el siguiente ejemplo se utiliza el comando [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) de la AWS CLI para asignar una función de Lambda llamada `my-kafka-function` a un tema de Kafka llamado `AWSKafkaTopic`.

```
aws lambda create-event-source-mapping \ 
  --topics AWSKafkaTopic \
  --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \
  --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
```

### Uso de una VPC
<a name="kafka-esm-cli-create-vpc"></a>

Si solo los usuarios de Kafka de su VPC acceden a sus agentes de Kafka, debe especificar su VPC, subredes y grupo de seguridad de VPC. En el siguiente ejemplo se utiliza el comando [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) de la AWS CLI para asignar una función de Lambda llamada `my-kafka-function` a un tema de Kafka llamado `AWSKafkaTopic`.

```
aws lambda create-event-source-mapping \ 
  --topics AWSKafkaTopic \
  --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \
  --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
```

### Visualización del estado mediante la AWS CLI
<a name="kafka-esm-cli-view"></a>

En el siguiente ejemplo se utiliza el comando [get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html) de la AWS CLI para describir el estado de la asignación de orígenes de eventos que ha creado.

```
aws lambda get-event-source-mapping
              --uuid dh38738e-992b-343a-1077-3478934hjkfd7
```

# Todos los parámetros de configuración de orígenes de eventos de Apache Kafka autoadministrado en Lambda
<a name="kafka-esm-parameters"></a>

Todos los tipos de origen de eventos Lambda comparten las mismas operaciones [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) y [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) de la API. Sin embargo, solo algunos de los parámetros se aplican a Apache Kafka, como se muestra en la siguiente tabla.


| Parámetro | Obligatorio | Predeterminado | Notas | 
| --- | --- | --- | --- | 
|  BatchSize  |  N  |  100  |  Máximo: 10 000  | 
|  DestinationConfig  |  N  |  N/A  |  [Captura de lotes descartados para orígenes de eventos de Amazon MSK y Apache Kafka autoadministrado](kafka-on-failure.md)  | 
|  Habilitado  |  N  |  True  |  | 
|  FilterCriteria  |  N  |  N/A  |  [Controle qué eventos envía Lambda a la función](invocation-eventfiltering.md)  | 
|  FunctionName  |  S  |  N/A  |    | 
|  KMSKeyArn  |  N  |  N/A  |  [Cifrado de los criterios de filtro](invocation-eventfiltering.md#filter-criteria-encryption)  | 
|  MaximumBatchingWindowInSeconds  |  N  |  500 ms  |  [Comportamiento de procesamiento por lotes](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)  | 
|  ProvisionedPollersConfig  |  N  |  `MinimumPollers`: si no se especifica, el valor predeterminado es 1. `MaximumPollers`: si no se especifica, el valor predeterminado es 200. `PollerGroupName`: N/A  |  [Modo aprovisionado](kafka-scaling-modes.md#kafka-provisioned-mode)  | 
|  SelfManagedEventSource  |  S  | N/A |  Lista de agentes de Kafka. Solo se puede establecer en Crear  | 
|  SelfManagedKafkaEventSourceConfig  |  N  |  Contiene el campo ConsumerGroupId, que se establece de forma predeterminada en un valor único.  |  Solo se puede establecer en Crear  | 
|  SourceAccessConfigurations  |  N  |  Sin credenciales  |  Información de VPC o credenciales de autenticación para el clúster   Para SASL\$1PLAIN, establezca en BASIC\$1AUTH  | 
|  StartingPosition  |  S  |  N/A  |  AT\$1TIMESTAMP, TRIM\$1HORIZON o LATEST Solo se puede establecer en Crear  | 
|  StartingPositionTimestamp  |  N  |  N/A  |  Obligatorio si StartingPosition se establece en AT\$1TIMESTAMP  | 
|  Etiquetas  |  N  |  N/A  |  [Uso de etiquetas en asignaciones de orígenes de eventos](tags-esm.md)  | 
|  Temas  |  S  |  N/A  |  Nombre del tema Solo se puede establecer en Crear  | 

**nota**  
Cuando especifica un `PollerGroupName`, varias ESM de la misma Amazon VPC pueden compartir la capacidad de la unidad de sondeador de eventos (EPU). Puede utilizar esta opción para optimizar los costos del modo aprovisionado para sus ESM. Requisitos para la agrupación de ESM:  
Las ESM deben estar dentro de la misma Amazon VPC.
Máximo de 100 ESM por grupo de sondeadores
El número máximo total de sondeadores en todos las ESM de un grupo no puede superar los 2000.
Puede actualizar el `PollerGroupName` para mover una ESM a un grupo diferente o eliminar una ESM de un grupo; para ello, configure una string vacía («») en `PollerGroupName`: