Procesamiento de mensajes de Amazon MSK con Lambda
nota
Si desea enviar datos a un destino que no sea una función de Lambda o enriquecer los datos antes de enviarlos, consulte Amazon EventBridge Pipes (Canalizaciones de Amazon EventBridge).
Temas
- Agregar Amazon MSK como origen de eventos
- Parámetros de configuración de Amazon MSK
- Creación de asignaciones de orígenes de eventos entre cuentas
- Utilizar un clúster de Amazon MSK como origen de eventos
- Posiciones iniciales de flujos y sondeo
- Métricas de Amazon CloudWatch
- Comportamiento de escalado del rendimiento de mensajes para asignación de orígenes de eventos de Amazon MSK
Agregar Amazon MSK como origen de eventos
Para crear una asignación de orígenes de eventos, agregue Amazon MSK como un desencadenador de la función de Lambda a través de la consola de Lambda, un AWS SDK
En esta sección se describe cómo crear una asignación de orígenes de eventos mediante la consola de Lambda y la AWS CLI.
Requisitos previos
-
Un clúster Amazon MSK y un tema Kafka. Para obtener más información, consulte Introducción al uso de Amazon MSK en la Guía para desarrolladores de Amazon Managed Streaming for Apache Kafka.
-
Un rol de ejecución con permiso para acceder a los recursos de AWS que utiliza el clúster de MSK.
ID del grupo de consumidores personalizable
Al configurar Kafka como origen de eventos, puede especificar un ID de grupo de consumidores. Este ID de grupo de consumidores es un identificador existente para el grupo de consumidores de Kafka al que desea que se una la función de Lambda. Puede utilizar esta característica para migrar sin problemas cualquier configuración de procesamiento de registro de Kafka en curso de otros consumidores a Lambda.
Si especifica un ID de grupo de consumidores y hay otros sondeadores activos dentro de ese grupo de consumidores, Kafka distribuirá los mensajes entre todos los consumidores. En otras palabras, Lambda no recibe todos los mensajes del tema Kafka. Si desea que Lambda gestione todos los mensajes del tema, desactive los demás sondeadores de ese grupo de consumidores.
Además, si especifica un ID de grupo de consumidores y Kafka encuentra un grupo de consumidores existente válido con el mismo ID, Lambda ignora el parámetro StartingPosition
para la asignación de orígenes de eventos. En cambio, Lambda comienza a procesar los registros de acuerdo con la compensación comprometida del grupo de consumidores. Si especifica un ID de grupo de consumidores y Kafka no puede encontrar un grupo de consumidores existente, Lambda configura el origen de eventos con el StartingPosition
especificado.
El ID del grupo de consumidores que especifique debe ser único entre todos los orígenes de eventos de Kafka. Tras crear una asignación de origen de eventos de Kafka con el ID de grupo de consumidores especificado, no puede actualizar este valor.
Agregar un desencadenador de Amazon MSK (consola)
Siga estos pasos para agregar su clúster de Amazon MSK y un tema Kafka como desencadenador de su función de la función de Lambda.
Cómo agregar un desencadenador Amazon MSK a su función de Lambda (consola)
-
Abra la página de Funciones
en la consola de Lambda. -
Elija el nombre de su función de Lambda.
-
En Descripción general de la función, elija Agregar desencadenador.
-
En Configuración del desencadenador, haga lo siguiente:
-
Elija el tipo de desencadenador MSK.
-
Para clúster de MSK, seleccione su clúster.
-
Para el Tamaño del lote, establezca el número máximo de mensajes para recibir un solo lote.
-
Para el periodo de lotes, ingrese la cantidad máxima de segundos que Lambda emplea a fin de recopilar registros antes de invocar la función.
-
Para Nombre de tema, escriba un nombre para el tema Kafka.
-
(Opcional) Para el ID del grupo de consumidores, ingrese el ID de un grupo de consumidores de Kafka al que unirse.
-
(Opcional) En Posición inicial, elija Última para empezar a leer el flujo desde el registro más reciente, Horizonte de supresión para comenzar por el registro más antiguo disponible o En la marca de tiempo para especificar una marca de tiempo desde la cual comenzar a leer.
-
(Opcional) En Authentication (Autenticación), elija la clave secreta para la autenticación con los agentes en el clúster de MSK.
-
Para crear el desencadenador en un estado deshabilitado para la prueba (recomendado), desactive Activar desencadenador. O bien, para habilitar el desencadenador de inmediato, seleccioneActivar desencadenador.
-
-
Para crear el desencadenador, elija Add (Añadir).
Agregar un desencadenador de Amazon MSK (AWS CLI)
Utilice los siguientes comandos AWS CLI de ejemplo para crear y ver un desencadenador Amazon MSK para su función de a función Lambda.
Crear un desencadenador mediante el AWS CLI
ejemplo — Creación de una asignación de orígenes de eventos para un clúster que utilice la autenticación de IAM
En el siguiente ejemplo se utiliza el comando create-event-source-mappingmy-kafka-function
a un tema de Kafka llamado AWSKafkaTopic
. La posición inicial del tema está establecida en LATEST
. Cuando el clúster utiliza la autenticación basada en roles de IAM, no se necesita un objeto SourceAccessConfiguration. Ejemplo:
aws lambda create-event-source-mapping \ --event-source-arn
arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2
\ --topicsAWSKafkaTopic
\ --starting-positionLATEST
\ --function-namemy-kafka-function
ejemplo — Creación de una asignación de orígenes de eventos para un clúster que utilice la autenticación SASL/SCRAM
Si el clúster usa la autenticación SASL/SCRAM, debe incluir un objeto SourceAccessConfiguration que especifique SASL_SCRAM_512_AUTH
y un ARN secreto de Secrets Manager.
aws lambda create-event-source-mapping \ --event-source-arn
arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2
\ --topicsAWSKafkaTopic
\ --starting-positionLATEST
\ --function-namemy-kafka-function
--source-access-configurations'[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
ejemplo — Creación de una asignación de orígenes de eventos para un clúster que utilice la autenticación mTLS
Si el clúster usa la autenticación mTLS, debe incluir un objeto SourceAccessConfiguration que especifique CLIENT_CERTIFICATE_TLS_AUTH
y un ARN secreto de Secrets Manager.
aws lambda create-event-source-mapping \ --event-source-arn
arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2
\ --topicsAWSKafkaTopic
\ --starting-positionLATEST
\ --function-namemy-kafka-function
--source-access-configurations'[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
Para obtener más información, consulte la documentación de referencia de la API CreateEventSourceMapping.
Visualización del estado mediante la AWS CLI
En el siguiente ejemplo se utiliza el comando get-event-source-mapping
aws lambda get-event-source-mapping \ --uuid
6d9bce8e-836b-442c-8070-74e77903c815
Parámetros de configuración de Amazon MSK
Todos los tipos de fuente de eventos Lambda comparten las mismas operaciones CreateEventSourceMapping y UpdateEventSourceMapping de la API. Sin embargo, solo algunos de los parámetros se aplican a Amazon MSK.
Parámetro | Obligatoria | Predeterminado | Notas |
---|---|---|---|
AmazonManagedKafkaEventSourceConfig |
N |
Contiene el campo ConsumerGroupId, que se establece de forma predeterminada en un valor único. |
Solo se puede establecer en Crear |
BatchSize |
N |
100 |
Máximo: 10 000 |
DestinationConfig |
N |
N/A |
Capturar lotes descartados para un origen de eventos de Amazon MSK |
Habilitado |
N |
True |
|
EventSourceArn |
Y |
N/A |
Solo se puede establecer en Crear |
FilterCriteria |
N |
N/A |
|
FunctionName |
Y |
N/A |
|
KMSKeyArn |
N |
N/A |
|
MaximumBatchingWindowInSeconds |
N |
500 ms |
|
ProvisionedPollersConfig |
N |
|
|
SourceAccessConfigurations |
N |
Sin credenciales |
Credenciales de autenticación de SASL/SCRAM o CLIENT_CERTIFICATE_TLS_AUTH (MutualTLS) para el origen de eventos |
StartingPosition |
Y |
N/A |
AT_TIMESTAMP, TRIM_HORIZON o LATEST Solo se puede establecer en Crear |
StartingPositionTimestamp |
N |
N/A |
Obligatorio si StartingPosition se establece en AT_TIMESTAMP |
Etiquetas |
N |
N/A |
|
Temas |
Y |
N/A |
Nombre del tema de Kafka Solo se puede establecer en Crear |
Creación de asignaciones de orígenes de eventos entre cuentas
Puede utilizar la conectividad privada de varias VPC para conectar una función de Lambda a un clúster de MSK aprovisionado en otra Cuenta de AWS. La conectividad de varias VPC utiliza AWS PrivateLink, que mantiene todo el tráfico dentro de la red de AWS.
nota
No puede crear asignaciones de orígenes de eventos entre cuentas para clústeres de MSK sin servidor.
Para crear una asignación de orígenes de eventos entre cuentas, primero debe configurar la conectividad de múltiples VPC para el clúster de MSK. Al crear la asignación de orígenes de eventos, utilice el ARN de conexión de VPC administrada en lugar del ARN del clúster, como se muestra en los siguientes ejemplos. La operación CreateEventSourceMapping también varía según el tipo de autenticación que utilice el clúster de MSK.
ejemplo — Creación de una asignación de orígenes de eventos entre cuentas para un clúster que utilice la autenticación de IAM
Cuando el clúster utiliza la autenticación basada en roles de IAM, no se necesita un objeto SourceAccessConfiguration. Ejemplo:
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:
us-east-1:111122223333
:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7
\ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
ejemplo — Creación de una asignación de orígenes de eventos entre cuentas para un clúster que utilice la autenticación SASL/SCRAM
Si el clúster usa la autenticación SASL/SCRAM, debe incluir un objeto SourceAccessConfiguration que especifique SASL_SCRAM_512_AUTH
y un ARN secreto de Secrets Manager.
Hay dos formas de utilizar los secretos para las asignaciones de orígenes de eventos entre cuentas de Amazon MSK con autenticación SASL/SCRAM:
-
Cree un secreto en la cuenta de la función de Lambda y sincronícelo con el secreto del clúster. Cree una rotación para mantener los dos secretos sincronizados. Esta opción le permite controlar el secreto desde la cuenta de la función.
-
Use el secreto asociado al clúster de MSK. Este secreto debe permitir el acceso entre cuentas a la cuenta de la función de Lambda. Para obtener más información, consulte Permisos para secretos de AWS Secrets Manager para usuarios en una cuenta diferente.
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:
us-east-1:111122223333
:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7
\ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations'[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
ejemplo — Creación de una asignación de orígenes de eventos entre cuentas para un clúster que utilice la autenticación mTLS
Si el clúster usa la autenticación mTLS, debe incluir un objeto SourceAccessConfiguration que especifique CLIENT_CERTIFICATE_TLS_AUTH
y un ARN secreto de Secrets Manager. El secreto se puede almacenar en la cuenta del clúster o en la cuenta de la función de Lambda.
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:
us-east-1:111122223333
:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7
\ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations'[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
Utilizar un clúster de Amazon MSK como origen de eventos
Cuando agrega su clúster de Apache Kafka o Amazon MSK como desencadenador para su función de Lambda, el clúster se utiliza como origen de eventos.
Lambda lee los datos de eventos de los temas de Kafka que especifique como Topics
en una solicitud de CreateEventSourceMapping, en función de la StartingPosition
que especifique. Después de un procesamiento exitoso, su tema de Kafka se compromete a su clúster de Kafka.
Si especifica la StartingPosition
como LATEST
, Lambda comienza a leer el último mensaje de cada partición que pertenece al tema. Debido a que puede haber algún retraso después de la configuración del desencadenador antes de que Lambda comience a leer los mensajes, Lambda no lee ningún mensaje producido durante este plazo.
Lambda lee los mensajes secuencialmente para cada partición de tema de Kafka. Una sola carga de Lambda puede contener mensajes de varias particiones. Cuando hay más registros disponibles, Lambda continúa procesando registros en lotes, en función del valor BatchSize
que especifique en una solicitud de CreateEventSourceMapping, hasta que la función se ponga al día con el tema.
Después de que Lambda procese cada lote, confirma los desplazamientos de los mensajes en ese lote. Si su función devuelve un error para cualquiera de los mensajes de un lote, Lambda reintenta todo el lote de mensajes hasta que el procesamiento sea correcto o los mensajes caduquen. Puede enviar los registros con error en todos los reintentos a un destino en caso de error para su posterior procesamiento.
nota
Si bien las funciones de Lambda suelen tener un límite de tiempo de espera máximo de 15 minutos, las asignaciones de orígenes de eventos para Amazon MSK, Apache Kafka autoadministrado, Amazon DocumentDB y Amazon MQ para ActiveMQ y RabbitMQ solo admiten funciones con límites de tiempo de espera máximos de 14 minutos. Esta restricción garantiza que la asignación de orígenes de eventos pueda gestionar correctamente los errores y reintentos de las funciones.
Posiciones iniciales de flujos y sondeo
Tenga en cuenta que el sondeo de flujos durante la creación y las actualizaciones de la asignación de orígenes de eventos es, en última instancia, coherente.
-
Durante la creación de la asignación de orígenes de eventos, es posible que se demore varios minutos en iniciar el sondeo de los eventos del flujo.
-
Durante las actualizaciones de la asignación de orígenes de eventos, es posible que se demore varios minutos en detener y reiniciar el sondeo de los eventos del flujo.
Este comportamiento significa que, si especifica LATEST
como posición inicial del flujo, la asignación de orígenes de eventos podría omitir eventos durante la creación o las actualizaciones. Para garantizar que no se pierda ningún evento, especifique la posición inicial del flujo como TRIM_HORIZON
o AT_TIMESTAMP
.
Métricas de Amazon CloudWatch
Lambda emite la métrica OffsetLag
mientras la función procesa los registros. El valor de esta métrica es la diferencia de compensación entre el último registro escrito en el tema de origen de eventos de Kafka y el último registro que procesó el grupo de consumidores de su función. Puede utilizar OffsetLag
para estimar la latencia entre el momento en el que se agrega un registro y el momento en el que el grupo lo procesa.
Una tendencia ascendente en OffsetLag
puede indicar problemas con los sondeadores en el grupo de consumidores de su función. Para obtener más información, consulte Uso de métricas de CloudWatch con Lambda.
Comportamiento de escalado del rendimiento de mensajes para asignación de orígenes de eventos de Amazon MSK
Puede elegir entre dos modos de comportamiento de escalado del rendimiento de mensajes para su asignación de orígenes de eventos de Amazon MSK:
Modo predeterminado (bajo demanda)
Cuando crea inicialmente un origen de eventos de Amazon MSK, Lambda asigna una cantidad predeterminada de sondeos de eventos para procesar todas las particiones en el tema de Kafka. Lambda reduce o escala verticalmente de forma automática el número de sondeos de eventos en función de la carga de mensajes.
En intervalos de un minuto, Lambda evalúa el retraso de inicio de todas las particiones del tema. Si el retraso es demasiado alto, la partición recibe mensajes más rápido de lo que Lambda puede procesarlos. Si es necesario, Lambda agrega o elimina a los sondeos de eventos del tema. Este proceso de escalado automático para agregar o eliminar sondeos de eventos se produce dentro de los tres minutos posteriores a la evaluación.
Si su función de Lambda de destino está limitada, Lambda reduce la cantidad de sondeos de eventos. Esta acción reduce la carga de trabajo de la función al reducir el número de mensajes que los sondeos de eventos pueden recuperar y enviar a la función.
Configuración del modo aprovisionado
Para las cargas de trabajo en las que necesite afinar el rendimiento de su asignación de orígenes de eventos, puede utilizar el modo aprovisionado. En el modo aprovisionado, usted define los límites mínimos y máximos para la cantidad de sondeos de eventos aprovisionados. Estos sondeos de eventos aprovisionados están dedicados a la asignación de orígenes de eventos y pueden gestionar los picos de mensajes inesperados mediante un ajuste de escalado automático adaptable. Recomendamos que utilice el modo aprovisionado para las cargas de trabajo de Kafka que tengan requisitos de rendimiento estrictos.
En Lambda, un sondeo de eventos es una unidad de cómputo capaz de gestionar hasta 5 MBps de rendimiento. Como referencia, supongamos que el origen de eventos produce una carga útil media de 1 MB y que la duración media de la función es de 1 segundo. Si la carga útil no sufre ninguna transformación (como un filtrado), un único sondeo puede admitir un rendimiento de 5 MBps y 5 invocaciones de Lambda simultáneas. El uso del modo aprovisionado conlleva costos adicionales. Para obtener estimaciones de precios, consulte Precios de AWS Lambda
En el modo aprovisionado, el rango de valores aceptados para el número mínimo de sondeos de eventos (MinimumPollers
) oscila entre 1 y 200, inclusive. El rango de valores aceptados para el número máximo de sondeos de eventos (MaximumPollers
) está comprendido entre 1 y 2000, inclusive. MaximumPollers
debe ser mayor o igual que MinimumPollers
. Además, para mantener el procesamiento ordenado dentro de las particiones, Lambda limita el número de MaximumPollers
al número de particiones del tema.
Para obtener más información sobre cómo elegir los valores de sondeos de eventos mínimos y máximos adecuados, consulte Prácticas recomendadas y consideraciones al usar el modo aprovisionado.
Puede configurar el modo aprovisionado para la asignación de orígenes de eventos de Amazon MSK mediante la consola o la API de Lambda.
Cómo configurar el modo aprovisionado para una asignación de orígenes de eventos de Amazon MSK existente (consola)
-
Abra la página de Funciones
en la consola de Lambda. -
Elija la función para la asignación de orígenes de eventos de Amazon MSK para la que desee configurar el modo aprovisionado.
-
Elija Configuración y, a continuación, seleccione Desencadenadores.
-
Elija la asignación de orígenes de eventos de Amazon MSK para la que desee configurar el modo aprovisionado y, a continuación, seleccione Editar.
-
En Configuración de la asignación de orígenes de eventos, elija Configurar el modo aprovisionado.
-
En Número mínimo de sondeos de eventos, introduzca un valor entre 1 y 200. Si no especifica un valor, Lambda asigna el valor predeterminado de 1.
-
En Número máximo de sondeos de eventos, introduzca un valor entre 1 y 2000. Este valor debe ser mayor o igual que su valor para Número mínimo de sondeos de eventos. Si no especifica un valor, Lambda asigna el valor predeterminado de 200.
-
-
Seleccione Save (Guardar).
Puede configurar el modo aprovisionado por programación mediante el objeto ProvisionedPollerConfig de su EventSourceMappingConfiguration. Por ejemplo, el siguiente comando de la CLI UpdateEventSourceMapping configura un valor MinimumPollers
de 5 y un valor MaximumPollers
de 100.
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --provisioned-poller-config '{"MinimumPollers": 5, "MaximumPollers": 100}'
Tras configurar el modo aprovisionado, puede observar el uso de los sondeos de eventos para su carga de trabajo supervisando la métrica ProvisionedPollers
. Para obtener más información, consulte Métricas de asignación de orígenes de eventos.
Para deshabilitar el modo aprovisionado y volver al modo predeterminado (bajo demanda), puede usar el siguiente comando de la CLI UpdateEventSourceMapping:
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --provisioned-poller-config '{}'
Prácticas recomendadas y consideraciones al usar el modo aprovisionado
La configuración óptima de los sondeos de eventos mínimos y máximos para la asignación de orígenes de eventos depende de los requisitos de rendimiento de la aplicación. Recomendamos que lea los sondeos de eventos mínimos predeterminados para establecer una línea de base del perfil de rendimiento. Ajuste la configuración en función de los patrones de procesamiento de mensajes observados y del perfil de rendimiento deseado.
En el caso de cargas de trabajo con un tráfico intenso y necesidades de rendimiento estrictas, aumente el número mínimo de sondeos de eventos para administrar los picos repentinos de mensajes. Para determinar los sondeos de eventos mínimos necesarios, tenga en cuenta los mensajes de su carga de trabajo por segundo y el tamaño medio de la carga útil y utilice la capacidad de rendimiento de un único sondeo de eventos (hasta 5 MBps) como referencia.
Para mantener el procesamiento ordenado dentro de una partición, Lambda limita el número máximo de sondeos de eventos al número de particiones del tema. Además, el número máximo de sondeos de eventos a los que se puede escalar la asignación de orígenes de eventos depende de la configuración de simultaneidad de la función.
Al activar el modo aprovisionado, actualiza la configuración de la red para eliminar los puntos de conexión de VPC AWS PrivateLink y los permisos asociados.