

# Como usar o Lambda para processar registros do Amazon Kinesis Data Streams
<a name="with-kinesis"></a>

Você pode usar uma função do Lambda para processar registros em um [fluxo de dados do Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/introduction.html). É possível mapear uma função do Lambda em um consumidor de throughput compartilhado (iterador padrão) do Kinesis Data Stream ou em um consumidor de throughput dedicado com [distribuição avançada](https://docs.aws.amazon.com/kinesis/latest/dev/enhanced-consumers.html). Para iteradores padrão, o Lambda sonda cada fragmento no stream do Kinesis em busca de registros usando o protocolo HTTP. O mapeamento da origem do evento compartilha a throughput de leitura com outros consumidores do fragmento.

 Para obter detalhes sobre transmissões de dados do Kinesis, consulte[Ler dados do Amazon Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/building-consumers.html).

**nota**  
O Kinesis cobra por cada fragmento e, para distribuição avançada, dados lidos da transmissão. Para obter detalhes de preço, consulte [Preço do Amazon Kinesis](https://aws.amazon.com/kinesis/data-streams/pricing).

## Fluxos de sondagem e agrupamento em lotes
<a name="kinesis-polling-and-batching"></a>

O Lambda lê registros do fluxo de dados e invoca sua função [de maneira síncrona](invocation-sync.md) com um evento que contém registros de transmissão. O Lambda lê registros em lotes e invoca sua função para processar registros do lote. Cada lote contém registros de um único fragmento/fluxo de dados.

Sua função do Lambda é uma aplicação de consumidor para seu fluxo de dados. Ele processa um lote de registros por vez de cada estilhaço. É possível mapear uma função Lambda para um consumidor de throughput compartilhado (iterador padrão) ou para um consumidor de throughput dedicado com distribuição avançada.
+ **Iterador padrão**: o Lambda sonda cada fragmento em seu fluxo do Kinesis para identificar registros a uma taxa básica de uma vez por segundo. Quando houver mais registros disponíveis, o Lambda mantém lotes de processamento até que a função alcance o fluxo. O mapeamento da origem do evento compartilha a throughput de leitura com outros consumidores do fragmento.
+ **Distribuição avançada:** para minimizar a latência e maximizar o throughput da leitura, crie um consumidor de fluxo de dados com [distribuição avançada](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html). Os consumidores da distribuição avançada obtêm uma conexão dedicada para cada fragmento que não afeta outros aplicativos que fazem leitura do stream. Os consumidores de fluxos usam HTTP/2 para reduzir a latência, enviando os registros para o Lambda por meio de uma conexão de longa duração e compactando cabeçalhos de solicitação. Você pode criar um consumidor de fluxo com a API [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html) do Kinesis.

```
aws kinesis register-stream-consumer \
--consumer-name con1 \
--stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream
```

A seguinte saída deverá ser mostrada:

```
{
    "Consumer": {
        "ConsumerName": "con1",
        "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608",
        "ConsumerStatus": "CREATING",
        "ConsumerCreationTimestamp": 1540591608.0
    }
}
```

Para aumentar a velocidade com que sua função processa registros, [adicione fragmentos ao fluxo de dados](https://repost.aws/knowledge-center/kinesis-data-streams-open-shards). O Lambda processa registros em cada fragmento sequencialmente. Ele deixará de processar registros adicionais em um estilhaço se sua função retornar um erro. Com mais estilhaços, há mais lotes sendo processados de uma só vez, o que diminui o impacto de erros na simultaneidade.

Se a função não conseguir se expandir para lidar com o número total de lotes simultâneos, [solicite um aumento de cota](https://docs.aws.amazon.com/servicequotas/latest/userguide/request-quota-increase.html) ou [reserve simultaneidade](configuration-concurrency.md) para a função.

Por padrão, o Lambda invoca a função assim que os registros estão disponíveis. Se o lote que o Lambda lê da fonte de eventos tiver apenas um registro, o Lambda enviará apenas um registro à função. Para evitar a invocação da função com poucos registros, instrua a fonte de eventos para armazenar os registros em buffer por até cinco minutos, configurando uma *janela de lotes*. Antes de invocar a função, o Lambda continua a ler registros da fonte de eventos até coletar um lote inteiro, até que a janela de lote expire ou até que o lote atinja o limite de carga útil de 6 MB. Para obter mais informações, consulte [Comportamento de lotes](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching).

**Atenção**  
Os mapeamentos da origem do evento do Lambda processam cada evento ao menos uma vez, podendo haver o processamento duplicado de registros. Para evitar possíveis problemas relacionados a eventos duplicados, é altamente recomendável tornar o código da função idempotente. Para saber mais, consulte [Como tornar minha função do Lambda idempotente](https://repost.aws/knowledge-center/lambda-function-idempotent) no Centro de Conhecimentos da AWS.

O Lambda não espera a conclusão de nenhuma [extensão](lambda-extensions.md)configurada para enviar o próximo lote para processamento. Em outras palavras, suas extensões podem continuar sendo executadas enquanto o Lambda processa o próximo lote de registros. Isso pode causar problemas de controle de utilização se você violar quaisquer configurações ou limites de [simultaneidade](lambda-concurrency.md) de sua conta. Para detectar se esse é um problema em potencial, monitore suas funções e verifique se você está vendo [métricas de simultaneidade](monitoring-concurrency.md#general-concurrency-metrics) mais altas do que o esperado para o seu mapeamento da origem do evento. Devido ao curto intervalo entre as invocações, o Lambda pode relatar brevemente um uso de simultaneidade maior do que o número de fragmentos. Isso pode ser verdadeiro até mesmo para funções do Lambda sem extensões.

Configure a opção [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor) para processar um fragmento de um fluxo de dados do Kinesis com mais de uma invocação do Lambda simultaneamente. Você pode especificar o número de lotes simultâneos que o Lambda pesquisa de um fragmento por meio de um fator de paralelização de 1 (padrão) a 10. Por exemplo, quando você define `ParallelizationFactor` como 2, pode ter até 200 invocações simultâneas do Lambda para processar 100 fragmentos de dados do Kinesis (embora, na prática, você possa ver valores diferentes para a métrica `ConcurrentExecutions`). Isso ajuda a aumentar o throughput de processamento quando o volume de dados é volátil e o valor de `IteratorAge` é alto. Quando você aumenta o número de lotes simultâneos por fragmento, o Lambda ainda garante o processamento em ordem no nível de chave de partição.

Você também pode usar `ParallelizationFactor` com a agregação do Kinesis. O comportamento do mapeamento da origem do evento depende de você estar ou não usando [fan-out avançado](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html):
+ **Sem fan-out avançado**: todos os eventos dentro de um evento agregado devem ter a mesma chave de partição. A chave de partição também deve ser igual à do evento agregado. Se os eventos dentro do evento agregado tiverem chaves de partição diferentes, o Lambda não poderá garantir o processamento dos eventos na ordem, por chave de partição.
+ **Com fan-out avançado**: primeiro o Lambda decodifica o evento agregado em eventos individuais. O evento agregado pode ter uma chave de partição diferente da chave dos eventos que ele contém. Porém, os eventos que não correspondem à chave de partição são [descartados e perdidos](https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md). O Lambda não processa esses eventos e não os envia para um destino de falha configurado.

## Evento de exemplo
<a name="services-kinesis-event-example"></a>

**Example**  

```
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
```

# Processar registros do Amazon Kinesis Data Streams com o Lambda
<a name="services-kinesis-create"></a>

Para processar registros do Amazon Kinesis Data Streams com o Lambda, crie um mapeamento da origem do evento do Lambda. Você pode mapear uma função do Lambda para um iterador padrão ou para um consumidor de fan-out aprimorado. Para obter mais informações, consulte [Fluxos de sondagem e agrupamento em lotes](with-kinesis.md#kinesis-polling-and-batching).

## Criar um mapeamento da origem do evento do Kinesis
<a name="services-kinesis-eventsourcemapping"></a>

Para invocar sua função do Lambda com registros do fluxo de dados, crie um [mapeamento da origem do evento](invocation-eventsourcemapping.md). É possível criar vários mapeamentos de origem de eventos para processar os mesmos dados com várias funções do Lambda ou processar itens de vários fluxos de dados com uma única função. Ao processar itens de vários fluxos, cada lote conterá registros somente de um único fragmento ou transmissão.

Você pode configurar mapeamentos da origem do evento para processar registros de um fluxo em outra Conta da AWS. Para saber mais, consulte [Como criar mapeamentos da origem do evento entre contas](#services-kinesis-eventsourcemapping-cross-account).

Antes de criar um mapeamento da origem do evento, você precisará dar permissão à função do Lambda para ler a partir de um fluxo de dados do Kinesis. O Lambda precisa das permissões a seguir para gerenciar recursos relacionados ao seu fluxo de dados do Kinesis:
+ [kinesis:DescribeStream](https://docs.aws.amazon.com/lambda/latest/api/API_DescribeStream.html)
+ [kinesis:DescribeStreamSummary](https://docs.aws.amazon.com/lambda/latest/api/API_DescribeStreamSummary.html)
+ [kinesis:GetRecords](https://docs.aws.amazon.com/lambda/latest/api/API_GetRecords.html)
+ [kinesis:GetShardIterator](https://docs.aws.amazon.com/lambda/latest/api/API_GetShardIterator.html)
+ [kinesis:ListShards](https://docs.aws.amazon.com/lambda/latest/api/API_ListShards.html)
+ [kinesis:SubscribeToShard](https://docs.aws.amazon.com/lambda/latest/api/API_SubscribeToShard.html)

A política gerenciada da AWS [AWSLambdaKinesisExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaKinesisExecutionRole.html) inclui essas permissões. Adicione essa política gerenciada à sua função, conforme descrito no procedimento a seguir.

**nota**  
Você não precisa da permissão `kinesis:ListStreams` para criar e gerenciar mapeamentos da origem do evento para o Kinesis. No entanto, se você criar um mapeamento da origem do evento no console e não tiver essa permissão, não poderá selecionar um stream do Kinesis em uma lista suspensa e o console exibirá um erro. Para criar o mapeamento da origem do evento, você precisará inserir manualmente o nome do recurso da Amazon (ARN) do seu stream.
O Lambda faz as chamadas de API `kinesis:GetRecords` e `kinesis:GetShardIterator` ao tentar novamente invocações com falha.

------
#### [ Console de gerenciamento da AWS ]

**Para adicionar permissões do Kinesis à sua função**

1. Abra a [página Funções](https://console.aws.amazon.com/lambda/home#/functions) do console do Lambda e selecione a função.

1. Na guia **Configuração**, escolha **Permissões**.

1. No painel **Perfil de execução**, em **Nome do perfil**, escolha o link para o perfil de execução da sua função. Esse link abre a página para esse perfil no console do IAM.

1. No painel **Políticas de permissões**, escolha **Adicionar permissões** e, em seguida, selecione **Anexar políticas**.

1. No campo de pesquisa, digite **AWSLambdaKinesisExecutionRole**.

1. Marque a caixa de seleção próxima à política e escolha **Adicionar permissão**.

------
#### [ AWS CLI ]

**Para adicionar permissões do Kinesis à sua função**
+ Execute o comando da CLI a seguir para adicionar a política `AWSLambdaKinesisExecutionRole` ao perfil de execução da sua função:

  ```
  aws iam attach-role-policy \
  --role-name MyFunctionRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
  ```

------
#### [ AWS SAM ]

**Para adicionar permissões do Kinesis à sua função**
+ Na definição da sua função, adicione a `Policies` propriedade, conforme mostrado no seguinte exemplo:

  ```
  Resources:
    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: ./my-function/
        Handler: index.handler
        Runtime: nodejs24.x
        Policies:
          - AWSLambdaKinesisExecutionRole
  ```

------

Depois de configurar as permissões necessárias, crie o mapeamento de origem de eventos.

------
#### [ Console de gerenciamento da AWS ]

**Para criar o mapeamento da origem do evento do Kinesis**

1. Abra a [página Funções](https://console.aws.amazon.com/lambda/home#/functions) do console do Lambda e selecione a função.

1. No painel **Visão geral da função**, escolha **Adicionar gatilho**.

1. Em **Configuração do acionador**, para a origem, selecione **Kinesis**.

1. Selecione o fluxo do Kinesis para o qual você deseja criar o mapeamento da origem do evento e, opcionalmente, um consumidor do seu fluxo.

1. (Opcional) edite o **Tamanho do lote**, a **Posição inicial** e a **Janela do lote** para o mapeamento da origem do evento.

1. Escolha **Adicionar**.

Ao criar seu mapeamento da origem do evento no console, seu perfil do IAM deve ter as permissões [kinesis:ListStreams](https://docs.aws.amazon.com/lambda/latest/api/API_ListStreams.html) e [kinesis:ListStreamConsumers](https://docs.aws.amazon.com/lambda/latest/api/API_ListStreamConsumers.html).

------
#### [ AWS CLI ]

**Para criar o mapeamento da origem do evento do Kinesis**
+ Execute o comando a seguir da CLI para criar um mapeamento da origem do evento do Kinesis. Escolha seu próprio tamanho de lote e posição inicial de acordo com seu caso de uso.

  ```
  aws lambda create-event-source-mapping \
  --function-name MyFunction \
  --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
  --starting-position LATEST \
  --batch-size 100
  ```

Para especificar uma janela de agrupamento em lotes, adicione a opção `--maximum-batching-window-in-seconds`. Para obter mais informações sobre como este e outros parâmetros, consulte [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) na *Referência de comandos da AWS CLI*.

------
#### [ AWS SAM ]

**Para criar o mapeamento da origem do evento do Kinesis**
+ Na definição da sua função, adicione a `KinesisEvent` propriedade, conforme mostrado no seguinte exemplo:

  ```
  Resources:
    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: ./my-function/
        Handler: index.handler
        Runtime: nodejs24.x
        Policies:
          - AWSLambdaKinesisExecutionRole
        Events:
          KinesisEvent:
            Type: Kinesis
            Properties:
              Stream: !GetAtt MyKinesisStream.Arn
              StartingPosition: LATEST
              BatchSize: 100
  
    MyKinesisStream:
      Type: AWS::Kinesis::Stream
      Properties:
        ShardCount: 1
  ```

Para saber mais sobre como criar um mapeamento de origem de eventos para o Kinesis Data Streams no AWS SAM, consulte [Kinesis](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-kinesis.html) no *Guia do desenvolvedor do AWS Serverless Application Model*.

------

## Posição inicial de sondagem e fluxo
<a name="services-kinesis-stream-start-pos"></a>

Esteja ciente de que a sondagem do fluxo durante a criação e as atualizações do mapeamento da origem do evento é, finalmente, consistente.
+ Durante a criação do mapeamento da origem do evento, pode levar alguns minutos para a sondagem de eventos do fluxo iniciar.
+ Durante as atualizações do mapeamento da origem do evento, pode levar alguns minutos para interromper e reiniciar a sondagem de eventos do fluxo.

Esse comportamento significa que, se você especificar `LATEST` como posição inicial do fluxo, o mapeamento da origem do evento poderá perder eventos durante a criação ou as atualizações. Para garantir que nenhum evento seja perdido, especifique a posição inicial do fluxo como `TRIM_HORIZON` ou `AT_TIMESTAMP`.

## Como criar mapeamentos da origem do evento entre contas
<a name="services-kinesis-eventsourcemapping-cross-account"></a>

O Amazon Kinesis Data Streams permite [políticas baseadas em recursos](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_identity-vs-resource.html). Por isso, você pode processar dados ingeridos em um fluxo em uma Conta da AWS com uma função do Lambda em outra conta.

Para criar um mapeamento da origem do evento para sua função do Lambda usando um fluxo do Kinesis em outra Conta da AWS, você deve configurar o fluxo usando uma política baseada em recursos para dar permissão à função do Lambda para ler itens. Para saber como configurar seu fluxo para permitir o acesso entre contas, consulte [Compartilhamento de acesso com funções do AWS Lambda entre contas](https://docs.aws.amazon.com/streams/latest/dev/resource-based-policy-examples.html#Resource-based-policy-examples-lambda) no *Guia do desenvolvedor do Amazon Kinesis Streams*.

Depois de configurar seu fluxo com uma política baseada em recursos que conceda à sua função do Lambda as permissões necessárias, crie o mapeamento da origem do evento usando qualquer um dos métodos descritos na seção anterior.

Se você optar por criar seu mapeamento da origem do evento usando o console Lambda, cole o ARN do seu fluxo diretamente no campo de entrada. Se você quiser especificar um consumidor para seu fluxo, colar o ARN do consumidor preencherá automaticamente o campo do fluxo.

# Configurar a resposta em lote parcial com o Kinesis Data Streams e o Lambda
<a name="services-kinesis-batchfailurereporting"></a>

Ao consumir e processar dados de transmissão de uma fonte de eventos, o Lambda definirá checkpoints por padrão no número mais elevado na sequência de um lote somente quando o lote for um sucesso total. O Lambda trata todos os outros resultados como uma falha completa e tenta processar novamente o lote até o limite de novas tentativas. Para permitir sucessos parciais durante o processamento de lotes de um stream, ative `ReportBatchItemFailures`. Permitir sucessos parciais pode ajudar a reduzir o número de novas tentativas em um registro, embora não impeça totalmente a possibilidade de novas tentativas em um registro bem-sucedido.

Para ativar `ReportBatchItemFailures`, inclua o valor de enum **ReportBatchItemFailures** na lista [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes). Essa lista indica quais tipos de resposta estão habilitados para sua função. Você pode configurar essa lista ao [criar](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) ou [atualizar](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) um mapeamento de origem de eventos.

**nota**  
Mesmo quando seu código de função retorna respostas parciais de falha em lote, essas respostas não serão processadas pelo Lambda, a menos que o atributo `ReportBatchItemFailures` esteja explicitamente ativado para o mapeamento da origem do evento.

## Sintaxe do relatório
<a name="streams-batchfailurereporting-syntax"></a>

Ao configurar relatórios sobre falhas de itens de lote, a classe `StreamsEventResponse` é retornada com uma lista de falhas de itens de lote. É possível usar um objeto `StreamsEventResponse` para retornar o número sequencial do primeiro registro com falha no lote. Você também pode criar sua própria classe personalizada usando a sintaxe de resposta correta. A seguinte estrutura JSON mostra a sintaxe de resposta necessária:

```
{ 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "<SequenceNumber>"
        }
    ]
}
```

**nota**  
Se a matriz `batchItemFailures` contém vários itens, o Lambda usa o registro com o menor número de sequência como ponto de verificação. Em seguida, o Lambda repete todos os registros a partir desse ponto de verificação.

## Condições de sucesso e falha
<a name="streams-batchfailurereporting-conditions"></a>

O Lambda trata um lote como um sucesso completo se você retornar qualquer um destes:
+ Uma lista de `batchItemFailure` vazia
+ Uma lista de `batchItemFailure` nula
+ Uma vazia `EventResponse`
+ Uma nula `EventResponse`

O Lambda trata um lote como uma falha absoluta se você retornar qualquer um dos seguintes:
+ Uma string vazia `itemIdentifier`
+ Uma nula `itemIdentifier`
+ Um `itemIdentifier` com um nome de chave inválido

O Lambda faz novas tentativas após falhas com base na sua estratégia de repetição.

## Dividir um lote
<a name="streams-batchfailurereporting-bisect"></a>

Se a invocação falhar e `BisectBatchOnFunctionError` estiver ativado, o lote será dividido independentemente da configuração de `ReportBatchItemFailures`.

Quando uma resposta de sucesso parcial do lote é recebida e tanto `BisectBatchOnFunctionError` quanto `ReportBatchItemFailures` estão ativados, o lote é dividido no número de sequência retornado e o Lambda tenta novamente apenas os registros restantes.

Para simplificar a implementação da lógica de respostas parciais em lote, considere usar o [utilitário de processador em lote](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) da Powertools para AWS Lambda, que lida de maneira automática com essas complexidades para você.

Veja alguns exemplos de código de função que retornam a lista de IDs de mensagens com falha no lote:

------
#### [ .NET ]

**SDK para .NET**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Relatar falhas de itens em lote do Kinesis com o Lambda usando o .NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using System.Text.Json.Serialization;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegration;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return new StreamsEventResponse();
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                return new StreamsEventResponse
                {
                    BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure>
                    {
                        new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber }
                    }
                };
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
        return new StreamsEventResponse();
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}

public class StreamsEventResponse
{
    [JsonPropertyName("batchItemFailures")]
    public IList<BatchItemFailure> BatchItemFailures { get; set; }
    public class BatchItemFailure
    {
        [JsonPropertyName("itemIdentifier")]
        public string ItemIdentifier { get; set; }
    }
}
```

------
#### [ Go ]

**SDK para Go V2**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Relatar falhas de itens em lote do Kinesis com o Lambda usando Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"fmt"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) (map[string]interface{}, error) {
	batchItemFailures := []map[string]interface{}{}

	for _, record := range kinesisEvent.Records {
		curRecordSequenceNumber := ""

		// Process your record
		if /* Your record processing condition here */ {
			curRecordSequenceNumber = record.Kinesis.SequenceNumber
		}

		// Add a condition to check if the record processing failed
		if curRecordSequenceNumber != "" {
			batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": curRecordSequenceNumber})
		}
	}

	kinesisBatchResponse := map[string]interface{}{
		"batchItemFailures": batchItemFailures,
	}
	return kinesisBatchResponse, nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK para Java 2.x**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Relatar falhas de itens em lote do Kinesis com o Lambda usando Java.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent input, Context context) {

        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) {
            try {
                //Process your record
                KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis();
                curRecordSequenceNumber = kinesisRecord.getSequenceNumber();

            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }
       
       return new StreamsEventResponse(batchItemFailures);   
    }
}
```

------
#### [ JavaScript ]

**SDK para JavaScript (v3)**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Relatar falhas de itens em lote do Kinesis com o Lambda usando Javascript.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
Relatar falhas de itens em lote do Kinesis com o Lambda usando TypeScript.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
  KinesisStreamBatchResponse,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<KinesisStreamBatchResponse> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  logger.info(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**SDK para PHP**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Relatar falhas de itens em lote do Kinesis com o Lambda usando PHP.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $kinesisEvent = new KinesisEvent($event);
        $this->logger->info("Processing records");
        $records = $kinesisEvent->getRecords();

        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK para Python (Boto3).**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Relatar falhas de itens em lote do Kinesis com o Lambda usando Python.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""
    
    for record in records:
        try:
            # Process your record
            curRecordSequenceNumber = record["kinesis"]["sequenceNumber"]
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures":[]}
```

------
#### [ Ruby ]

**SDK para Ruby**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Relatar falhas de item em lote do Kinesis com o Lambda usando Ruby.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  batch_item_failures = []

  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue StandardError => err
      puts "An error occurred #{err}"
      # Since we are working with streams, we can return the failed item immediately.
      # Lambda will immediately begin to retry processing from this failed item onwards.
      return { batchItemFailures: [{ itemIdentifier: record['kinesis']['sequenceNumber'] }] }
    end
  end

  puts "Successfully processed #{event['Records'].length} records."
  { batchItemFailures: batch_item_failures }
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('utf-8')
  # Placeholder for actual async work
  sleep(1)
  data
end
```

------
#### [ Rust ]

**SDK para Rust**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Relate falhas de itens em lote do Kinesis com o Lambda usando Rust.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::{
    event::kinesis::KinesisEvent,
    kinesis::KinesisEventRecord,
    streams::{KinesisBatchItemFailure, KinesisEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> {
    let mut response = KinesisEventResponse {
        batch_item_failures: vec![],
    };

    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in &event.payload.records {
        tracing::info!(
            "EventId: {}",
            record.event_id.as_deref().unwrap_or_default()
        );

        let record_processing_result = process_record(record);

        if record_processing_result.is_err() {
            response.batch_item_failures.push(KinesisBatchItemFailure {
                item_identifier: record.kinesis.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(response)
}

fn process_record(record: &KinesisEventRecord) -> Result<(), Error> {
    let record_data = std::str::from_utf8(record.kinesis.data.as_slice());

    if let Some(err) = record_data.err() {
        tracing::error!("Error: {}", err);
        return Err(Error::from(err));
    }

    let record_data = record_data.unwrap_or_default();

    // do something interesting with the data
    tracing::info!("Data: {}", record_data);

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

## Usar o Powertools para o processador em lote AWS Lambda
<a name="services-kinesis-batchfailurereporting-powertools"></a>

O utilitário de processador em lote do Powertools para AWS Lambda lida de maneira automática com a lógica de respostas parciais em lote, reduzindo a complexidade da implementação de relatórios de falhas em lote. Veja a seguir alguns exemplos usando o processador em lote:

**Python**  
Para ver exemplos completos e instruções de configuração, consulte a [documentação do processador em lote](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/).
Processamento de registros de stream do Kinesis Data Streams com o processador em lote AWS Lambda.  

```
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import KinesisEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
logger = Logger()

def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
```

**TypeScript**  
Para ver exemplos completos e instruções de configuração, consulte a [documentação do processador em lote](https://docs.aws.amazon.com/powertools/typescript/latest/features/batch/).
Processamento de registros de stream do Kinesis Data Streams com o processador em lote AWS Lambda.  

```
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { KinesisEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.KinesisDataStreams);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};

export const handler = async (event: KinesisEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};
```

**Java**  
Para ver exemplos completos e instruções de configuração, consulte a [documentação do processador em lote](https://docs.powertools.aws.dev/lambda/java/latest/utilities/batch/).
Processamento de registros de stream do Kinesis Data Streams com o processador em lote AWS Lambda.  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class KinesisStreamBatchHandler implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    private final BatchMessageHandler<KinesisEvent, StreamsEventResponse> handler;

    public KinesisStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withKinesisBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent kinesisEvent, Context context) {
        return handler.processBatch(kinesisEvent, context);
    }

    private void processMessage(KinesisEvent.KinesisEventRecord kinesisEventRecord, Context context) {
        // Process the stream record
    }
}
```

**.NET**  
Para ver exemplos completos e instruções de configuração, consulte a [documentação do processador em lote](https://docs.aws.amazon.com/powertools/dotnet/utilities/batch-processing/).
Processamento de registros de stream do Kinesis Data Streams com o processador em lote AWS Lambda.  

```
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class OrderEvent
{
    public string? OrderId { get; set; }
    public string? CustomerId { get; set; }
    public decimal Amount { get; set; }
    public DateTime OrderDate { get; set; }
}

internal class TypedKinesisRecordHandler : ITypedRecordHandler<OrderEvent> 
{
    public async Task<RecordHandlerResult> HandleAsync(OrderEvent orderEvent, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(orderEvent.OrderId)) 
        {
            throw new ArgumentException("Order ID is required");
        }

        return await Task.FromResult(RecordHandlerResult.None); 
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedKinesisRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(KinesisEvent _)
    {
        return TypedKinesisStreamBatchProcessor.Result.BatchItemFailuresResponse; 
    }
}
```

# Reter registros de lotes descartados para uma origem de eventos do Kinesis Data Streams no Lambda
<a name="kinesis-on-failure-destination"></a>

O tratamento de erros para mapeamentos de origem de eventos do Kinesis depende se o erro ocorre antes de a função ser invocada ou durante a invocação da função:
+ **Antes da invocação:** se um mapeamento de origem de eventos do Lambda não conseguir invocar a função devido a limitações ou a outros problemas, ele tentará novamente até que os registros expirem ou excedam a idade máxima configurada no mapeamento de origem de eventos ([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)).
+ **Durante a invocação:** se a função for invocada, mas retornar um erro, o Lambda tentará novamente até que os registros expirem, excedam a idade máxima ([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)) ou atinjam a cota de repetição configurada ([MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts)). Para erros de função, também é possível pode configurar [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-BisectBatchOnFunctionError), que divide um lote com falha em dois lotes em lotes, isolando registros com problema e evitando exceder tempos limites. A divisão de lotes não consome a cota de repetição.

Se as medidas de tratamento de erros falharem, o Lambda descartará os registros e continuará processando lotes provenientes da transmissão. Com as configurações padrão, isso significa que um registro ruim pode bloquear o processamento no fragmento afetado por até uma semana. Para evitar isso, configure o mapeamento de fontes de eventos da sua função com um número razoável de tentativas e uma idade máxima de registro que se adapte ao seu caso de uso.

## Configurar destinos para invocações com falha
<a name="kinesis-on-failure-destination-console"></a>

Para reter registros de invocações de mapeamento da origem do evento com falha, adicione um destino ao mapeamento da origem de eventos da função. Cada registro enviado ao destino é um documento JSON que contém metadados sobre a invocação que falhou. Para destinos do Amazon S3, o Lambda também envia todo o registro da invocação junto com os metadados. É possível configurar qualquer tópico do Amazon SNS, fila do Amazon SQS, bucket do Amazon S3 ou Kafka como destino.

Com destinos do Amazon S3, você pode usar o recurso [Notificações de eventos do Amazon S3](https://docs.aws.amazon.com/) para receber notificações quando objetos forem carregados no bucket do S3 de destino. Também é possível configurar as notificações de eventos do S3 para invocar outra função do Lambda para realizar o processamento automatizado em lotes com falha.

Sua função de execução deve ter permissões para o destino:
+ **Para um destino do SQS:** [sqs:SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)
+ **Para um destino do SNS:** [sns:Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html)
+ **Para um destino do S3:** [s3:PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) e [s3:ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/ListObjectsV2.html)
+ **Para um destino do Kafka**: [kafka-cluster:WriteData](https://docs.aws.amazon.com/msk/latest/developerguide/kafka-actions.html)

É possível configurar um tópico do Kafka como um destino do Kafka em caso de falha para os seus mapeamentos da origem do evento do Kafka. Quando o Lambda não consegue processar registros após exaurir as novas tentativas ou quando os registros excedem a idade máxima, o Lambda envia os registros com falha para o tópico especificado do Kafka para processamento posterior. Consulte [Uso de um tópico do Kafka como destino em caso de falha](kafka-on-failure-destination.md).

Se você habilitou a criptografia com sua própria chave do KMS para um destino do S3, o perfil de execução da função também deve ter permissão para chamar [kms:GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html). Se a chave do KMS e o destino do bucket do S3 estiverem em uma conta diferente da função do Lambda e do perfil de execução, configure a chave do KMS para confiar no perfil de execução para permitir kms:GenerateDataKey.

Para configurar um destino em caso de falha usando o console, siga estas etapas:

1. Abra a [página Funções](https://console.aws.amazon.com/lambda/home#/functions) do console do Lambda.

1. Escolha uma função.

1. Em **Function overview (Visão geral da função)**, escolha **Add destination (Adicionar destino)**.

1. Em **Origem**, escolha **Invocação do mapeamento da origem do evento**.

1. Em **Mapeamento da origem do evento**, escolha uma origem de eventos configurada para essa função.

1. Em **Condição**, selecione **Em caso de falha**. Para invocações de mapeamento da origem de eventos, essa é a única condição aceita.

1. Em **Tipo de destino**, escolha o tipo de destino para o qual o Lambda envia registros de invocação.

1. Em **Destination (Destino)**, escolha um recurso.

1. Escolha **Salvar**.

Também é possível configurar um destino em caso de falha usando a AWS Command Line Interface (AWS CLI). Por exemplo, o seguinte comando [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) adiciona um mapeamento de origem de eventos com um destino SQS em caso de falha a `MyFunction`:

```
aws lambda create-event-source-mapping \
--function-name "MyFunction" \
--event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
```

O comando [update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html) a seguir atualiza um mapeamento de origem de eventos para enviar registros de chamada com falha para um destino SNS após duas novas tentativas ou se os registros tiverem mais de uma hora.

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--maximum-retry-attempts 2 \
--maximum-record-age-in-seconds 3600 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
```

As configurações atualizadas são aplicadas de forma assíncrona e não são refletidas na saída até que o processo seja concluído. Use o comando [get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html) para visualizar o status atual.

Para remover um destino, forneça uma string vazia como argumento para o parâmetro `destination-config`:

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": ""}}'
```

### Práticas recomendadas de segurança para destinos do Amazon S3
<a name="kinesis-s3-destination-security"></a>

Excluir um bucket do S3 configurado como destino sem remover o destino da configuração da sua função pode criar um risco de segurança. Se outro usuário souber o nome do seu bucket de destino, ele poderá recriar o bucket na Conta da AWS dele. Registros de invocações com falha serão enviados para o bucket do usuário, potencialmente expondo dados da sua função.

**Atenção**  
Para garantir que os registros de invocação da sua função não possam ser enviados para um bucket do S3 em outra Conta da AWS, adicione uma condição ao perfil de execução da função que limite as permissões `s3:PutObject` aos buckets na sua conta. 

O exemplo a seguir mostra uma política do IAM que limita as permissões `s3:PutObject` da função aos bucket da conta. Essa política também dá ao Lambda a permissão `s3:ListBucket` necessária para usar um bucket do S3 como destino.

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3BucketResourceAccountWrite",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*/*",
                "arn:aws:s3:::*"
            ],
            "Condition": {
                "StringEquals": {
                    "s3:ResourceAccount": "111122223333"
                }
            }
        }
    ]
}
```

Para adicionar uma política de permissões ao perfil de execução da função usando o Console de gerenciamento da AWS ou a AWS CLI, consulte as instruções nos seguintes procedimentos:

------
#### [ Console ]

**Para adicionar uma política de permissões ao perfil de execução de uma função (console)**

1. Abra a [página Funções](https://console.aws.amazon.com/lambda/home#/functions) do console do Lambda.

1. Selecione a função do Lambda cujo perfil de execução você queira modificar.

1. Na guia **Configuração**, escolha **Permissões**.

1. Na guia **Perfil de execução**, selecione o **Nome do perfil** da função para abrir a página do console do IAM do perfil.

1. Adicione uma política de permissões ao perfil da seguinte maneira:

   1. No painel **Políticas de permissões**, escolha **Adicionar permissões** e **Criar política em linha**.

   1. No **Editor de políticas**, selecione **JSON**.

   1. Cole a política que você deseja adicionar no editor (substituindo o JSON existente) e escolha **Próximo**.

   1. No campo **Detalhes da política**, insira o **Nome da política**.

   1. Escolha **Criar política**.

------
#### [ AWS CLI ]

**Para adicionar uma política de permissões ao perfil de execução de uma função (CLI)**

1. Crie um documento de política de JSON com as permissões necessárias e salve-o em um diretório local.

1. Use o comando da CLI `put-role-policy` do IAM para adicionar permissões ao perfil de execução da função. Execute o comando a seguir no diretório em que você salvou seu documento de política de JSON e substitua o nome do perfil, o nome da política e o documento da política pelos seus próprios valores.

   ```
   aws iam put-role-policy \
   --role-name my_lambda_role \
   --policy-name LambdaS3DestinationPolicy \
   --policy-document file://my_policy.json
   ```

------

### Exemplo de registro de invocação do Amazon SNS e do Amazon SQS
<a name="kinesis-on-failure-destination-example-sns-sqs"></a>

O exemplo a seguir mostra o que é enviado pelo Lambda para uma fila do SQS ou tópico do SNS em caso de falha na invocação da origem de eventos do Kinesis. Como o Lambda envia somente os metadados para esses tipos de destino, use os campos `streamArn`, `shardId`, `startSequenceNumber` e `endSequenceNumber` para obter o registro original completo. Todos os campos mostrados na propriedade `KinesisBatchInfo` estarão sempre presentes.

```
{
    "requestContext": {
        "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KinesisBatchInfo": {
        "shardId": "shardId-000000000001",
        "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722",
        "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z",
        "batchSize": 500,
        "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream"
    }
}
```

É possível usar essas informações para recuperar os registros afetados da transmissão para solução de problemas. Os registros reais não estão incluídos, portanto, você deve processar esses registros e recuperá-los da transmissão antes que eles expirem e sejam perdidos.

### Exemplo de registro de invocação do Amazon S3
<a name="kinesis-on-failure-destination-example-sns-sqs-s3"></a>

O exemplo a seguir mostra o que é enviado pelo Lambda para um bucket do Amazon S3 em caso de falha na invocação da origem de eventos do Kinesis. Além de todos os campos do exemplo anterior para destinos do SQS e do SNS, o campo `payload` contém o registro de invocação original como uma string JSON com escape.

```
{
    "requestContext": {
        "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KinesisBatchInfo": {
        "shardId": "shardId-000000000001",
        "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722",
        "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z",
        "batchSize": 500,
        "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream"
    },
    "payload": "<Whole Event>" // Only available in S3
}
```

O objeto do S3 que contém o registro de invocação usa a seguinte convenção de nomenclatura:

```
aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>
```

# Implementando o processamento com estado do Kinesis Data Streams no Lambda
<a name="services-kinesis-windows"></a>

As funções do Lambda podem executar aplicações de processamento contínuo de transmissões. Um stream representa dados não vinculados que fluem continuamente por meio de sua aplicação. Para analisar as informações dessa entrada de atualização contínua, você pode vincular os registros incluídos usando uma janela definida em termos de tempo.

As janelas de tumbling são janelas de tempo distintas que abrem e fecham em intervalos regulares. Por padrão, as invocações do Lambda são sem estado. Não é possível usá-las para processar dados ao longo de várias invocações contínuas sem um banco de dados externo. No entanto, com as janelas de tumbling, você pode manter seu estado em todas as invocações. Esse estado contém o resultado agregado das mensagens previamente processadas para a janela atual. Seu estado pode ter no máximo 1 MB por fragmento. Se exceder esse tamanho, o Lambda encerra a janela antes.

Cada registro de um fluxo pertence a uma janela específica. O Lambda processará cada registro pelo menos uma vez, mas não garantirá que cada registro seja processado apenas uma vez. Em casos raros, como tratamento de erros, alguns registros poderão ser processados mais de uma vez. Os registros são sempre processados em ordem na primeira vez. Se os registros forem processados mais de uma vez, poderão ser processados fora de ordem.

## Agregação e processamento
<a name="streams-tumbling-processing"></a>

Sua função gerenciada pelo usuário é chamada tanto para agregação quanto para processamento dos resultados finais dessa agregação. O Lambda agrega todos os registros recebidos na janela. Você pode receber esses registros em vários lotes, cada um como uma invocação separada. Cada invocação recebe um estado. Assim, ao usar janelas de tumbling, sua resposta de função do Lambda deve conter uma propriedade de `state`. Se a resposta não contiver uma propriedade de `state`, o Lambda considerará esta uma invocação com falha. Para satisfazer essa condição, a função pode retornar um objeto do `TimeWindowEventResponse`, que tem a seguinte forma JSON:

**Example `TimeWindowEventResponse`Valores de**  

```
{
    "state": {
        "1": 282,
        "2": 715
    },
    "batchItemFailures": []
}
```

**nota**  
Para funções Java, recomendamos o uso de um `Map<String, String>` para representar o estado.

No final da janela, a sinalização `isFinalInvokeForWindow` é definida como `true` para indicar que esse é o estado final e que está pronto para processamento. Após o processamento, a janela é concluída e sua invocação final é concluída e, em seguida, o estado é descartado.

No final da janela, o Lambda usa o processamento final para ações sobre os resultados da agregação. Seu processamento final é invocado de forma síncrona. Após a invocação bem-sucedida, sua função define os pontos de verificação no número da sequência e o processamento de streams continua. Se a invocação não for bem-sucedida, sua função do Lambda suspenderá o processamento adicional até uma chamada bem-sucedida.

**Example KinesisTimeWindowEvent**  

```
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1607497475.000
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
            "awsRegion": "us-east-1",
            "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"
        }
    ],
    "window": {
        "start": "2020-12-09T07:04:00Z",
        "end": "2020-12-09T07:06:00Z"
    },
    "state": {
        "1": 282,
        "2": 715
    },
    "shardId": "shardId-000000000006",
    "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}
```

## Configuração
<a name="streams-tumbling-config"></a>

Você pode configurar janelas em cascata ao criar ou atualizar um mapeamento de fonte de eventos. Para configurar uma janela em cascata, especifique a janela em segundos ([TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds)). O comando de exemplo da AWS Command Line Interface (AWS CLI) a seguir cria um mapeamento de fonte de eventos em streaming com uma janela em cascata de 120 segundos. A função do Lambda definida para agregação e processamento é chamada de `tumbling-window-example-function`.

```
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream \
--function-name tumbling-window-example-function \
--starting-position TRIM_HORIZON \
--tumbling-window-in-seconds 120
```

O Lambda determina os limites da janela em cascata com base no horário em que os registros foram inseridos no stream. Todos os registros têm um carimbo de data/hora aproximado disponível que o Lambda usa para determinar os limites.

As agregações de janelas em cascata não são compatíveis com refragmentação. Quando um fragmento termina, o Lambda considera a janela como fechada e os fragmentos filhos iniciam suas próprias janelas em um novo estado. Quando nenhum novo registro está sendo adicionado à janela atual, o Lambda aguarda até 2 minutos antes de assumir que a janela terminou. Isso ajuda a garantir que a função leia todos os registros na janela atual, mesmo que os registros sejam adicionados de forma intermitente.

As janelas em cascata são totalmente compatíveis com as políticas `maxRetryAttempts` e `maxRecordAge`.

**Example Handler.py: agregação e processamento**  
A função do Python a seguir demonstra como agregar e, em seguida, processar seu estado final:  

```
def lambda_handler(event, context):
    print('Incoming event: ', event)
    print('Incoming state: ', event['state'])

#Check if this is the end of the window to either aggregate or process.
    if event['isFinalInvokeForWindow']:
        # logic to handle final state of the window
        print('Destination invoke')
    else:
        print('Aggregate invoke')

#Check for early terminations
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')

    #Aggregation logic
    state = event['state']
    for record in event['Records']:
        state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1

    print('Returning state: ', state)
    return {'state': state}
```

# Parâmetros do Lambda para mapeamento de origem de eventos do Amazon Kinesis Data Streams.
<a name="services-kinesis-parameters"></a>

Todos os mapeamentos de origem de evento Lambda compartilham as mesmas operações de API [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) e [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html). No entanto, apenas alguns dos parâmetros se aplicam ao Kinesis.


| Parâmetro | Obrigatório | Padrão | Observações | 
| --- | --- | --- | --- | 
|  [BatchSize](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-BatchSize)  |  N  |  100  |  Máximo: 10.000.  | 
|  [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-BisectBatchOnFunctionError)  |  N  |  false  |  nenhuma | 
|  [DestinationConfig](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-DestinationConfig)  |  N  | N/D |  Uma fila do Amazon SQS ou um destino de tópico do Amazon SNS para registros descartados. Para ter mais informações, consulte [Configurar destinos para invocações com falha](kinesis-on-failure-destination.md#kinesis-on-failure-destination-console).  | 
|  [Ativado](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-Enabled)  |  N  |  verdadeiro  |  nenhuma | 
|  [EventSourceArn](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-EventSourceArn)  |  S  | N/D |  O ARN do fluxo de dados ou um consumidor de fluxo  | 
|  [FunctionName](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionName)  |  S  | N/D |  nenhuma | 
|  [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes)  |  N  |  N/D |  Para permitir que sua função reporte falhas específicas em um lote, inclua o valor `ReportBatchItemFailures` em `FunctionResponseTypes`. Para ter mais informações, consulte [Configurar a resposta em lote parcial com o Kinesis Data Streams e o Lambda](services-kinesis-batchfailurereporting.md).  | 
|  [MaximumBatchingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumBatchingWindowInSeconds)  |  N  |  0  |  nenhuma | 
|  [MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)  |  N  |  -1  |  -1 significa infinito: o Lambda não descarta registros (as [configurações de retenção de dados do Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/kinesis-extended-retention.html) ainda se aplicam) Mínimo: -1 Máximo: 604.800  | 
|  [MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts)  |  N  |  -1  |  -1 significa infinito: registros com falha são repetidos até que o registro expire Mínimo: -1 Máximo: 10.000.  | 
|  [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor)  |  N  |  1  |  Máximo: 10  | 
|  [StartingPosition](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-StartingPosition)  |  S  |  N/D |  AT\$1TIMESTAMP, TRIM\$1HORIZON, ou LATEST  | 
|  [StartingPositionTimestamp](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-StartingPositionTimestamp)  |  N  |  N/D |  Válido somente se StartingPosition estiver definido como AT\$1TIMESTAMP. O tempo a partir do qual iniciar a leitura, em segundos no horário do Unix  | 
|  [TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds)  |  N  |  N/D |  Mínimo: 0 Máximo: 900  | 

# Usar a filtragem de eventos com uma origem de eventos do Kinesis
<a name="with-kinesis-filtering"></a>

É possível usar filtragem de eventos para controlar quais registros de um stream ou fila que o Lambda enviará para a função. Para obter informações gerais sobre como a filtragem de eventos funciona, consulte [Controlar quais eventos o Lambda envia para a função](invocation-eventfiltering.md).

Esta seção tem como foco a filtragem de eventos para as origens de eventos do Kinesis.

**nota**  
Os mapeamentos das origens dos eventos do Kineses são compatíveis apenas com filtragem na chave `data`.

**Topics**
+ [Conceitos básicos da filtragem de eventos do Kinesis](#filtering-kinesis)
+ [Filtragem de registros agregados do Kinesis](#filtering-kinesis-efo)

## Conceitos básicos da filtragem de eventos do Kinesis
<a name="filtering-kinesis"></a>

Suponha que um produtor esteja inserindo dados formatados em JSON em seu fluxo de dados do Kinesis. Um exemplo de registro seria semelhante ao a seguir, com os dados JSON convertidos em uma string codificada em Base64 no campo `data`.

```
{
    "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=",
        "approximateArrivalTimestamp": 1545084650.987
        },
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
    "awsRegion": "us-east-2",
    "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}
```

Desde que os dados que o produtor coloque no stream sejam JSON válido, é possível usar a filtragem de eventos para filtrar registros usando a chave `data`. Suponha que um produtor esteja inserindo dados em seu stream do Kinesis no formato JASON a seguir.

```
{
    "record": 12345,
    "order": {
        "type": "buy",
        "stock": "ANYCO",
        "quantity": 1000
        }
}
```

Para filtrar somente os registros em que o tipo de pedido é “comprar”, o objeto `FilterCriteria` seria como a seguir.

```
{
    "Filters": [
        {
            "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"
        }
    ]
}
```

Para maior clareza, aqui está o valor de `Pattern` do filtro expandido em JSON simples. 

```
{
    "data": {
        "order": {
            "type": [ "buy" ]
            }
      }
}
```

É possível adicionar seu filtro usando o console, a AWS CLI ou um modelo do AWS SAM.

------
#### [ Console ]

Para adicionar esse filtro usando o console, siga as instruções em [Anexar critérios de filtro a um mapeamento de fonte de eventos (console)](invocation-eventfiltering.md#filtering-console) e insira a string a seguir em **Critérios do filtro**.

```
{ "data" : { "order" : { "type" : [ "buy" ] } } }
```

------
#### [ AWS CLI ]

Para criar um novo mapeamento da origem do evento com esses critérios de filtro usando a AWS Command Line Interface (AWS CLI), execute o comando a seguir.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
```

Para adicionar esses critérios de filtro a um mapeamento da origem do evento existente, execute o comando a seguir.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
```

------
#### [ AWS SAM ]

Para adicionar esse filtro usando o AWS SAM, adicione o trecho a seguir ao modelo YAML da origem do evento.

```
FilterCriteria:
  Filters:
    - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'
```

------

Para filtrar corretamente eventos de origens do Kinesis, tanto o campo de dados como os critérios de filtro para o campo de dados devem estar em formato JSON válido. Se algum desses campos não estiver em um formato JSON válido, o Lambda descartará a mensagem ou emitirá uma exceção. A tabela a seguir resume o comportamento específico: 


| Formato dos dados recebidos | Formato de filtro padrão para propriedades de dados | Ação resultante | 
| --- | --- | --- | 
|  JSON válido  |  JSON válido  |  Filtros do Lambda com base em seus critérios de filtro.  | 
|  JSON válido  |  Nenhum padrão de filtro para propriedades de dados  |  Filtros do Lambda (somente nas outras propriedades de metadados) com base nos seus critérios de filtro.  | 
|  JSON válido  |  Não JSON  |  O Lambda emite uma exceção no momento da criação ou atualização do mapeamento da fonte de eventos. O padrão de filtro para propriedades de dados deve estar em um formato JSON válido.  | 
|  Não JSON  |  JSON válido  |  O Lambda descarta o registro.  | 
|  Não JSON  |  Nenhum padrão de filtro para propriedades de dados  |  Filtros do Lambda (somente nas outras propriedades de metadados) com base nos seus critérios de filtro.  | 
|  Não JSON  |  Não JSON  |  O Lambda emite uma exceção no momento da criação ou atualização do mapeamento da fonte de eventos. O padrão de filtro para propriedades de dados deve estar em um formato JSON válido.  | 

## Filtragem de registros agregados do Kinesis
<a name="filtering-kinesis-efo"></a>

Com o Kinesis, é possível agregar vários registros em um único registro do Kinesis Data Streams para aumentar seu throughput. O Lambda pode aplicar critérios de filtro a registros agregados somente quando você usar a [distribuição avançada](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html) do Kinesis. Não há suporte para a filtragem de registros agregados com o Kinesis padrão. Ao usar a distribuição avançada, você configura um consumidor de throughput dedicado do Kinesis para atuar como acionador para sua função do Lambda. Em seguida, o Lambda filtra os registros agregados e passa somente os registros que atendam aos seus critérios de filtragem.

Para saber mais sobre a agregação de registros do Kinesis, consulte a seção [Agregação](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation) na página Conceitos principais da Kinesis Producer Library (KPL). Para saber mais sobre como usar o Lambda com a distribuição avançada do Kinesis, consulte [Aumento do desempenho do processamento de streams em tempo real com a distribuição avançada do Amazon Kinesis Data Streams e o AWS Lambda](https://aws.amazon.com/blogs/compute/increasing-real-time-stream-processing-performance-with-amazon-kinesis-data-streams-enhanced-fan-out-and-aws-lambda/) no blog de computação da AWS.

# Tutorial: Usar o Lambda com o Kinesis Data Streams
<a name="with-kinesis-example"></a>

Neste tutorial, você criará uma função do Lambda para consumir eventos de um fluxo de dados do Amazon Kinesis. 

1. O aplicativo personalizado grava registros no fluxo.

1. AWS LambdaO sonda a transmissão e, quando detecta novos registros, invoca sua função do Lambda.

1. AWS LambdaO executa a função do Lambda assumindo a função de execução especificada no momento da criação da função do Lambda.

## Pré-requisitos
<a name="with-kinesis-prepare"></a>

### Instalar o AWS Command Line Interface
<a name="install_aws_cli"></a>

Se você ainda não instalou a AWS Command Line Interface, siga as etapas em [Instalar ou atualizar a versão mais recente da AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) para instalá-la.

O tutorial requer um terminal de linha de comando ou um shell para executar os comandos. No Linux e no macOS, use o gerenciador de pacotes e de shell de sua preferência.

**nota**  
No Windows, alguns comandos da CLI do Bash que você costuma usar com o Lambda (como `zip`) não são compatíveis com os terminais integrados do sistema operacional. Para obter uma versão do Ubuntu com o Bash integrada no Windows, [instale o Subsistema do Windows para Linux](https://docs.microsoft.com/en-us/windows/wsl/install-win10). 

## Criar a função de execução
<a name="with-kinesis-example-create-iam-role"></a>

Crie a [função de execução](lambda-intro-execution-role.md) que dá à sua função permissão para acessar recursos do AWS.

**Para criar uma função de execução**

1. Abra a [página Roles](https://console.aws.amazon.com/iam/home#/roles) (Funções) no console do IAM.

1. Selecione **Create role** (Criar função).

1. Crie uma função com as propriedades a seguir.
   + **Trusted entity (Entidade confiável** – **AWS Lambda**.
   + **Permissões**: **AWSLambdaKinesisExecutionRole**.
   + **Role name (Nome da função** – **lambda-kinesis-role**.

A política **AWSLambdaKinesisExecutionRole** tem as permissões necessárias para a função ler itens do Kinesis e gravar logs no CloudWatch Logs.

## Criar a função
<a name="with-kinesis-example-create-function"></a>

Crie uma função do Lambda que processe suas mensagens do Kinesis. O código da função registra o ID do evento e os dados do evento do registro do Kinesis no CloudWatch Logs.

Este tutorial usa o runtime do Node.js 24, mas também fornecemos exemplos de códigos em outras linguagens de runtime. Você pode selecionar a guia na caixa a seguir para ver o código do runtime do seu interesse. O código JavaScript que você usará nesta etapa é o primeiro exemplo mostrado na guia **JavaScript**.

------
#### [ .NET ]

**SDK para .NET**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Consumir um evento do Kinesis com o Lambda usando .NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegrationSampleCode;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return;
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                throw;
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}
```

------
#### [ Go ]

**SDK para Go V2**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Consumir um evento do Kinesis com o Lambda usando Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"log"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
	if len(kinesisEvent.Records) == 0 {
		log.Printf("empty Kinesis event received")
		return nil
	}

	for _, record := range kinesisEvent.Records {
		log.Printf("processed Kinesis event with EventId: %v", record.EventID)
		recordDataBytes := record.Kinesis.Data
		recordDataText := string(recordDataBytes)
		log.Printf("record data: %v", recordDataText)
		// TODO: Do interesting work based on the new data
	}
	log.Printf("successfully processed %v records", len(kinesisEvent.Records))
	return nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK para Java 2.x**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Consumir um evento do Kinesis com o Lambda usando Java.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class Handler implements RequestHandler<KinesisEvent, Void> {
    @Override
    public Void handleRequest(final KinesisEvent event, final Context context) {
        LambdaLogger logger = context.getLogger();
        if (event.getRecords().isEmpty()) {
            logger.log("Empty Kinesis Event received");
            return null;
        }
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            try {
                logger.log("Processed Event with EventId: "+record.getEventID());
                String data = new String(record.getKinesis().getData().array());
                logger.log("Data:"+ data);
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex) {
                logger.log("An error occurred:"+ex.getMessage());
                throw ex;
            }
        }
        logger.log("Successfully processed:"+event.getRecords().size()+" records");
        return null;
    }

}
```

------
#### [ JavaScript ]

**SDK para JavaScript (v3)**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda). 
Consumir um evento do Kinesis com o Lambda usando JavaScript.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      throw err;
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
Consumir um evento do Kinesis com o Lambda usando TypeScript.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<void> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      throw err;
    }
    logger.info(`Successfully processed ${event.Records.length} records.`);
  }
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**SDK para PHP**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Consumir um evento do Kinesis com o Lambda usando PHP.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends KinesisHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleKinesis(KinesisEvent $event, Context $context): void
    {
        $this->logger->info("Processing records");
        $records = $event->getRecords();
        foreach ($records as $record) {
            $data = $record->getData();
            $this->logger->info(json_encode($data));
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK para Python (Boto3).**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Consumir um evento do Kinesis com o Lambda usando Python.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):

    for record in event['Records']:
        try:
            print(f"Processed Kinesis Event - EventID: {record['eventID']}")
            record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            print(f"Record Data: {record_data}")
            # TODO: Do interesting work based on the new data
        except Exception as e:
            print(f"An error occurred {e}")
            raise e
    print(f"Successfully processed {len(event['Records'])} records.")
```

------
#### [ Ruby ]

**SDK para Ruby**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Consumir um evento do Kinesis com o Lambda usando Ruby.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue => err
      $stderr.puts "An error occurred #{err}"
      raise err
    end
  end
  puts "Successfully processed #{event['Records'].length} records."
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('UTF-8')
  # Placeholder for actual async work
  # You can use Ruby's asynchronous programming tools like async/await or fibers here.
  return data
end
```

------
#### [ Rust ]

**SDK para Rust**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Consuma um evento do Kinesis com o Lambda usando Rust.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    event.payload.records.iter().for_each(|record| {
        tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());

        let record_data = std::str::from_utf8(&record.kinesis.data);

        match record_data {
            Ok(data) => {
                // log the record data
                tracing::info!("Data: {}", data);
            }
            Err(e) => {
                tracing::error!("Error: {}", e);
            }
        }
    });

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

**Para criar a função**

1. Crie um diretório para o projeto e depois mude para esse diretório.

   ```
   mkdir kinesis-tutorial
   cd kinesis-tutorial
   ```

1. Copie o código JavaScript de amostra em um novo arquivo denominado `index.js`.

1. Crie um pacote de implantação.

   ```
   zip function.zip index.js
   ```

1. Crie uma função do Lambda com o comando `create-function`.

   ```
   aws lambda create-function --function-name ProcessKinesisRecords \
   --zip-file fileb://function.zip --handler index.handler --runtime nodejs24.x \
   --role arn:aws:iam::111122223333:role/lambda-kinesis-role
   ```

## Testar a função do Lambda
<a name="walkthrough-kinesis-events-adminuser-create-test-function-upload-zip-test-manual-invoke"></a>

Invoque sua função do Lambda manualmente usando o comando `invoke` da CLI do AWS Lambda e um evento do Kinesis de amostra.

**Para testar a função do Lambda**

1. Copie o JSON a seguir em um arquivo e salve-o como `input.txt`. 

   ```
   {
       "Records": [
           {
               "kinesis": {
                   "kinesisSchemaVersion": "1.0",
                   "partitionKey": "1",
                   "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                   "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                   "approximateArrivalTimestamp": 1545084650.987
               },
               "eventSource": "aws:kinesis",
               "eventVersion": "1.0",
               "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
               "eventName": "aws:kinesis:record",
               "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
               "awsRegion": "us-east-2",
               "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
           }
       ]
   }
   ```

1. Use o comando `invoke` para enviar o evento para a função.

   ```
   aws lambda invoke --function-name ProcessKinesisRecords \
   --cli-binary-format raw-in-base64-out \
   --payload file://input.txt outputfile.txt
   ```

   A opção **cli-binary-format** será necessária se você estiver usando a AWS CLI versão 2. Para que essa seja a configuração padrão, execute `aws configure set cli-binary-format raw-in-base64-out`. Para obter mais informações, consulte [A AWS CLI comporta opções de linha de comando globais](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list) no *Guia do usuário da AWS Command Line Interface versão 2*.

   A resposta é salva no `out.txt`.

## Criar uma transmissão do Kinesis
<a name="with-kinesis-example-configure-event-source-create"></a>

Use o comando `create-stream ` para criar um fluxo.

```
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
```

Execute o comando `describe-stream` a seguir para obter o ARN do stream.

```
aws kinesis describe-stream --stream-name lambda-stream
```

A seguinte saída deverá ser mostrada:

```
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920746074317682119384634633455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
        "StreamName": "lambda-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": 1544828156.0
    }
}
```

Você usa o ARN do stream na próxima etapa para associar o stream à sua função do Lambda.

## Adicionar uma fonte de eventos no AWS Lambda
<a name="with-kinesis-example-configure-event-source-add-event-source"></a>

Execute o comando AWS CLI a seguir da `add-event-source`.

```
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source  arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
```

Observe o ID do mapeamento para uso posterior. Você pode obter uma lista de mapeamentos de origem de evento executando o comando `list-event-source-mappings` a seguir.

```
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
```

Na resposta, pode verificar que o valor do status é `enabled`. Os mapeamentos de origem de eventos podem ser desabilitados para pausar a pesquisa temporariamente, sem perder nenhum registro.

## Testar a configuração
<a name="with-kinesis-example-configure-event-source-test-end-to-end"></a>

Para testar o mapeamento da origem do evento, adicione registros de eventos ao seu stream do Kinesis. O valor `--data` é uma string que a CLI codifica em base64 antes de enviá-la ao Kinesis. Você pode executar o mesmo comando mais de uma vez para adicionar vários registros ao fluxo.

```
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
```

O Lambda usa a função de execução para ler registros da transmissão. Em seguida, ele invoca sua função do Lambda, transmitindo lotes de registros. A função decodifica os dados de cada registro e os registra, enviando a saída para CloudWatch Logs. Visualize os logs no [console do CloudWatch](https://console.aws.amazon.com/cloudwatch).

## Limpe os recursos
<a name="cleanup"></a>

Agora você pode excluir os recursos criados para este tutorial, a menos que queira mantê-los. Excluindo os recursos da AWS que você não está mais usando, você evita cobranças desnecessárias em sua Conta da AWS.

**Para excluir a função de execução**

1. Abra a página [Roles](https://console.aws.amazon.com/iam/home#/roles) (Funções) no console do IAM.

1. Selecione a função de execução que você criou.

1. Escolha **Excluir**.

1. Insira o nome do perfil no campo de entrada de texto e escolha **Delete** (Excluir).

**Como excluir a função do Lambda**

1. Abra a página [Functions](https://console.aws.amazon.com/lambda/home#/functions) (Funções) no console do Lambda.

1. Selecione a função que você criou.

1. Selecione **Ações**, **Excluir**.

1. Digite **confirm** no campo de entrada de texto e escolha **Delete** (Excluir).

**Para excluir a transmissão do Kinesis**

1. Faça login no Console de gerenciamento da AWS e abra o console do Kinesis em [ https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Selecione a transmissão criada.

1. Selecione **Ações**, **Excluir**.

1. Digite **delete** no campo de entrada de texto.

1. Escolha **Excluir**.