Capturar lotes descartados para uma origem de eventos do Apache Kafka autogerenciado
Para reter registros de invocações de mapeamento da origem do evento com falha, adicione um destino ao mapeamento da origem de eventos da função. Cada registro enviado ao destino é um documento JSON com metadados sobre a invocação que falhou. É possível configurar qualquer tópico do Amazon SNS, fila do Amazon SQS ou bucket do S3 como destino. Sua função de execução deve ter permissões para o destino:
-
Para destinos SQS: sqs:SendMessage
-
Para destinos SNS: sns:Publish
-
Para destinos de bucket do S3: s3:PutObject e s3:ListBuckets
Você deve implantar um endpoint da VPC para o serviço de destino em caso de falha dentro da VPC do cluster do Apache Kafka.
Além disso, se você configurou uma chave do KMS no seu destino, o Lambda precisará das seguintes permissões, dependendo do tipo de destino:
-
Se você habilitou a criptografia com sua própria chave do KMS para um destino do S3, kms:GenerateDataKey é necessário. Se a chave do KMS e o destino do bucket do S3 estiverem em uma conta diferente da função do Lambda e do perfil de execução, configure a chave do KMS para confiar no perfil de execução para permitir kms:GenerateDataKey.
-
Se você habilitou a criptografia com sua própria chave do KMS para um destino do SQS, kms:Decrypt e kms:GenerateDataKey são necessários. Se a chave do KMS e o destino da fila do SQS estiverem em uma conta diferente da função do Lambda e do perfil de execução, configure a chave do KMS para confiar no perfil de execução para permitir kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey e kms:ReEncrypt.
-
Se você habilitou a criptografia com sua própria chave do KMS para um destino do SNS, kms:Decrypt e kms:GenerateDataKey são necessários. Se a chave do KMS e o destino do tópico do SNS estiverem em uma conta diferente da função do Lambda e do perfil de execução, configure a chave do KMS para confiar no perfil de execução para permitir kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey e kms:ReEncrypt.
Definir destinos em caso de falha para um mapeamento da origem do evento do Apache Kafka autogerenciado
Para configurar um destino em caso de falha usando o console, siga estas etapas:
Abra a página Funções
do console do Lambda. -
Escolha uma função.
-
Em Function overview (Visão geral da função), escolha Add destination (Adicionar destino).
-
Em Origem, escolha Invocação do mapeamento da origem do evento.
-
Em Mapeamento da origem do evento, escolha uma origem de eventos configurada para essa função.
-
Em Condição, selecione Em caso de falha. Para invocações de mapeamento da origem de eventos, essa é a única condição aceita.
-
Em Tipo de destino, escolha o tipo de destino para o qual o Lambda envia registros de invocação.
-
Em Destination (Destino), escolha um recurso.
-
Escolha Salvar.
Você também pode configurar um destino em caso de falha usando a AWS CLI. Por exemplo, o seguinte comando create-event-source-mappingMyFunction
:
aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
O seguinte comando update-event-source-mappinguuid
:
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'
Para remover um destino, forneça uma string vazia como argumento para o parâmetro destination-config
:
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'
Exemplo de registro de invocação do SNS e do SQS
O exemplo a seguir mostra o que é enviado pelo Lambda para o destino de tópico do SNS ou de fila do SQS em caso de falha na invocação da origem de eventos do Kafka. Cada uma das chaves abaixo de recordsInfo
contém o tópico e a partição do Kafka separados por um hífen. Por exemplo, para a chave "Topic-0"
, Topic
é o tópico do Kafka, e 0
é a partição. Para cada tópico e partição, você pode usar os dados de desvios e do carimbo de data/hora para encontrar os registros de invocação originais.
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }
Exemplo de registro de invocação de destino S3
Para destinos do S3, o Lambda envia todo o registro da invocação junto com os metadados para o destino. O exemplo a seguir mostra o que é enviado pelo Lambda para o destino de bucket do S3 em caso de falha na invocação da origem de eventos do Kafka. Além de todos os campos do exemplo anterior para destinos do SQS e do SNS, o campo payload
contém o registro de invocação original como uma string JSON com escape.
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
dica
Recomendamos habilitar o versionamento do S3 no bucket de destino.