Capturar lotes descartados para un origen de eventos de Amazon MSK - AWS Lambda

Capturar lotes descartados para un origen de eventos de Amazon MSK

Para retener los registros de las invocaciones de asignación de orígenes de eventos fallidos, agregue un destino a la asignación de orígenes de eventos de su función. Cada registro enviado al destino es un documento JSON que contiene metadatos sobre la invocación fallida. Para los destinos de Amazon S3, Lambda envía también todo el registro de invocación junto con los metadatos. Puede configurar cualquier tema de Amazon SNS, cola de Amazon SQS o bucket de S3 como destino.

Con los destinos de Amazon S3, puede utilizar la característica de notificaciones de eventos de Amazon S3 para recibir notificaciones cuando se carguen objetos en el bucket de S3 de destino. También puede configurar las notificaciones de eventos de S3 para que invoquen otra función de Lambda para realizar el procesamiento automatizado de los lotes fallidos.

Su rol de ejecución debe tener permisos para el destino:

Debe implementar un punto de conexión de VPC para el servicio de destino en caso de error en la VPC de su clúster de Amazon MSK.

Además, si configuró una clave de KMS en su destino, Lambda necesita los siguientes permisos según el tipo de destino:

  • Si tiene activado el cifrado con su propia clave de KMS para un destino de S3, necesitará kms:GenerateDataKey. Si la clave de KMS y el destino del bucket de S3 están en una cuenta diferente a la de su función de Lambda y rol de ejecución, configure la clave de KMS para que confíe en el rol de ejecución y permita kms:GenerateDataKey.

  • Si tiene activado el cifrado con su propia clave de KMS para un destino de SQS, necesitará kms:Decrypt y kms:GenerateDataKey. Si la clave de KMS y el destino de la cola de SQS están en una cuenta diferente a la de su función de Lambda y rol de ejecución, configure la clave de KMS para que confíe en el rol de ejecución y permita kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey y kms:ReEncrypt.

  • Si tiene activado el cifrado con su propia clave de KMS para un destino de SNS, necesitará kms:Decrypt y kms:GenerateDataKey. Si la clave de KMS y el destino del tema de SNS están en una cuenta diferente a la de su función de Lambda y rol de ejecución, configure la clave de KMS para que confíe en el rol de ejecución y permita kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey y kms:ReEncrypt.

Configuración de destinos en caso de error para la asignación de orígenes de eventos de Amazon MSK

Para configurar un destino en caso de error mediante la consola, siga estos pasos:

  1. Abra la página de Funciones en la consola de Lambda.

  2. Elija una función.

  3. En Descripción general de la función, elija Agregar destino.

  4. En Origen, elija Invocación de asignación de orígenes de eventos.

  5. Para la Asignación de orígenes de eventos, elija un origen de eventos que esté configurado para esta función.

  6. En Condición, seleccione En caso de error. Para las invocaciones de asignación de orígenes de eventos, esta es la única condición aceptada.

  7. En Tipo de destino, elija el tipo de destino al que Lambda envía los registros de invocación.

  8. En Destination (Destino), elija un recurso.

  9. Seleccione Save (Guardar).

También puede configurar un destino en caso de error mediante la AWS CLI. Por ejemplo, el siguiente comando create-event-source-mapping agrega una asignación de orígenes de eventos con un destino de SQS en caso de error a MyFunction:

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

El siguiente comando UpdateEventSourceMapping agrega un destino S3 en caso de error al origen de eventos asociado a la entrada uuid:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Para eliminar un destino, introduzca una cadena vacía como argumento del parámetro destination-config:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Prácticas recomendadas de seguridad para destinos de Amazon S3

Eliminar un bucket de S3 que está configurado como destino sin eliminar el destino de la configuración de la función puede suponer un riesgo de seguridad. Si otro usuario conoce el nombre del bucket de destino, puede volver a crear el bucket en su Cuenta de AWS. Los registros de las invocaciones fallidas se enviarán a su bucket, lo que podría exponer los datos de su función.

aviso

Para asegurarse de que los registros de invocación de su función no se puedan enviar a un bucket de S3 de otra Cuenta de AWS, agregue una condición al rol de ejecución de la función que limite los permisos s3:PutObject a los buckets de su cuenta.

En el siguiente ejemplo, se muestra una política de IAM que limita los permisos s3:PutObject de la función a los buckets de la cuenta. Esta política también otorga a Lambda el permiso s3:ListBucket que necesita para usar un bucket de S3 como destino.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount": "111122223333" } } } ] }

Para agregar una política de permisos al rol de ejecución de su función mediante la AWS Management Console o AWS CLI, consulte las instrucciones de los siguientes procedimientos:

Console
Cómo agregar una política de permisos al rol de ejecución de la función (consola)
  1. Abra la página de Funciones en la consola de Lambda.

  2. Seleccione la función de Lambda cuyo rol de ejecución desee modificar.

  3. En la pestaña Configuración, elija Permisos.

  4. En la pestaña Rol de ejecución, seleccione el nombre del rol de la función para abrir la página de la consola de IAM del rol.

  5. Agregue una política de permisos al rol de la siguiente manera:

    1. En el panel Política de permisos, elija Agregar permisos y seleccione Crear política insertada.

    2. En el editor de políticas, seleccione JSON.

    3. Pegue la política que desee agregar en el editor (sustituyendo el JSON existente) y, a continuación, seleccione Siguiente.

    4. En Detalles de política, ingrese un Nombre de política.

    5. Seleccione Crear política.

AWS CLI
Cómo agregar una política de permisos al rol de ejecución de la función (CLI)
  1. Cree un documento de política de JSON con los permisos necesarios y guárdelo en un directorio local.

  2. Utilice el comando de la CLI de IAM put-role-policy para agregar permisos al rol de ejecución de la función. Ejecute el siguiente comando desde el directorio en el que guardó el documento de política JSON y sustituya el nombre del rol, el nombre de la política y el documento de política por sus propios valores.

    aws iam put-role-policy \ --role-name my_lambda_role \ --policy-name LambdaS3DestinationPolicy \ --policy-document file://my_policy.json

Ejemplo de registro de invocación SNS y SQS

El siguiente ejemplo muestra lo que Lambda envía a un destino de tema de SNS o cola de SQS cuando se produce un error en la invocación de un origen de eventos de Kafka. Cada una de las claves en recordsInfo contiene el tema y la partición de Kafka, separados por un guion. Por ejemplo, para la clave "Topic-0", Topic es el tema de Kafka, y 0 es la partición. Para cada tema y partición, puede usar los desplazamientos y los datos de las marcas de tiempo para buscar los registros de invocación originales.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

Ejemplo de registro de invocación de un destino S3

Para los destinos de S3, Lambda envía todo el registro de invocación junto con los metadatos al destino. El siguiente ejemplo muestra lo que Lambda envía a un bucket de S3 de destino cuando se produce un error en la invocación de un origen de evento de Kafka. Además de todos los campos del ejemplo anterior para los destinos de SQS y SNS, el campo payload contiene el registro de invocación original en forma de cadena JSON de escape.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
sugerencia

También se recomienda habilitar el control de versiones de S3 en el bucket de destino.