Selecione suas preferências de cookies

Usamos cookies essenciais e ferramentas semelhantes que são necessárias para fornecer nosso site e serviços. Usamos cookies de desempenho para coletar estatísticas anônimas, para que possamos entender como os clientes usam nosso site e fazer as devidas melhorias. Cookies essenciais não podem ser desativados, mas você pode clicar em “Personalizar” ou “Recusar” para recusar cookies de desempenho.

Se você concordar, a AWS e terceiros aprovados também usarão cookies para fornecer recursos úteis do site, lembrar suas preferências e exibir conteúdo relevante, incluindo publicidade relevante. Para aceitar ou recusar todos os cookies não essenciais, clique em “Aceitar” ou “Recusar”. Para fazer escolhas mais detalhadas, clique em “Personalizar”.

Capturar lotes descartados para uma origem de eventos do Amazon MSK

Modo de foco
Capturar lotes descartados para uma origem de eventos do Amazon MSK - AWS Lambda

Para reter registros de invocações de mapeamento da origem do evento com falha, adicione um destino ao mapeamento da origem de eventos da função. Cada registro enviado ao destino é um documento JSON que contém metadados sobre a invocação que falhou. Para destinos do Amazon S3, o Lambda também envia todo o registro da invocação junto com os metadados. É possível configurar qualquer tópico do Amazon SNS, fila do Amazon SQS ou bucket do S3 como destino.

Com destinos do Amazon S3, você pode usar o recurso Notificações de eventos do Amazon S3 para receber notificações quando objetos forem carregados no bucket do S3 de destino. Você também pode configurar as notificações de eventos do S3 para invocar outra função do Lambda para realizar o processamento automatizado em lotes com falha.

Sua função de execução deve ter permissões para o destino:

Você deve implantar um endpoint da VPC para o serviço de destino em caso de falha dentro da VPC do cluster do Amazon MSK.

Além disso, se você configurou uma chave do KMS no seu destino, o Lambda precisará das seguintes permissões, dependendo do tipo de destino:

  • Se você habilitou a criptografia com sua própria chave do KMS para um destino do S3, kms:GenerateDataKey é necessário. Se a chave do KMS e o destino do bucket do S3 estiverem em uma conta diferente da função do Lambda e do perfil de execução, configure a chave do KMS para confiar no perfil de execução para permitir kms:GenerateDataKey.

  • Se você habilitou a criptografia com sua própria chave do KMS para um destino do SQS, kms:Decrypt e kms:GenerateDataKey são necessários. Se a chave do KMS e o destino da fila do SQS estiverem em uma conta diferente da função do Lambda e do perfil de execução, configure a chave do KMS para confiar no perfil de execução para permitir kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey e kms:ReEncrypt.

  • Se você habilitou a criptografia com sua própria chave do KMS para um destino do SNS, kms:Decrypt e kms:GenerateDataKey são necessários. Se a chave do KMS e o destino do tópico do SNS estiverem em uma conta diferente da função do Lambda e do perfil de execução, configure a chave do KMS para confiar no perfil de execução para permitir kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey e kms:ReEncrypt.

Configurar destinos em caso de falha para um mapeamento da origem do evento do Amazon MSK

Para configurar um destino em caso de falha usando o console, siga estas etapas:

  1. Abra a página Funções do console do Lambda.

  2. Escolha uma função.

  3. Em Function overview (Visão geral da função), escolha Add destination (Adicionar destino).

  4. Em Origem, escolha Invocação do mapeamento da origem do evento.

  5. Em Mapeamento da origem do evento, escolha uma origem de eventos configurada para essa função.

  6. Em Condição, selecione Em caso de falha. Para invocações de mapeamento da origem de eventos, essa é a única condição aceita.

  7. Em Tipo de destino, escolha o tipo de destino para o qual o Lambda envia registros de invocação.

  8. Em Destination (Destino), escolha um recurso.

  9. Escolha Salvar.

Você também pode configurar um destino em caso de falha usando a AWS CLI. Por exemplo, o seguinte comando create-event-source-mapping adiciona um mapeamento de origem de eventos com um destino SQS em caso de falha a MyFunction:

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

O seguinte comando update-event-source-mapping adiciona um destino S3 em caso de falha à origem de eventos associada à entrada uuid:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Para remover um destino, forneça uma string vazia como argumento para o parâmetro destination-config:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Práticas recomendadas de segurança para destinos do Amazon S3

