Conserve los registros de lotes descartados para unorigen de eventos de Kinesis Data Streams en Lambda - AWS Lambda

Conserve los registros de lotes descartados para unorigen de eventos de Kinesis Data Streams en Lambda

La gestión de errores en las asignaciones de orígenes de eventos de Kinesis depende de si el error se produce antes de que se invoque la función o durante la invocación de la función:

Si las medidas de administración de errores fallan, Lambda descarta los registros y continúa procesando lotes del flujo. Con la configuración predeterminada, esto significa que un registro incorrecto puede bloquear el procesamiento en la partición afectada durante un máximo de una semana. Para evitar esto, configure el mapeo de fuente de eventos de su función con un número razonable de reintentos y una antigüedad máxima de registro que se ajuste a su caso de uso.

Configuración de destinos para invocaciones fallidas

Para retener los registros de las invocaciones de asignación de orígenes de eventos fallidos, agregue un destino a la asignación de orígenes de eventos de su función. Cada registro enviado al destino es un documento JSON que contiene metadatos sobre la invocación fallida. Para los destinos de Amazon S3, Lambda envía también todo el registro de invocación junto con los metadatos. Puede configurar cualquier tema de Amazon SNS, cola de Amazon SQS o bucket de S3 como destino.

Con los destinos de Amazon S3, puede utilizar la característica de notificaciones de eventos de Amazon S3 para recibir notificaciones cuando se carguen objetos en el bucket de S3 de destino. También puede configurar las notificaciones de eventos de S3 para que invoquen otra función de Lambda para realizar el procesamiento automatizado de los lotes fallidos.

Su rol de ejecución debe tener permisos para el destino:

Si tiene activado el cifrado con su propia clave de KMS para un destino de S3, el rol de ejecución de la función también debe tener permiso para llamar a kms:GenerateDataKey. Si la clave de KMS y el destino del bucket de S3 están en una cuenta diferente a la de su función de Lambda y rol de ejecución, configure la clave de KMS para que confíe en el rol de ejecución y permita kms:GenerateDataKey.

Para configurar un destino en caso de error mediante la consola, siga estos pasos:

  1. Abra la página de Funciones en la consola de Lambda.

  2. Elija una función.

  3. En Descripción general de la función, elija Agregar destino.

  4. En Origen, elija Invocación de asignación de orígenes de eventos.

  5. Para la Asignación de orígenes de eventos, elija un origen de eventos que esté configurado para esta función.

  6. En Condición, seleccione En caso de error. Para las invocaciones de asignación de orígenes de eventos, esta es la única condición aceptada.

  7. En Tipo de destino, elija el tipo de destino al que Lambda envía los registros de invocación.

  8. En Destination (Destino), elija un recurso.

  9. Seleccione Save (Guardar).

También puede configurar un destino en caso de error mediante la AWS Command Line Interface (AWS CLI). Por ejemplo, el siguiente comando create-event-source-mapping agrega una asignación de orígenes de eventos con un destino de SQS en caso de error a MyFunction:

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

El siguiente comando update-event-source-mapping actualiza una asignación de orígenes de eventos para enviar registros de invocación fallida a un destino de SNS después de dos intentos de reintento, o si los registros tienen más de una hora de antigüedad.

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'

La configuración actualizada se aplica de forma asincrónica y no se refleja en la salida hasta que se completa el proceso. Utilice el comando get-event-source-mapping para ver el estado actual.

Para eliminar un destino, introduzca una cadena vacía como argumento del parámetro destination-config:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Prácticas recomendadas de seguridad para destinos de Amazon S3

Eliminar un bucket de S3 que está configurado como destino sin eliminar el destino de la configuración de la función puede suponer un riesgo de seguridad. Si otro usuario conoce el nombre del bucket de destino, puede volver a crear el bucket en su Cuenta de AWS. Los registros de las invocaciones fallidas se enviarán a su bucket, lo que podría exponer los datos de su función.

aviso

Para asegurarse de que los registros de invocación de su función no se puedan enviar a un bucket de S3 de otra Cuenta de AWS, agregue una condición al rol de ejecución de la función que limite los permisos s3:PutObject a los buckets de su cuenta.

En el siguiente ejemplo, se muestra una política de IAM que limita los permisos s3:PutObject de la función a los buckets de la cuenta. Esta política también otorga a Lambda el permiso s3:ListBucket que necesita para usar un bucket de S3 como destino.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount": "111122223333" } } } ] }

Para agregar una política de permisos al rol de ejecución de su función mediante la AWS Management Console o AWS CLI, consulte las instrucciones de los siguientes procedimientos:

Console
Cómo agregar una política de permisos al rol de ejecución de la función (consola)
  1. Abra la página de Funciones en la consola de Lambda.

  2. Seleccione la función de Lambda cuyo rol de ejecución desee modificar.

  3. En la pestaña Configuración, elija Permisos.

  4. En la pestaña Rol de ejecución, seleccione el nombre del rol de la función para abrir la página de la consola de IAM del rol.

  5. Agregue una política de permisos al rol de la siguiente manera:

    1. En el panel Política de permisos, elija Agregar permisos y seleccione Crear política insertada.

    2. En el editor de políticas, seleccione JSON.

    3. Pegue la política que desee agregar en el editor (sustituyendo el JSON existente) y, a continuación, seleccione Siguiente.

    4. En Detalles de política, ingrese un Nombre de política.

    5. Seleccione Crear política.

AWS CLI
Cómo agregar una política de permisos al rol de ejecución de la función (CLI)
  1. Cree un documento de política de JSON con los permisos necesarios y guárdelo en un directorio local.

  2. Utilice el comando de la CLI de IAM put-role-policy para agregar permisos al rol de ejecución de la función. Ejecute el siguiente comando desde el directorio en el que guardó el documento de política JSON y sustituya el nombre del rol, el nombre de la política y el documento de política por sus propios valores.

    aws iam put-role-policy \ --role-name my_lambda_role \ --policy-name LambdaS3DestinationPolicy \ --policy-document file://my_policy.json

Ejemplo de registro de invocación de Amazon SNS y Amazon SQS

El siguiente ejemplo muestra lo que Lambda envía a un destino de tema de SNS o cola de SQS cuando se produce un error en la invocación de un origen de eventos de Kafka. Como Lambda envía solo los metadatos para estos tipos de destino, utilice los campos streamArn, shardId, startSequenceNumber y endSequenceNumber para obtener el registro original completo. Todos los campos que se muestran en la propiedad KinesisBatchInfo siempre estarán presentes.

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

Puede utilizar esta información para recuperar los registros afectados del flujo para solucionar problemas. Los registros reales no están incluidos, por lo que debe procesar este registro y recuperarlos del flujo antes de que caduquen y se pierdan.

Ejemplo de registro de invocación de Amazon S3

En el siguiente ejemplo, se muestra lo que Lambda envía a un bucket de Amazon S3 cuando se produce un error en la invocación de un origen de eventos de Kinesis. Además de todos los campos del ejemplo anterior para los destinos de SQS y SNS, el campo payload contiene el registro de invocación original en forma de cadena JSON de escape.

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" }, "payload": "<Whole Event>" // Only available in S3 }

El objeto de S3 que contiene el registro de invocación utiliza la siguiente convención de nomenclatura:

aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>