Reter registros de lotes descartados para uma origem de eventos do Kinesis Data Streams no Lambda - AWS Lambda

Reter registros de lotes descartados para uma origem de eventos do Kinesis Data Streams no Lambda

O tratamento de erros para mapeamentos de origem de eventos do Kinesis depende se o erro ocorre antes de a função ser invocada ou durante a invocação da função:

  • Antes da invocação: se um mapeamento de origem de eventos do Lambda não conseguir invocar a função devido a limitações ou a outros problemas, ele tentará novamente até que os registros expirem ou excedam a idade máxima configurada no mapeamento de origem de eventos (MaximumRecordAgeInSeconds).

  • Durante a invocação: se a função for invocada, mas retornar um erro, o Lambda tentará novamente até que os registros expirem, excedam a idade máxima (MaximumRecordAgeInSeconds) ou atinjam a cota de repetição configurada (MaximumRetryAttempts). Para erros de função, também é possível pode configurar BisectBatchOnFunctionError, que divide um lote com falha em dois lotes em lotes, isolando registros com problema e evitando exceder tempos limites. A divisão de lotes não consome a cota de repetição.

Se as medidas de tratamento de erros falharem, o Lambda descartará os registros e continuará processando lotes provenientes da transmissão. Com as configurações padrão, isso significa que um registro ruim pode bloquear o processamento no fragmento afetado por até uma semana. Para evitar isso, configure o mapeamento de fontes de eventos da sua função com um número razoável de tentativas e uma idade máxima de registro que se adapte ao seu caso de uso.

Configurar destinos para invocações com falha

Para reter registros de invocações de mapeamento da origem do evento com falha, adicione um destino ao mapeamento da origem de eventos da função. Cada registro enviado ao destino é um documento JSON que contém metadados sobre a invocação que falhou. Para destinos do Amazon S3, o Lambda também envia todo o registro da invocação junto com os metadados. É possível configurar qualquer tópico do Amazon SNS, fila do Amazon SQS ou bucket do S3 como destino.

Com destinos do Amazon S3, você pode usar o recurso Notificações de eventos do Amazon S3 para receber notificações quando objetos forem carregados no bucket do S3 de destino. Você também pode configurar as notificações de eventos do S3 para invocar outra função do Lambda para realizar o processamento automatizado em lotes com falha.

Sua função de execução deve ter permissões para o destino:

Se você habilitou a criptografia com sua própria chave do KMS para um destino do S3, o perfil de execução da função também deve ter permissão para chamar kms:GenerateDataKey. Se a chave do KMS e o destino do bucket do S3 estiverem em uma conta diferente da função do Lambda e do perfil de execução, configure a chave do KMS para confiar no perfil de execução para permitir kms:GenerateDataKey.

Para configurar um destino em caso de falha usando o console, siga estas etapas:

  1. Abra a página Funções do console do Lambda.

  2. Escolha uma função.

  3. Em Function overview (Visão geral da função), escolha Add destination (Adicionar destino).

  4. Em Origem, escolha Invocação do mapeamento da origem do evento.

  5. Em Mapeamento da origem do evento, escolha uma origem de eventos configurada para essa função.

  6. Em Condição, selecione Em caso de falha. Para invocações de mapeamento da origem de eventos, essa é a única condição aceita.

  7. Em Tipo de destino, escolha o tipo de destino para o qual o Lambda envia registros de invocação.

  8. Em Destination (Destino), escolha um recurso.

  9. Escolha Salvar.

Também é possível configurar um destino em caso de falha usando a AWS Command Line Interface (AWS CLI). Por exemplo, o seguinte comando create-event-source-mapping adiciona um mapeamento de origem de eventos com um destino SQS em caso de falha a MyFunction:

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

O comando update-event-source-mapping a seguir atualiza um mapeamento de origem de eventos para enviar registros de chamada com falha para um destino SNS após duas novas tentativas ou se os registros tiverem mais de uma hora.

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'

