Capturar lotes descartados para un origen de eventos de Amazon MSK
Para retener los registros de las invocaciones de asignación de orígenes de eventos fallidos, agregue un destino a la asignación de orígenes de eventos de su función. Cada registro enviado al destino es un documento JSON que contiene metadatos sobre la invocación fallida. Para los destinos de Amazon S3, Lambda envía también todo el registro de invocación junto con los metadatos. Puede configurar cualquier tema de Amazon SNS, cola de Amazon SQS o bucket de S3 como destino.
Con los destinos de Amazon S3, puede utilizar la característica de notificaciones de eventos de Amazon S3 para recibir notificaciones cuando se carguen objetos en el bucket de S3 de destino. También puede configurar las notificaciones de eventos de S3 para que invoquen otra función de Lambda para realizar el procesamiento automatizado de los lotes fallidos.
Su rol de ejecución debe tener permisos para el destino:
-
Para destinos de SQS: sqs:SendMessage
-
Para destinos de SNS: sns:Publish
-
Para destinos de bucket de S3: s3:PutObject y s3:ListBucket
Debe implementar un punto de conexión de VPC para el servicio de destino en caso de error en la VPC de su clúster de Amazon MSK.
Además, si configuró una clave de KMS en su destino, Lambda necesita los siguientes permisos según el tipo de destino:
-
Si tiene activado el cifrado con su propia clave de KMS para un destino de S3, necesitará kms:GenerateDataKey. Si la clave de KMS y el destino del bucket de S3 están en una cuenta diferente a la de su función de Lambda y rol de ejecución, configure la clave de KMS para que confíe en el rol de ejecución y permita kms:GenerateDataKey.
-
Si tiene activado el cifrado con su propia clave de KMS para un destino de SQS, necesitará kms:Decrypt y kms:GenerateDataKey. Si la clave de KMS y el destino de la cola de SQS están en una cuenta diferente a la de su función de Lambda y rol de ejecución, configure la clave de KMS para que confíe en el rol de ejecución y permita kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey y kms:ReEncrypt.
-
Si tiene activado el cifrado con su propia clave de KMS para un destino de SNS, necesitará kms:Decrypt y kms:GenerateDataKey. Si la clave de KMS y el destino del tema de SNS están en una cuenta diferente a la de su función de Lambda y rol de ejecución, configure la clave de KMS para que confíe en el rol de ejecución y permita kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey y kms:ReEncrypt.
Configuración de destinos en caso de error para la asignación de orígenes de eventos de Amazon MSK
Para configurar un destino en caso de error mediante la consola, siga estos pasos:
Abra la página de Funciones
en la consola de Lambda. -
Elija una función.
-
En Descripción general de la función, elija Agregar destino.
-
En Origen, elija Invocación de asignación de orígenes de eventos.
-
Para la Asignación de orígenes de eventos, elija un origen de eventos que esté configurado para esta función.
-
En Condición, seleccione En caso de error. Para las invocaciones de asignación de orígenes de eventos, esta es la única condición aceptada.
-
En Tipo de destino, elija el tipo de destino al que Lambda envía los registros de invocación.
-
En Destination (Destino), elija un recurso.
-
Seleccione Save (Guardar).
También puede configurar un destino en caso de error mediante la AWS CLI. Por ejemplo, el siguiente comando create-event-source-mappingMyFunction
:
aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
El siguiente comando UpdateEventSourceMappinguuid
:
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'
Para eliminar un destino, introduzca una cadena vacía como argumento del parámetro destination-config
:
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'
Prácticas recomendadas de seguridad para destinos de Amazon S3
Eliminar un bucket de S3 que está configurado como destino sin eliminar el destino de la configuración de la función puede suponer un riesgo de seguridad. Si otro usuario conoce el nombre del bucket de destino, puede volver a crear el bucket en su Cuenta de AWS. Los registros de las invocaciones fallidas se enviarán a su bucket, lo que podría exponer los datos de su función.
aviso
Para asegurarse de que los registros de invocación de su función no se puedan enviar a un bucket de S3 de otra Cuenta de AWS, agregue una condición al rol de ejecución de la función que limite los permisos s3:PutObject
a los buckets de su cuenta.
En el siguiente ejemplo, se muestra una política de IAM que limita los permisos s3:PutObject
de la función a los buckets de la cuenta. Esta política también otorga a Lambda el permiso s3:ListBucket
que necesita para usar un bucket de S3 como destino.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount":
"111122223333"
} } } ] }
Para agregar una política de permisos al rol de ejecución de su función mediante la AWS Management Console o AWS CLI, consulte las instrucciones de los siguientes procedimientos:
Ejemplo de registro de invocación SNS y SQS
El siguiente ejemplo muestra lo que Lambda envía a un destino de tema de SNS o cola de SQS cuando se produce un error en la invocación de un origen de eventos de Kafka. Cada una de las claves en recordsInfo
contiene el tema y la partición de Kafka, separados por un guion. Por ejemplo, para la clave "Topic-0"
, Topic
es el tema de Kafka, y 0
es la partición. Para cada tema y partición, puede usar los desplazamientos y los datos de las marcas de tiempo para buscar los registros de invocación originales.
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }
Ejemplo de registro de invocación de un destino S3
Para los destinos de S3, Lambda envía todo el registro de invocación junto con los metadatos al destino. El siguiente ejemplo muestra lo que Lambda envía a un bucket de S3 de destino cuando se produce un error en la invocación de un origen de evento de Kafka. Además de todos los campos del ejemplo anterior para los destinos de SQS y SNS, el campo payload
contiene el registro de invocación original en forma de cadena JSON de escape.
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
sugerencia
También se recomienda habilitar el control de versiones de S3 en el bucket de destino.