Excluir um bucket do S3 configurado como destino sem remover o destino da configuração da sua função pode criar um risco de segurança. Se outro usuário souber o nome do seu bucket de destino, ele poderá recriar o bucket na Conta da AWS dele. Registros de invocações com falha serão enviados para o bucket do usuário, potencialmente expondo dados da sua função.

Atenção

Para garantir que os registros de invocação da sua função não possam ser enviados para um bucket do S3 em outra Conta da AWS, adicione uma condição ao perfil de execução da função que limite as permissões s3:PutObject aos buckets na sua conta.

O exemplo a seguir mostra uma política do IAM que limita as permissões s3:PutObject da função aos bucket da conta. Essa política também dá ao Lambda a permissão s3:ListBucket necessária para usar um bucket do S3 como destino.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount": "111122223333" } } } ] }

Para adicionar uma política de permissões ao perfil de execução da função usando o AWS Management Console ou a AWS CLI, consulte as instruções nos seguintes procedimentos:

Console
Para adicionar uma política de permissões ao perfil de execução de uma função (console)
  1. Abra a página Funções do console do Lambda.

  2. Selecione a função do Lambda cujo perfil de execução você queira modificar.

  3. Na guia Configuração, escolha Permissões.

  4. Na guia Perfil de execução, selecione o Nome do perfil da função para abrir a página do console do IAM do perfil.

  5. Adicione uma política de permissões ao perfil da seguinte maneira:

    1. No painel Políticas de permissões, escolha Adicionar permissões e Criar política em linha.

    2. No Editor de políticas, selecione JSON.

    3. Cole a política que você deseja adicionar no editor (substituindo o JSON existente) e escolha Próximo.

    4. No campo Detalhes da política, insira o Nome da política.

    5. Escolha Criar política.

AWS CLI
Para adicionar uma política de permissões ao perfil de execução de uma função (CLI)
  1. Crie um documento de política de JSON com as permissões necessárias e salve-o em um diretório local.

  2. Use o comando da CLI put-role-policy do IAM para adicionar permissões ao perfil de execução da função. Execute o comando a seguir no diretório em que você salvou seu documento de política de JSON e substitua o nome do perfil, o nome da política e o documento da política pelos seus próprios valores.

    aws iam put-role-policy \ --role-name my_lambda_role \ --policy-name LambdaS3DestinationPolicy \ --policy-document file://my_policy.json
Para adicionar uma política de permissões ao perfil de execução de uma função (console)
  1. Abra a página Funções do console do Lambda.

  2. Selecione a função do Lambda cujo perfil de execução você queira modificar.

  3. Na guia Configuração, escolha Permissões.

  4. Na guia Perfil de execução, selecione o Nome do perfil da função para abrir a página do console do IAM do perfil.

  5. Adicione uma política de permissões ao perfil da seguinte maneira:

    1. No painel Políticas de permissões, escolha Adicionar permissões e Criar política em linha.

    2. No Editor de políticas, selecione JSON.

    3. Cole a política que você deseja adicionar no editor (substituindo o JSON existente) e escolha Próximo.

    4. No campo Detalhes da política, insira o Nome da política.

    5. Escolha Criar política.

Exemplo de registro de invocação do SNS e do SQS

O exemplo a seguir mostra o que é enviado pelo Lambda para o destino de tópico do SNS ou de fila do SQS em caso de falha na invocação da origem de eventos do Kafka. Cada uma das chaves abaixo de recordsInfo contém o tópico e a partição do Kafka separados por um hífen. Por exemplo, para a chave "Topic-0", Topic é o tópico do Kafka, e 0 é a partição. Para cada tópico e partição, você pode usar os dados de desvios e do carimbo de data/hora para encontrar os registros de invocação originais.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

Exemplo de registro de invocação de destino S3

Para destinos do S3, o Lambda envia todo o registro da invocação junto com os metadados para o destino. O exemplo a seguir mostra o que é enviado pelo Lambda para o destino de bucket do S3 em caso de falha na invocação da origem de eventos do Kafka. Além de todos os campos do exemplo anterior para destinos do SQS e do SNS, o campo payload contém o registro de invocação original como uma string JSON com escape.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
dica

Recomendamos habilitar o versionamento do S3 no bucket de destino.

PrivacidadeTermos do sitePreferências de cookies
© 2025, Amazon Web Services, Inc. ou suas afiliadas. Todos os direitos reservados.