As configurações atualizadas são aplicadas de forma assíncrona e não são refletidas na saída até que o processo seja concluído. Use o comando get-event-source-mapping para visualizar o status atual.

Para remover um destino, forneça uma string vazia como argumento para o parâmetro destination-config:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Práticas recomendadas de segurança para destinos do Amazon S3

Excluir um bucket do S3 configurado como destino sem remover o destino da configuração da sua função pode criar um risco de segurança. Se outro usuário souber o nome do seu bucket de destino, ele poderá recriar o bucket na Conta da AWS dele. Registros de invocações com falha serão enviados para o bucket do usuário, potencialmente expondo dados da sua função.

Atenção

Para garantir que os registros de invocação da sua função não possam ser enviados para um bucket do S3 em outra Conta da AWS, adicione uma condição ao perfil de execução da função que limite as permissões s3:PutObject aos buckets na sua conta.

O exemplo a seguir mostra uma política do IAM que limita as permissões s3:PutObject da função aos bucket da conta. Essa política também dá ao Lambda a permissão s3:ListBucket necessária para usar um bucket do S3 como destino.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount": "111122223333" } } } ] }

Para adicionar uma política de permissões ao perfil de execução da função usando o AWS Management Console ou a AWS CLI, consulte as instruções nos seguintes procedimentos:

Console
Para adicionar uma política de permissões ao perfil de execução de uma função (console)
  1. Abra a página Funções do console do Lambda.

  2. Selecione a função do Lambda cujo perfil de execução você queira modificar.

  3. Na guia Configuração, escolha Permissões.

  4. Na guia Perfil de execução, selecione o Nome do perfil da função para abrir a página do console do IAM do perfil.

  5. Adicione uma política de permissões ao perfil da seguinte maneira:

    1. No painel Políticas de permissões, escolha Adicionar permissões e Criar política em linha.

    2. No Editor de políticas, selecione JSON.

    3. Cole a política que você deseja adicionar no editor (substituindo o JSON existente) e escolha Próximo.

    4. No campo Detalhes da política, insira o Nome da política.

    5. Escolha Criar política.

AWS CLI
Para adicionar uma política de permissões ao perfil de execução de uma função (CLI)
  1. Crie um documento de política de JSON com as permissões necessárias e salve-o em um diretório local.

  2. Use o comando da CLI put-role-policy do IAM para adicionar permissões ao perfil de execução da função. Execute o comando a seguir no diretório em que você salvou seu documento de política de JSON e substitua o nome do perfil, o nome da política e o documento da política pelos seus próprios valores.

    aws iam put-role-policy \ --role-name my_lambda_role \ --policy-name LambdaS3DestinationPolicy \ --policy-document file://my_policy.json

Exemplo de registro de invocação do Amazon SNS e do Amazon SQS

O exemplo a seguir mostra o que é enviado pelo Lambda para uma fila do SQS ou tópico do SNS em caso de falha na invocação da origem de eventos do Kinesis. Como o Lambda envia somente os metadados para esses tipos de destino, use os campos streamArn, shardId, startSequenceNumber e endSequenceNumber para obter o registro original completo. Todos os campos mostrados na propriedade KinesisBatchInfo estarão sempre presentes.

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

Você pode usar essas informações para recuperar os registros afetados da transmissão para solução de problemas. Os registros reais não estão incluídos, portanto, você deve processar esses registros e recuperá-los da transmissão antes que eles expirem e sejam perdidos.

Exemplo de registro de invocação do Amazon S3

O exemplo a seguir mostra o que é enviado pelo Lambda para um bucket do Amazon S3 em caso de falha na invocação da origem de eventos do Kinesis. Além de todos os campos do exemplo anterior para destinos do SQS e do SNS, o campo payload contém o registro de invocação original como uma string JSON com escape.

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" }, "payload": "<Whole Event>" // Only available in S3 }

O objeto do S3 que contém o registro de invocação usa a seguinte convenção de nomenclatura:

aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>