

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Scopri come Lambda elabora i record di Flusso di dati Amazon Kinesis.
<a name="with-kinesis"></a>

È possibile usare una funzione Lambda per elaborare i record in un [flusso di dati Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/introduction.html). È possibile mappare una funzione Lambda a un consumer a throughput condiviso (iteratore standard) del flusso di dati Kinesis o a un consumer di throughput dedicato con [fan-out avanzato](https://docs.aws.amazon.com/kinesis/latest/dev/enhanced-consumers.html). Per gli iteratori standard, Lambda interroga ogni shard nel flusso Kinesis alla ricerca di record utilizzando il protocollo HTTP. La mappatura dell'origine eventi condivide il throughput di lettura con altri utenti dello shard.

 Per informazioni dettagliate sui flussi di dati Kinesis, consulta [Lettura dei dati da Flusso di dati Amazon Kinesis](https://docs.aws.amazon.com/kinesis/latest/dev/building-consumers.html).

**Nota**  
Kinesis addebita dei costi per ogni shard, per il fan-out avanzato e per i dati letti dal flusso. Per i dettagli sui prezzi, consulta [Prezzi di Amazon Kinesis](https://aws.amazon.com/kinesis/data-streams/pricing).

## Flussi di polling e batching
<a name="kinesis-polling-and-batching"></a>

Lambda legge i record dal flusso di dati e richiama la funzione [in modo sincrono](invocation-sync.md) con un evento che contiene record di flusso. Lambda legge i record in batch e richiama la funzione per elaborare i record dal batch. Ogni batch contiene registri da una singolo shard/flusso dei dati.

La funzione Lambda è un'applicazione consumer per il flusso di dati. Elabora un batch di record alla volta da ciascuno shard. È possibile mappare una funzione Lambda a un consumer a throughput condiviso (iteratore standard) o a un consumer di throughput dedicato con fan-out avanzato.
+ **Iteratori standard:** Lambda esegue il polling per ogni shard nel flusso Kinesis per i record a una velocità di base di una volta al secondo. Quando sono disponibili più record, Lambda continua l'elaborazione dei batch fino a quando la funzione raggiunge il flusso. La mappatura dell'origine eventi condivide il throughput di lettura con altri utenti dello shard.
+ **Fan-out avanzato:** per ridurre al minimo la latenza e massimizzare il throughput di lettura, è necessario creare un consumer del flusso di dati con [fan-out avanzato](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html). I consumatori del fan-out avanzato ottengono una connessione dedicata a ciascuno shard che non ha conseguenze su altre applicazioni che leggono dal flusso. I consumatori del flusso utilizzano HTTP/2 per ridurre la latenza spingendo record da Lambda a long-lived su una connessione di lunga durata e comprimendo le intestazioni della richiesta. È possibile creare un consumatore di flusso con l'API [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html) Kinesis.

```
aws kinesis register-stream-consumer \
--consumer-name con1 \
--stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream
```

Verrà visualizzato l'output seguente:

```
{
    "Consumer": {
        "ConsumerName": "con1",
        "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608",
        "ConsumerStatus": "CREATING",
        "ConsumerCreationTimestamp": 1540591608.0
    }
}
```

Per aumentare la velocità con cui la funzione elabora i record, [aggiungere gli shard al flusso di dati](https://repost.aws/knowledge-center/kinesis-data-streams-open-shards). Lambda elabora i record in ogni shard in ordine. Interrompe l'elaborazione di record aggiuntivi in uno shard se la funzione restituisce un errore. Con più partizioni, vengono elaborati più batch contemporaneamente e l'impatto di errori in simultanea viene ridotto.

Se la funzione non è in grado di aumentare le dimensioni fino a gestire il numero totale di batch simultanei, [richiedere un aumento della quota](https://docs.aws.amazon.com/servicequotas/latest/userguide/request-quota-increase.html) o [riservare la simultaneità](configuration-concurrency.md) per la funzione.

Per impostazione predefinita, Lambda richiama la funzione non appena i record sono disponibili. Se il batch che Lambda legge dall'origine eventi contiene un solo record, Lambda invia solo un record alla funzione. Per evitare di richiamare la funzione con pochi record è possibile, configurando un *periodo di batch*, chiedere all'origine eventi di memorizzare nel buffer i registri per un massimo di 5 minuti. Prima di richiamare la funzione, Lambda continua a leggere i registri dall'origine eventi fino a quando non ha raccolto un batch completo, fino alla scadenza del periodo di batch o fino a quando il batch non ha raggiunto il limite del payload di 6 MB. Per ulteriori informazioni, consulta [Comportamento di batching](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching).

**avvertimento**  
Gli strumenti di mappatura dell'origine degli eventi elaborano ogni evento almeno una volta e può verificarsi un'elaborazione duplicata dei record. Per evitare potenziali problemi legati agli eventi duplicati, ti consigliamo vivamente di rendere idempotente il codice della funzione. Per ulteriori informazioni, consulta [Come posso rendere idempotente la mia funzione Lambda](https://repost.aws/knowledge-center/lambda-function-idempotent) nel Knowledge Center AWS.

Lambda non attende il completamento di alcuna [estensione](lambda-extensions.md) prima di inviare il batch successivo per l'elaborazione. In altre parole, le estensioni possono continuare a funzionare mentre Lambda elabora il successivo batch di record. Ciò può causare problemi di limitazione in caso di violazione delle impostazioni o dei limiti di [simultaneità](lambda-concurrency.md). Per rilevare se si tratta di un potenziale problema, monitora le tue funzioni e verifica se i [parametri di simultaneità](monitoring-concurrency.md#general-concurrency-metrics) per lo strumento di mappatura dell'origine degli eventi sono superiori al previsto. A causa degli intervalli ridotti tra le invocazioni, Lambda potrebbe segnalare brevemente un utilizzo della simultaneità maggiore rispetto al numero di partizioni. Ciò può essere vero anche per le funzioni Lambda senza estensioni.

Configura l'impostazione [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor) per elaborare uno shard di un flusso di dati Kinesis con più invocazioni Lambda contemporaneamente. È possibile specificare il numero di batch simultanei di cui Lambda esegue il polling da uno shard da un fattore di parallelizzazione compreso tra da 1 (predefinito) e 10. Ad esempio, impostando `ParallelizationFactor` su 2, puoi disporre al massimo di 200 invocazioni Lambda simultanee per elaborare 100 partizioni di dati Kinesis (anche se nella pratica potresti vedere valori differenti per il parametro `ConcurrentExecutions`). Ciò permette di dimensionare verso l'alto il throughput di elaborazione quando il volume dei dati è volatile e l'`IteratorAge` è alta. Se si aumenta il numero di batch simultanei per shard, Lambda garantisce comunque l'ordine di elaborazione a livello di partizione-chiave.

Puoi anche utilizzare `ParallelizationFactor` con l'aggregazione Kinesis. Il comportamento dello strumento di mappatura dell'origine degli eventi dipende dall'utilizzo o meno di un [fan-out avanzato](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html):
+ **Senza fan-out avanzato**: tutti gli eventi all'interno di un evento aggregato devono avere la stessa chiave di partizione. La chiave di partizione deve inoltre corrispondere a quella dell'evento aggregato. Se gli eventi all'interno dell'evento aggregato hanno chiavi di partizione diverse, Lambda non può garantire l'elaborazione ordinata degli eventi per chiave di partizione.
+ **Con fan-out migliorato**: innanzitutto, Lambda decodifica l'evento aggregato nei suoi singoli eventi. L'evento aggregato può avere una chiave di partizione diversa dagli eventi che contiene. Tuttavia, gli eventi che non corrispondono alla chiave di partizione vengono [eliminati e persi](https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md). Lambda non elabora questi eventi e non li invia a una destinazione di errore configurata.

## Esempio di evento
<a name="services-kinesis-event-example"></a>

**Example**  

```
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
```

# Elaborare i record del flusso di dati Amazon Kinesis con Lambda
<a name="services-kinesis-create"></a>

Per elaborare i record di Amazon Kinesis Data Streams con Lambda, crea una mappatura delle sorgenti degli eventi Lambda. È possibile mappare una funzione Lambda a un iteratore standard o a un consumatore fan-out avanzato. Per ulteriori informazioni, consulta [Flussi di polling e batching](with-kinesis.md#kinesis-polling-and-batching).

## Creare una mappatura delle sorgenti degli eventi Kinesis
<a name="services-kinesis-eventsourcemapping"></a>

Per richiamare la funzione Lambda con i record dal flusso di dati, crea uno [strumento di mappatura dell'origine degli eventi](invocation-eventsourcemapping.md). È possibile creare più mappature delle origini eventi per elaborare gli stessi dati con più funzioni Lambda o per elaborare elementi da più flussi di dati con una singola funzione. Quando si elaborano elementi da più flussi, ogni batch conterrà i record di un solo shard o di un solo flusso.

È possibile configurare gli strumenti di mappatura dell'origine degli eventi per elaborare i record da un flusso in un altro Account AWS. Per ulteriori informazioni, consulta [Creazione di uno strumento di mappatura dell'origine degli eventi multi-account](#services-kinesis-eventsourcemapping-cross-account).

Prima di creare uno strumento di mappatura dell'origine degli eventi, devi autorizzare la funzione Lambda a leggere da un flusso di dati Kinesis. Lambda richiede le seguenti autorizzazioni per gestire le risorse correlate al flusso di dati Kinesis:
+ [kinesis: DescribeStream](https://docs.aws.amazon.com/lambda/latest/api/API_DescribeStream.html)
+ [cinesi: DescribeStreamSummary](https://docs.aws.amazon.com/lambda/latest/api/API_DescribeStreamSummary.html)
+ [cinesi: GetRecords](https://docs.aws.amazon.com/lambda/latest/api/API_GetRecords.html)
+ [cinesi: GetShardIterator](https://docs.aws.amazon.com/lambda/latest/api/API_GetShardIterator.html)
+ [cinesi: ListShards](https://docs.aws.amazon.com/lambda/latest/api/API_ListShards.html)
+ [cinesi: SubscribeToShard](https://docs.aws.amazon.com/lambda/latest/api/API_SubscribeToShard.html)

La politica AWS gestita [AWSLambdaKinesisExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaKinesisExecutionRole.html)include queste autorizzazioni. Aggiungi questa policy gestita alla funzione come descritto nella seguente procedura.

**Nota**  
Non è necessaria l'`kinesis:ListStreams`autorizzazione per creare e gestire le mappature delle sorgenti degli eventi per Kinesis. Tuttavia, se crei una mappatura dell'origine degli eventi nella console e non disponi di questa autorizzazione, non sarai in grado di selezionare uno stream Kinesis da un elenco a discesa e la console visualizzerà un errore. Per creare la mappatura delle sorgenti degli eventi, devi inserire manualmente l'Amazon Resource Name (ARN) del tuo stream.
Lambda effettua `kinesis:GetRecords` chiamate `kinesis:GetShardIterator` API ogni volta che tenta di ripetere le chiamate non riuscite.

------
#### [ Console di gestione AWS ]

**Per aggiungere le autorizzazioni Kinesis alla tua funzione**

1. Apri la [pagina Funzioni](https://console.aws.amazon.com/lambda/home#/functions) della console Lambda e scegli la tua funzione.

1. Nella scheda **Configurazione** scegli **Autorizzazioni**.

1. Nel riquadro **Ruolo di esecuzione**, ub **Nome del ruolo**, scegli il link al ruolo di esecuzione della tua funzione. Questo link apre la pagina del ruolo nella console IAM.

1. Nel riquadro **Policy di autorizzazioni**, seleziona **Aggiungi autorizzazioni**, quindi **Collega policy**.

1. Inserisci **AWSLambdaKinesisExecutionRole** nel campo di ricerca.

1. Seleziona la casella di controllo accanto al nome della policy, quindi scegli **Aggiungi autorizzazione**.

------
#### [ AWS CLI ]

**Per aggiungere le autorizzazioni Kinesis alla tua funzione**
+ Per aggiungere la policy `AWSLambdaKinesisExecutionRole` al ruolo di esecuzione della funzione, eseguire il comando della CLI sotto riportato:

  ```
  aws iam attach-role-policy \
  --role-name MyFunctionRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
  ```

------
#### [ AWS SAM ]

**Per aggiungere le autorizzazioni Kinesis alla tua funzione**
+ Nella definizione della funzione, aggiungi la proprietà `Policies` come mostrato nell'esempio seguente:

  ```
  Resources:
    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: ./my-function/
        Handler: index.handler
        Runtime: nodejs24.x
        Policies:
          - AWSLambdaKinesisExecutionRole
  ```

------

Dopo aver configurato le autorizzazioni richieste, crea lo strumento di mappatura dell'origine degli eventi.

------
#### [ Console di gestione AWS ]

**Per creare uno strumento di mappatura dell'origine degli eventi Kinesis**

1. Apri la [pagina Funzioni](https://console.aws.amazon.com/lambda/home#/functions) della console Lambda e scegli la tua funzione.

1. Nel riquadro **Panoramica della funzione**, scegli **Aggiungi trigger**.

1. In **Configurazione del trigger**, per l'origine seleziona **Kinesis**.

1. Seleziona il flusso Kinesis per il quale desideri creare lo strumento di mappatura dell'origine degli eventi e, facoltativamente, un consumer del tuo flusso.

1. (Facoltativo) Modifica la **dimensione del batch**, la **posizione iniziale** e la **finestra batch** per lo strumento di mappatura dell'origine degli eventi.

1. Scegliere **Aggiungi**.

[Quando crei la mappatura delle sorgenti degli eventi dalla console, il tuo ruolo IAM deve disporre delle autorizzazioni kinesis: e [kinesis](https://docs.aws.amazon.com/lambda/latest/api/API_ListStreams.html):. ListStreams ListStreamConsumers](https://docs.aws.amazon.com/lambda/latest/api/API_ListStreamConsumers.html)

------
#### [ AWS CLI ]

**Per creare uno strumento di mappatura dell'origine degli eventi Kinesis**
+ Per creare uno strumento di mappatura dell'origine degli eventi Kinesis, eseguire il comando della CLI sotto riportato. Scegli la dimensione del batch e la posizione iniziale in base al tuo caso d'uso.

  ```
  aws lambda create-event-source-mapping \
  --function-name MyFunction \
  --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
  --starting-position LATEST \
  --batch-size 100
  ```

Per specificare una finestra di batch, aggiungi l'opzione `--maximum-batching-window-in-seconds`. *Per ulteriori informazioni sull'utilizzo di questo e di altri parametri, consulta [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html)la sezione Command Reference.AWS CLI *

------
#### [ AWS SAM ]

**Per creare uno strumento di mappatura dell'origine degli eventi Kinesis**
+ Nella definizione della funzione, aggiungi la proprietà `KinesisEvent` come mostrato nell'esempio seguente:

  ```
  Resources:
    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: ./my-function/
        Handler: index.handler
        Runtime: nodejs24.x
        Policies:
          - AWSLambdaKinesisExecutionRole
        Events:
          KinesisEvent:
            Type: Kinesis
            Properties:
              Stream: !GetAtt MyKinesisStream.Arn
              StartingPosition: LATEST
              BatchSize: 100
  
    MyKinesisStream:
      Type: AWS::Kinesis::Stream
      Properties:
        ShardCount: 1
  ```

*Per ulteriori informazioni sulla creazione di una mappatura delle sorgenti di eventi per Kinesis Data Streams in AWS SAM, consulta [Kinesis nella Developer Guide](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-kinesis.html).AWS Serverless Application Model *

------

## Posizioni di partenza di polling e flussi
<a name="services-kinesis-stream-start-pos"></a>

Tieni presente che il polling dei flussi durante la creazione e gli aggiornamenti dello strumento di mappatura dell'origine degli eventi alla fine è coerente.
+ Durante la creazione dello strumento di mappatura dell'origine degli eventi, potrebbero essere necessari alcuni minuti per l'avvio degli eventi di polling dal flusso.
+ Durante gli aggiornamenti dello strumento di mappatura dell'origine degli eventi, potrebbero essere necessari alcuni minuti per l'avvio degli eventi di polling dal flusso.

Questo comportamento implica che se specifichi `LATEST` come posizione iniziale del flusso, lo strumento di mappatura dell'origine degli eventi potrebbe perdere eventi durante la creazione o gli aggiornamenti. Per non perdere alcun evento, specifica la posizione iniziale del flusso come `TRIM_HORIZON` o `AT_TIMESTAMP`.

## Creazione di uno strumento di mappatura dell'origine degli eventi multi-account
<a name="services-kinesis-eventsourcemapping-cross-account"></a>

Flusso di dati Amazon Kinesis supporta le [policy basate sulle risorse](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_identity-vs-resource.html). Per questo motivo, puoi elaborare i dati inseriti in un flusso in uno Account AWS con una funzione Lambda in un altro account.

Per creare una mappatura dell'origine degli eventi per la tua funzione Lambda utilizzando un flusso Kinesis in un Account AWS altro, devi configurare il flusso utilizzando una policy basata sulle risorse per autorizzare la funzione Lambda a leggere gli elementi. Per informazioni su come configurare lo stream per consentire l'accesso su più account, consulta [Condivisione dell'accesso con AWS Lambda funzioni su più account nella guida](https://docs.aws.amazon.com/streams/latest/dev/resource-based-policy-examples.html#Resource-based-policy-examples-lambda) per sviluppatori di *Amazon Kinesis Streams*.

Dopo aver configurato il flusso con una policy basata sulle risorse che fornisce alla funzione Lambda le autorizzazioni richieste, crea lo strumento di mappatura dell'origine degli eventi utilizzando uno dei metodi descritti nella sezione precedente.

Se scegli di creare lo strumento di mappatura dell'origine degli eventi utilizzando la console Lambda, incolla l'ARN del tuo flusso direttamente nel campo di input. Se desideri specificare un consumer per il tuo flusso, incollando l'ARN del consumer viene compilato automaticamente il campo del flusso.

# Configurazione della risposta batch parziale con il flusso di dati Kinesis e Lambda
<a name="services-kinesis-batchfailurereporting"></a>

Quando si consumano ed elaborano i dati di streaming da un'origine eventi, per impostazione predefinita i Lambda imposta i checkpoint al numero di sequenza più alto di un batch solo quando il batch è riuscito completamente. Lambda tratta tutti gli altri risultati come un fallimento completo e riprova a elaborare il batch fino al limite di tentativi. Per consentire i successi parziali durante l'elaborazione di batch da un flusso, attivare `ReportBatchItemFailures`. Consentire successi parziali può contribuire a ridurre il numero di tentativi su un record, anche se non impedisce del tutto la possibilità di tentativi in un record riuscito.

Per attivare `ReportBatchItemFailures`, includere il valore enum**ReportBatchItemFailures** nell'[FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes)elenco. Questo elenco indica quali tipi di risposta sono abilitati per la funzione. È possibile configurare questo elenco quando si [crea](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) o si [aggiorna](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) uno strumento di mappatura dell'origine degli eventi.

**Nota**  
Anche quando il codice della funzione restituisce risposte parziali di errore in batch, queste risposte non verranno elaborate da Lambda a meno che la `ReportBatchItemFailures` funzionalità non sia esplicitamente attivata per la mappatura dell'origine degli eventi.

## Sintassi di report
<a name="streams-batchfailurereporting-syntax"></a>

Quando si configura la creazione di report sugli errori degli elementi batch, la `StreamsEventResponse` classe viene restituita con un elenco di errori degli articoli batch. È possibile utilizzare un `StreamsEventResponse` oggetto per restituire il numero di sequenza del primo record non riuscito nel batch. È inoltre possibile creare la propria classe personalizzata utilizzando la sintassi di risposta corretta. La seguente struttura JSON mostra la sintassi di risposta richiesta:

```
{ 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "<SequenceNumber>"
        }
    ]
}
```

**Nota**  
Se l'array `batchItemFailures` contiene più elementi, Lambda utilizza il record con il numero di sequenza più basso come checkpoint. Lambda quindi riprova tutti i record a partire da quel checkpoint.

## Condizioni di successo e di errore
<a name="streams-batchfailurereporting-conditions"></a>

Lambda considera un batch come un successo completo se si restituisce uno degli elementi seguenti:
+ Una `batchItemFailure` lista vuota
+ Un `batchItemFailure` elenco nullo
+ Un vuoto `EventResponse`
+ Un valore nullo `EventResponse`

Lambda considera un batch come un fallimento completo se si restituisce uno degli elementi seguenti:
+ Una stringa vuota `itemIdentifier`
+ Un valore nullo `itemIdentifier`
+ Un `itemIdentifier` con un nome chiave errato

Lambda esegue nuovi tentativi in seguito ai fallimenti secondo la strategia di tentativi impostata.

## Bisezione un batch
<a name="streams-batchfailurereporting-bisect"></a>

Se il richiamo fallisce ed `BisectBatchOnFunctionError` è attivato, il batch viene bisecato a prescindere dalle `ReportBatchItemFailures` impostazioni.

Quando si riceve una risposta di successo parziale del batch ed entrambi `BisectBatchOnFunctionError` e `ReportBatchItemFailures` sono attivati, il batch viene bisecato in corrispondenza del numero di sequenza restituito e Lambda ritenta solo i record rimanenti.

Per semplificare l'implementazione della logica di risposta batch parziale, prendi in considerazione l'utilizzo dell'[utilità Batch Processor](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) di Powertools for AWS Lambda, che gestisce automaticamente queste complessità per te.

Ecco alcuni esempi di codice di funzione che restituiscono l'elenco dei messaggi non riusciti IDs nel batch:

------
#### [ .NET ]

**SDK per .NET**  
 C'è altro su GitHub. Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Segnalazione di errori di elementi batch di Kinesis con Lambda tramite .NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using System.Text.Json.Serialization;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegration;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return new StreamsEventResponse();
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                return new StreamsEventResponse
                {
                    BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure>
                    {
                        new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber }
                    }
                };
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
        return new StreamsEventResponse();
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}

public class StreamsEventResponse
{
    [JsonPropertyName("batchItemFailures")]
    public IList<BatchItemFailure> BatchItemFailures { get; set; }
    public class BatchItemFailure
    {
        [JsonPropertyName("itemIdentifier")]
        public string ItemIdentifier { get; set; }
    }
}
```

------
#### [ Go ]

**SDK per Go V2**  
 C'è dell'altro GitHub. Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Segnalazione di errori di elementi batch di Kinesis con Lambda tramite Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"fmt"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) (map[string]interface{}, error) {
	batchItemFailures := []map[string]interface{}{}

	for _, record := range kinesisEvent.Records {
		curRecordSequenceNumber := ""

		// Process your record
		if /* Your record processing condition here */ {
			curRecordSequenceNumber = record.Kinesis.SequenceNumber
		}

		// Add a condition to check if the record processing failed
		if curRecordSequenceNumber != "" {
			batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": curRecordSequenceNumber})
		}
	}

	kinesisBatchResponse := map[string]interface{}{
		"batchItemFailures": batchItemFailures,
	}
	return kinesisBatchResponse, nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK per Java 2.x**  
 C'è dell'altro GitHub. Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Segnalazione di errori di elementi batch di Kinesis con Lambda tramite Java.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent input, Context context) {

        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) {
            try {
                //Process your record
                KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis();
                curRecordSequenceNumber = kinesisRecord.getSequenceNumber();

            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }
       
       return new StreamsEventResponse(batchItemFailures);   
    }
}
```

------
#### [ JavaScript ]

**SDK per JavaScript (v3)**  
 C'è altro da fare. GitHub Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Segnalazione di errori di elementi batch di Kinesis con Lambda tramite Javascript.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
Segnalazione degli errori degli elementi batch di Kinesis utilizzando Lambda. TypeScript  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
  KinesisStreamBatchResponse,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<KinesisStreamBatchResponse> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  logger.info(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**SDK per PHP**  
 C'è altro da fare. GitHub Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Segnalazione di errori di elementi batch di Kinesis con Lambda tramite PHP.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $kinesisEvent = new KinesisEvent($event);
        $this->logger->info("Processing records");
        $records = $kinesisEvent->getRecords();

        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK per Python (Boto3)**  
 C'è dell'altro GitHub. Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Segnalazione di errori di elementi batch di Kinesis con Lambda tramite Python.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""
    
    for record in records:
        try:
            # Process your record
            curRecordSequenceNumber = record["kinesis"]["sequenceNumber"]
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures":[]}
```

------
#### [ Ruby ]

**SDK per Ruby**  
 C'è dell'altro GitHub. Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Segnalazione di errori di elementi batch di Kinesis con Lambda tramite Ruby.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  batch_item_failures = []

  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue StandardError => err
      puts "An error occurred #{err}"
      # Since we are working with streams, we can return the failed item immediately.
      # Lambda will immediately begin to retry processing from this failed item onwards.
      return { batchItemFailures: [{ itemIdentifier: record['kinesis']['sequenceNumber'] }] }
    end
  end

  puts "Successfully processed #{event['Records'].length} records."
  { batchItemFailures: batch_item_failures }
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('utf-8')
  # Placeholder for actual async work
  sleep(1)
  data
end
```

------
#### [ Rust ]

**SDK per Rust**  
 C'è altro da fare. GitHub Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Segnalazione di errori di elementi batch di Kinesis con Lambda tramite Rust.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::{
    event::kinesis::KinesisEvent,
    kinesis::KinesisEventRecord,
    streams::{KinesisBatchItemFailure, KinesisEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> {
    let mut response = KinesisEventResponse {
        batch_item_failures: vec![],
    };

    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in &event.payload.records {
        tracing::info!(
            "EventId: {}",
            record.event_id.as_deref().unwrap_or_default()
        );

        let record_processing_result = process_record(record);

        if record_processing_result.is_err() {
            response.batch_item_failures.push(KinesisBatchItemFailure {
                item_identifier: record.kinesis.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(response)
}

fn process_record(record: &KinesisEventRecord) -> Result<(), Error> {
    let record_data = std::str::from_utf8(record.kinesis.data.as_slice());

    if let Some(err) = record_data.err() {
        tracing::error!("Error: {}", err);
        return Err(Error::from(err));
    }

    let record_data = record_data.unwrap_or_default();

    // do something interesting with the data
    tracing::info!("Data: {}", record_data);

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

## Utilizzo di Powertools per il processore AWS Lambda batch
<a name="services-kinesis-batchfailurereporting-powertools"></a>

L'utilità per il processore batch di Powertools gestisce AWS Lambda automaticamente la logica di risposta parziale in batch, riducendo la complessità dell'implementazione della segnalazione degli errori dei batch. Ecco alcuni esempi di utilizzo del processore batch:

**Python**  
Per esempi completi e istruzioni di configurazione, consultate la [documentazione del processore batch](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/).
Elaborazione dei AWS Lambda record di flusso Kinesis Data Streams con processore batch.  

```
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import KinesisEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
logger = Logger()

def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
```

**TypeScript**  
Per esempi completi e istruzioni di configurazione, consulta la documentazione del [processore batch](https://docs.aws.amazon.com/powertools/typescript/latest/features/batch/).
Elaborazione dei AWS Lambda record di flusso Kinesis Data Streams con processore batch.  

```
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { KinesisEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.KinesisDataStreams);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};

export const handler = async (event: KinesisEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};
```

**Java**  
Per esempi completi e istruzioni di configurazione, consulta la documentazione del [processore batch](https://docs.powertools.aws.dev/lambda/java/latest/utilities/batch/).
Elaborazione dei AWS Lambda record di flusso Kinesis Data Streams con processore batch.  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class KinesisStreamBatchHandler implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    private final BatchMessageHandler<KinesisEvent, StreamsEventResponse> handler;

    public KinesisStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withKinesisBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent kinesisEvent, Context context) {
        return handler.processBatch(kinesisEvent, context);
    }

    private void processMessage(KinesisEvent.KinesisEventRecord kinesisEventRecord, Context context) {
        // Process the stream record
    }
}
```

**.NET**  
Per esempi completi e istruzioni di configurazione, consulta la documentazione del [processore batch](https://docs.aws.amazon.com/powertools/dotnet/utilities/batch-processing/).
Elaborazione dei AWS Lambda record di flusso Kinesis Data Streams con processore batch.  

```
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class OrderEvent
{
    public string? OrderId { get; set; }
    public string? CustomerId { get; set; }
    public decimal Amount { get; set; }
    public DateTime OrderDate { get; set; }
}

internal class TypedKinesisRecordHandler : ITypedRecordHandler<OrderEvent> 
{
    public async Task<RecordHandlerResult> HandleAsync(OrderEvent orderEvent, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(orderEvent.OrderId)) 
        {
            throw new ArgumentException("Order ID is required");
        }

        return await Task.FromResult(RecordHandlerResult.None); 
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedKinesisRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(KinesisEvent _)
    {
        return TypedKinesisStreamBatchProcessor.Result.BatchItemFailuresResponse; 
    }
}
```

# Conservare i record batch scartati per un'origine di eventi del flusso di dati Kinesis in Lambda
<a name="kinesis-on-failure-destination"></a>

La gestione degli errori per gli strumenti di mappatura dell'origine degli eventi Kinesis dipende dal fatto che l'errore si verifichi prima che la funzione venga richiamata o durante la chiamata della funzione:
+ **Prima della chiamata:** se una mappatura dell'origine degli eventi Lambda non è in grado di richiamare la funzione a causa di limitazioni o altri problemi, riprova finché i record non scadono o superano l'età massima configurata nella mappatura dell'origine dell'evento (). [MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)
+ **Durante la chiamata:** se la funzione viene richiamata ma restituisce un errore, Lambda riprova fino alla scadenza dei record, al superamento dell'età massima () o al raggiungimento della quota di tentativi configurata ([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)). [MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts) Per gli errori di funzione, puoi anche configurare, che divide un batch non riuscito in due batch più piccoli [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-BisectBatchOnFunctionError), isolando i record non validi ed evitando i timeout. La divisione dei batch non consuma la quota di tentativi.

Se le misure di gestione degli errori non riescono, Lambda elimina i record e continua l'elaborazione dei batch dal flusso. Con le impostazioni predefinite, ciò significa che un record errato può bloccare l'elaborazione sullo shard interessata per un massimo di una settimana. Per evitare questa situazione, configura la mappatura dell'origine eventi della funzione con un numero ragionevole di tentativi e un'età massima dei record che sia adatta al caso d'uso.

## Configurazione delle destinazioni per le chiamate non riuscite
<a name="kinesis-on-failure-destination-console"></a>

Per mantenere i record delle chiamate non riuscite allo strumento di mappatura dell'origine degli eventi, aggiungi una destinazione allo strumento di mappatura dell'origine degli eventi della funzione. Ogni record inviato alla destinazione è un documento JSON contenente i metadati relativi alla chiamata non riuscita. Per le destinazioni Amazon S3, Lambda invia insieme ai metadati anche l'intero record di invocazione. Puoi configurare qualsiasi argomento Amazon SNS, coda Amazon SQS, bucket Amazon S3 o Kafka come destinazione.

Con le destinazioni Amazon S3, puoi utilizzare la funzionalità [Notifiche eventi Amazon S3](https://docs.aws.amazon.com/) per ricevere notifiche quando gli oggetti vengono caricati nel bucket S3 di destinazione. Puoi anche configurare Notifiche eventi S3 per richiamare un'altra funzione Lambda per eseguire l'elaborazione automatica su batch non riusciti.

Il tuo ruolo di esecuzione deve avere le autorizzazioni per la destinazione:
+ **[Per una destinazione SQS: sqs: SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)**
+ **[Per una destinazione SNS: sns:publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html)**
+ **[Per una destinazione S3: s3: e [s3](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html): PutObject ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/ListObjectsV2.html)**
+ **[Per una destinazione Kafka: kafka-cluster: WriteData](https://docs.aws.amazon.com/msk/latest/developerguide/kafka-actions.html)**

È possibile configurare un argomento di Kafka come destinazione in caso di errore per le mappature delle sorgenti degli eventi Kafka. Quando Lambda non è in grado di elaborare i record dopo estenuanti tentativi o quando i record superano l'età massima, Lambda invia i record non riusciti all'argomento Kafka specificato per un'elaborazione successiva. Fai riferimento a [Usare un argomento di Kafka come destinazione in caso di errore](kafka-on-failure-destination.md).

[Se hai abilitato la crittografia con la tua chiave KMS per una destinazione S3, anche il ruolo di esecuzione della funzione deve avere l'autorizzazione a chiamare kms:. GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html) Se la chiave KMS e la destinazione del bucket S3 si trovano in un account diverso dalla funzione Lambda e dal ruolo di esecuzione, configura la chiave KMS in modo che consideri attendibile il ruolo di esecuzione da consentire. kms: GenerateDataKey

Per configurare una destinazione in caso di errore tramite la console, completa i seguenti passaggi:

1. Aprire la pagina [Funzioni](https://console.aws.amazon.com/lambda/home#/functions) della console Lambda.

1. Scegliere una funzione.

1. In **Function overview (Panoramica delle funzioni)**, scegliere **Add destination (Aggiungi destinazione)**.

1. Per **Origine**, scegli **Chiamata allo strumento di mappatura dell'origine degli eventi**.

1. Per **Strumento di mappatura dell'origine degli eventi**, scegli un'origine dell'evento configurata per questa funzione.

1. Per **Condizione**, seleziona **In caso di errore**. Per le chiamate allo strumento di mappatura dell'origine degli eventi, questa è l'unica condizione accettata.

1. Per **Tipo di destinazione**, scegli il tipo di destinazione a cui Lambda deve inviare i record di chiamata.

1. Per **Destination (Destinazione)**, scegliere una risorsa.

1. Scegli **Save** (Salva).

Puoi anche configurare una destinazione in caso di errore utilizzando (). AWS Command Line Interface AWS CLI Ad esempio, il [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html)comando seguente aggiunge una mappatura dell'origine degli eventi con una destinazione SQS in caso di errore a: `MyFunction`

```
aws lambda create-event-source-mapping \
--function-name "MyFunction" \
--event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
```

Il [update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html)comando seguente aggiorna una mappatura dell'origine degli eventi per inviare i record di chiamata non riuscita a una destinazione SNS dopo due tentativi o se i record risalgono a più di un'ora.

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--maximum-retry-attempts 2 \
--maximum-record-age-in-seconds 3600 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
```

Le impostazioni aggiornate sono applicate in modo asincrono e non sono riflesse nell'output fino al completamento del processo. Utilizza il comando [get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html) per visualizzare lo stato corrente.

Per rimuovere una destinazione, fornisci una stringa vuota come argomento del parametro `destination-config`:

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": ""}}'
```

### Best practice di sicurezza per destinazioni Amazon S3
<a name="kinesis-s3-destination-security"></a>

L'eliminazione di un bucket S3 configurato come destinazione senza rimuovere la destinazione dalla configurazione della funzione può creare un rischio per la sicurezza. Se un altro utente conosce il nome del bucket di destinazione, può ricreare il bucket nel proprio Account AWS. I record delle invocazioni non riuscite verranno inviati al relativo bucket, esponendo potenzialmente i dati della tua funzione.

**avvertimento**  
Per garantire che i record di invocazione della tua funzione non possano essere inviati a un bucket S3 in un altro Account AWS, aggiungi una condizione al ruolo di esecuzione della funzione che limiti le autorizzazioni ai bucket del tuo account. `s3:PutObject` 

Di seguito viene illustrato un esempio di policy IAM che limita le autorizzazioni `s3:PutObject` della funzione ai bucket presenti nell'account. Questa policy fornisce inoltre a Lambda l'autorizzazione `s3:ListBucket` necessaria per utilizzare un bucket S3 come destinazione.

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3BucketResourceAccountWrite",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*/*",
                "arn:aws:s3:::*"
            ],
            "Condition": {
                "StringEquals": {
                    "s3:ResourceAccount": "111122223333"
                }
            }
        }
    ]
}
```

Per aggiungere una politica di autorizzazioni al ruolo di esecuzione della tua funzione utilizzando Console di gestione AWS o AWS CLI, consulta le istruzioni nelle seguenti procedure:

------
#### [ Console ]

**Per aggiungere una policy di autorizzazioni al ruolo di esecuzione di una funzione (console)**

1. Aprire la pagina [Funzioni](https://console.aws.amazon.com/lambda/home#/functions) della console Lambda.

1. Seleziona la funzione Lambda di cui si desidera modificare il ruolo di esecuzione.

1. Nella scheda **Configurazione** scegli **Autorizzazioni**.

1. Nella scheda **Ruolo di esecuzione**, seleziona il **nome del ruolo** della funzione per aprire la pagina della console IAM del ruolo.

1. Aggiungi una policy di autorizzazioni di base al ruolo completando le seguenti operazioni:

   1. Nel riquadro **Policy di autorizzazioni**, scegli **Aggiungi autorizzazioni**, poi **Crea policy in linea**.

   1. Nell'**editor delle policy**, seleziona **JSON**.

   1. Incolla la policy che desideri aggiungere nell'editor (sostituendo il codice JSON esistente), quindi scegli **Avanti**.

   1. In **Dettagli della policy**, specifica un **nome per la policy**.

   1. Scegli **Crea policy**.

------
#### [ AWS CLI ]

**Per aggiungere una policy di autorizzazioni al ruolo di esecuzione di una funzione (CLI)**

1. Crea un documento di policy JSON con le autorizzazioni richieste e salvalo in una directory locale.

1. Utilizza il comando della CLI `put-role-policy` di IAM per aggiungere le autorizzazioni per il ruolo di esecuzione di una funzione. Esegui il comando seguente dalla directory in cui hai salvato il documento di policy JSON e sostituisci il nome del ruolo, il nome della policy e il documento di policy con i tuoi valori.

   ```
   aws iam put-role-policy \
   --role-name my_lambda_role \
   --policy-name LambdaS3DestinationPolicy \
   --policy-document file://my_policy.json
   ```

------

### Record di invocazione Amazon SNS e Amazon SQS di esempio
<a name="kinesis-on-failure-destination-example-sns-sqs"></a>

L'esempio seguente mostra ciò che Lambda invia a un argomento SNS o una coda SQS di destinazione per una chiamata non riuscita all'origine dell'evento Kinesis. Poiché Lambda invia solo i metadati per questi tipi di destinazione, utilizza i campi `streamArn`, `shardId`, `startSequenceNumber` e `endSequenceNumber` per ottenere il record originale completo. Tutti i campi mostrati nella proprietà `KinesisBatchInfo` saranno sempre presenti.

```
{
    "requestContext": {
        "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KinesisBatchInfo": {
        "shardId": "shardId-000000000001",
        "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722",
        "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z",
        "batchSize": 500,
        "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream"
    }
}
```

È possibile utilizzare queste informazioni per recuperare i record interessati dal flusso per la risoluzione dei problemi. I record effettivi non sono inclusi, pertanto è necessario elaborare questo record e recuperarli dal flusso prima che scadano e vadano persi.

### Record di invocazione Amazon S3 di esempio
<a name="kinesis-on-failure-destination-example-sns-sqs-s3"></a>

L'esempio seguente mostra ciò che Lambda invia a un bucket Amazon S3 per una chiamata non riuscita all'origine eventi Kinesis. Oltre a tutti i campi dell'esempio precedente per le destinazioni SQS e SNS, il campo `payload` contiene il record di chiamata originale come stringa JSON con escape.

```
{
    "requestContext": {
        "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KinesisBatchInfo": {
        "shardId": "shardId-000000000001",
        "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722",
        "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z",
        "batchSize": 500,
        "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream"
    },
    "payload": "<Whole Event>" // Only available in S3
}
```

L'oggetto S3 contenente il record di invocazione utilizza la seguente convenzione di denominazione:

```
aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>
```

# Implementazione dell'elaborazione di Flusso di dati Kinesis stateful in Lambda
<a name="services-kinesis-windows"></a>

Le funzioni Lambda possono eseguire applicazioni di elaborazione di flussi continui. Un flusso rappresenta dati illimitati che scorrono continuamente attraverso l'applicazione. Per analizzare le informazioni da questo input di aggiornamento continuo, è possibile associare i record inclusi utilizzando una finestra definita in termini di tempo.

Le finestre a cascata sono finestre temporali distinte che si aprono e si chiudono a intervalli regolari. Per impostazione predefinita, le invocazioni Lambda sono senza stato: non è possibile utilizzarle per l'elaborazione dei dati tra più invocazioni continue senza un database esterno. Tuttavia, con la finestra a cascata, è possibile mantenere il proprio stato tra le invocazioni. Questo stato contiene il risultato aggregato dei messaggi precedentemente elaborati per la finestra corrente. Lo stato può essere un massimo di 1 MB per shard. Se supera quella dimensione, Lambda termina la finestra in anticipo.

Ogni record in un flusso appartiene a una finestra specifica. Lambda elaborerà ogni record almeno una volta, ma non garantisce che ogni record venga elaborato una sola volta. In rari casi, come nel caso della gestione degli errori, alcuni record potrebbero essere elaborati più di una volta. La prima volta i record vengono sempre elaborati in ordine. Se i record vengono elaborati più di una volta, possono essere elaborati fuori ordine.

## Aggregazione ed elaborazione
<a name="streams-tumbling-processing"></a>

La funzione gestita dall'utente viene richiamata sia per l'aggregazione che per l'elaborazione dei risultati finali di tale aggregazione. Lambda aggrega tutti i record ricevuti nella finestra. È possibile ricevere questi record in più batch, ciascuno come richiamo separato. Ogni richiamo riceve uno stato. Pertanto, quando si utilizzano finestre a cascata, la risposta della funzione Lambda deve contenere una proprietà di `state`. Se la risposta non contiene una proprietà di `state`, Lambda la considera un'invocazione non riuscita. Per soddisfare questa condizione, la funzione può restituire un oggetto `TimeWindowEventResponse`, che presenta la seguente forma JSON:

**Example `TimeWindowEventResponse` valori**  

```
{
    "state": {
        "1": 282,
        "2": 715
    },
    "batchItemFailures": []
}
```

**Nota**  
Per le funzioni Java, si consiglia di utilizzare una `Map<String, String>` per rappresentare lo stato.

Alla fine della finestra, il flag `isFinalInvokeForWindow` è impostato `true` per indicare che questo è lo stato finale e che è pronto per l'elaborazione. Dopo l'elaborazione, la finestra viene completata e il richiamo finale viene completato e quindi lo stato viene eliminato.

Al termine della finestra, Lambda utilizza l'elaborazione finale per le operazioni sui risultati dell'aggregazione. L'elaborazione finale viene richiamata in modo sincrono. Dopo il richiamo riuscito, la funzione controlla il numero di sequenza e l'elaborazione del flusso continua. Se il richiamo non ha esito positivo, la funzione Lambda sospende l'ulteriore elaborazione fino a quando non viene eseguito correttamente il richiamo.

**Example KinesisTimeWindowEvent**  

```
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1607497475.000
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
            "awsRegion": "us-east-1",
            "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"
        }
    ],
    "window": {
        "start": "2020-12-09T07:04:00Z",
        "end": "2020-12-09T07:06:00Z"
    },
    "state": {
        "1": 282,
        "2": 715
    },
    "shardId": "shardId-000000000006",
    "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}
```

## Configurazione
<a name="streams-tumbling-config"></a>

È possibile configurare le finestre a cascata quando si crea o si aggiorna un mapping di origini di eventi. Per configurare una finestra rotante, specificate la finestra in secondi ([TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds)). Il seguente comando di esempio AWS Command Line Interface (AWS CLI) crea una mappatura della sorgente degli eventi in streaming con una finestra di tumbling di 120 secondi. La funzione Lambda definita per l'aggregazione e l'elaborazione è denominata `tumbling-window-example-function`.

```
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream \
--function-name tumbling-window-example-function \
--starting-position TRIM_HORIZON \
--tumbling-window-in-seconds 120
```

Lambda determina i limiti delle finestre a cascata in base al momento in cui i record sono stati inseriti nel flusso. Tutti i record dispongono di un timestamp approssimativo che Lambda utilizza nelle determinazioni dei limiti.

Le aggregazioni delle finestre a cascata non supportano il risharding. Quando lo shard viene terminato, Lambda considera la finestra chiusa e gli shard secondari iniziano la propria finestra in uno stato nuovo. Quando non vengono aggiunti nuovi record alla finestra corrente, Lambda attende fino a due minuti prima di presumere che la finestra sia terminata. Ciò aiuta a garantire che la funzione legga tutti i record nella finestra corrente, anche se i record vengono aggiunti a intermittenza.

Le finestre a cascata supportano completamente le policy di ripetizione dei tentativi esistenti `maxRetryAttempts` e `maxRecordAge`.

**Example Handler.py: aggregazione ed elaborazione**  
La seguente funzione Python mostra come aggregare e quindi elaborare lo stato finale:  

```
def lambda_handler(event, context):
    print('Incoming event: ', event)
    print('Incoming state: ', event['state'])

#Check if this is the end of the window to either aggregate or process.
    if event['isFinalInvokeForWindow']:
        # logic to handle final state of the window
        print('Destination invoke')
    else:
        print('Aggregate invoke')

#Check for early terminations
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')

    #Aggregation logic
    state = event['state']
    for record in event['Records']:
        state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1

    print('Returning state: ', state)
    return {'state': state}
```

# Parametri Lambda per gli strumenti di mappatura dell'origine degli eventi di un flusso di dati Amazon Kinesis
<a name="services-kinesis-parameters"></a>

Tutti gli strumenti di mappatura dell'origine degli eventi Lambda condividono le stesse operazioni API [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) e [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html). Tuttavia, solo alcuni dei parametri si applicano a Kinesis.


| Parametro | Obbligatorio | Predefinito | Note | 
| --- | --- | --- | --- | 
|  [BatchSize](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-BatchSize)  |  N  |  100  |  Massimo: 10.000.  | 
|  [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-BisectBatchOnFunctionError)  |  N  |  false  |  nessuno | 
|  [DestinationConfig](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-DestinationConfig)  |  N  | N/D |  Una destinazione coda Amazon SQS o argomento Amazon SNS per i record scartati. Per ulteriori informazioni, consulta [Configurazione delle destinazioni per le chiamate non riuscite](kinesis-on-failure-destination.md#kinesis-on-failure-destination-console).  | 
|  [Abilitazione di](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-Enabled)  |  N  |  true  |  nessuno | 
|  [EventSourceArn](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-EventSourceArn)  |  Y  | N/D |  L'ARN del flusso dei dati o di un consumer di flusso.  | 
|  [FunctionName](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionName)  |  Y  | N/D |  nessuno | 
|  [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes)  |  N  |  N/D |  Per consentire alla funzione di segnalare errori specifici in un batch, includi il valore `ReportBatchItemFailures` in `FunctionResponseTypes`. Per ulteriori informazioni, consulta [Configurazione della risposta batch parziale con il flusso di dati Kinesis e Lambda](services-kinesis-batchfailurereporting.md).  | 
|  [MaximumBatchingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumBatchingWindowInSeconds)  |  N  |  0  |  nessuno | 
|  [MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)  |  N  |  -1  |  -1 sta per infinito: Lambda non scarta i record (le [impostazioni di conservazione dei dati del flusso di dati Kinesis](https://docs.aws.amazon.com/streams/latest/dev/kinesis-extended-retention.html) sono ancora valide) Minimo: -1 Massimo: 604.800  | 
|  [MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts)  |  N  |  -1  |  -1 sta per infinito: i record non riusciti vengono ripetuti fino alla scadenza del record Minimo: -1 Massimo: 10.000.  | 
|  [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor)  |  N  |  1  |  Maximum: 10  | 
|  [StartingPosition](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-StartingPosition)  |  Y  |  N/D |  AT\$1TIMESTAMP, TRIM\$1HORIZON o LATEST  | 
|  [StartingPositionTimestamp](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-StartingPositionTimestamp)  |  N  |  N/D |  Valido solo se StartingPosition è impostato su AT\$1TIMESTAMP. Il tempo da cui avviare la lettura, in secondi di tempo Unix.  | 
|  [TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds)  |  N  |  N/D |  Minimo: 0 Massimo: 900  | 

# Utilizzo del filtro eventi con un'origine eventi Kinesis
<a name="with-kinesis-filtering"></a>

Puoi utilizzare il filtraggio degli eventi per controllare quali record di un flusso o di una coda Lambda invia alla funzione. Per informazioni generali sul funzionamento del filtraggio eventi, consulta [Controllare gli eventi che Lambda invia alla funzione](invocation-eventfiltering.md).

In questa sezione viene descritto il filtraggio degli eventi per le origini eventi Kinesis.

**Nota**  
Le mappature delle sorgenti degli eventi Kinesis supportano solo il filtraggio sulla chiave. `data`

**Topics**
+ [Nozioni di base sul filtro di eventi Kinesis](#filtering-kinesis)
+ [Filtraggio dei record aggregati Kinesis](#filtering-kinesis-efo)

## Nozioni di base sul filtro di eventi Kinesis
<a name="filtering-kinesis"></a>

Supponiamo che un produttore stia inserendo dati in formato JSON nel flusso di dati Kinesis. Un record di esempio sarebbe simile al seguente, con i dati JSON convertiti in una stringa codificata Base64 nel campo `data`.

```
{
    "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=",
        "approximateArrivalTimestamp": 1545084650.987
        },
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
    "awsRegion": "us-east-2",
    "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}
```

Fintantoché i dati che il produttore inserisce nel flusso sono JSON validi, puoi utilizzare il filtraggio degli eventi per filtrare i record utilizzando la chiave `data`. Supponiamo che un produttore stia inserendo dati in formato JSON nel flusso di dati Kinesis.

```
{
    "record": 12345,
    "order": {
        "type": "buy",
        "stock": "ANYCO",
        "quantity": 1000
        }
}
```

Per filtrare solo i record in cui il tipo di ordine è "acquista", l'oggetto `FilterCriteria` dovrebbe avere la struttura seguente.

```
{
    "Filters": [
        {
            "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"
        }
    ]
}
```

Per una maggiore chiarezza, ecco il valore del `Pattern` del filtro espanso in JSON semplice. 

```
{
    "data": {
        "order": {
            "type": [ "buy" ]
            }
      }
}
```

Puoi aggiungere il filtro utilizzando la console, la AWS CLI o un modello AWS SAM.

------
#### [ Console ]

Per aggiungere questo filtro utilizzando la console, segui le istruzioni riportate in [Collegamento dei criteri di filtro a una mappatura dell'origine evento (console)](invocation-eventfiltering.md#filtering-console) e inserisci la seguente stringa per i **criteri di filtraggio**.

```
{ "data" : { "order" : { "type" : [ "buy" ] } } }
```

------
#### [ AWS CLI ]

Per creare una nuova mappatura dell'origine degli eventi con questi criteri di filtraggio utilizzando l'AWS Command Line Interface (AWS CLI), esegui il comando seguente.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
```

Per aggiungere questi criteri di filtraggio a una mappatura dell'origine degli eventi esistente, esegui il comando seguente.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
```

------
#### [ AWS SAM ]

Per aggiungere questo filtro utilizzando AWS SAM, aggiungi il seguente frammento al modello YAML dell'origine degli eventi.

```
FilterCriteria:
  Filters:
    - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'
```

------

Per filtrare correttamente gli eventi da origini Kinesis, sia il campo dati sia i criteri di filtraggio per il campo dati devono essere in un formato JSON valido. Se uno dei due campi non è in un formato JSON valido, Lambda rilascia il messaggio o genera un'eccezione. La tabella seguente riepiloga il comportamento specifico: 


| Formato dei dati in entrata | Formato del modello di filtro per le proprietà di dati | Operazione risultante | 
| --- | --- | --- | 
|  JSON valido  |  JSON valido  |  Filtri Lambda in base ai criteri di filtro.  | 
|  JSON valido  |  Nessun modello di filtro per le proprietà dei dati  |  Filtri Lambda (solo sulle altre proprietà dei metadati) in base ai criteri di filtro.  | 
|  JSON valido  |  Non-JSON  |  Lambda genera un'eccezione al momento della creazione o dell'aggiornamento della mappatura dell'origine evento. Il modello di filtro per le proprietà dei dati deve essere in un formato JSON valido.  | 
|  Non-JSON  |  JSON valido  |  Lambda rilascia il registro.  | 
|  Non-JSON  |  Nessun modello di filtro per le proprietà dei dati  |  Filtri Lambda (solo sulle altre proprietà dei metadati) in base ai criteri di filtro.  | 
|  Non-JSON  |  Non-JSON  |  Lambda genera un'eccezione al momento della creazione o dell'aggiornamento della mappatura dell'origine evento. Il modello di filtro per le proprietà dei dati deve essere in un formato JSON valido.  | 

## Filtraggio dei record aggregati Kinesis
<a name="filtering-kinesis-efo"></a>

Con Kinesis, puoi aggregare più record in un unico record flusso di dati Kinesis per aumentare il throughput dei dati. Lambda può applicare criteri di filtraggio ai record aggregati solo quando si utilizza il [fan-out avanzato](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html) di Kinesis. Il filtraggio dei record aggregati con Kinesis standard non è supportato. Quando utilizzi il fan-out avanzato, configuri un consumatore Kinesis a throughput dedicato che funga da trigger per la funzione Lambda. Lambda filtra quindi i record aggregati e passa solo i record che soddisfano i criteri di filtraggio.

Per ulteriori informazioni sull'aggregazione dei record Kinesis, consulta la sezione [Aggregazione](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation) nella pagina Concetti chiave di Kinesis Producer Library (KPL). Per ulteriori informazioni sull'utilizzo di Lambda con il fan-out avanzato di Kinesis, consulta la pagina [Aumento delle prestazioni di elaborazione dei flussi in tempo reale con il fan-out avanzato dei Flussi di dati Amazon Kinesis e AWS Lambda](https://aws.amazon.com/blogs/compute/increasing-real-time-stream-processing-performance-with-amazon-kinesis-data-streams-enhanced-fan-out-and-aws-lambda/) nel blog di calcolo AWS.

# Tutorial: Utilizzo di Lambda con Flusso di dati Kinesis
<a name="with-kinesis-example"></a>

In questo tutorial, viene creata una funzione Lambda per utilizzare gli eventi da un flusso di dati Amazon Kinesis. 

1. L'app personalizzata scrive record nel flusso.

1. AWS Lambda interroga lo stream e, quando rileva nuovi record nello stream, richiama la funzione Lambda.

1. AWS Lambda esegue la funzione Lambda assumendo il ruolo di esecuzione specificato al momento della creazione della funzione Lambda.

## Prerequisiti
<a name="with-kinesis-prepare"></a>

### Installa AWS Command Line Interface
<a name="install_aws_cli"></a>

Se non l'hai ancora installato AWS Command Line Interface, segui i passaggi indicati in [Installazione o aggiornamento della versione più recente di AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) per installarlo.

Per eseguire i comandi nel tutorial, sono necessari un terminale a riga di comando o una shell (interprete di comandi). In Linux e macOS, utilizza la shell (interprete di comandi) e il gestore pacchetti preferiti.

**Nota**  
Su Windows, alcuni comandi della CLI Bash utilizzati comunemente con Lambda (ad esempio, `zip`) non sono supportati dai terminali integrati del sistema operativo. Per ottenere una versione integrata su Windows di Ubuntu e Bash, [installa il sottosistema Windows per Linux](https://docs.microsoft.com/en-us/windows/wsl/install-win10). 

## Creazione del ruolo di esecuzione
<a name="with-kinesis-example-create-iam-role"></a>

Crea il [ruolo di esecuzione](lambda-intro-execution-role.md) che autorizza la funzione ad accedere alle AWS risorse.

**Per creare un ruolo di esecuzione**

1. Apri la pagina [Ruoli](https://console.aws.amazon.com/iam/home#/roles) nella console IAM.

1. Scegliere **Crea ruolo**.

1. Creare un ruolo con le seguenti proprietà.
   + **Trusted entity **(Entità attendibile) – **AWS Lambda**.
   + **Autorizzazioni** — **AWSLambdaKinesisExecutionRole**.
   + **Nome ruolo** – **lambda-kinesis-role**.

La **AWSLambdaKinesisExecutionRole**policy dispone delle autorizzazioni necessarie alla funzione per leggere gli elementi da Kinesis e scrivere i log in CloudWatch Logs.

## Creazione della funzione
<a name="with-kinesis-example-create-function"></a>

Crea una funzione Lambda che elabora i messaggi Kinesis. Il codice funzione registra l'ID dell'evento e i dati dell'evento del record CloudWatch Kinesis in Logs.

Questo tutorial utilizza il runtime Node.js 24, ma abbiamo anche fornito codice di esempio in altri linguaggi di runtime. Per visualizzare il codice per il runtime che ti interessa, seleziona la scheda corrispondente nella casella seguente. Il JavaScript codice che utilizzerai in questo passaggio si trova nel primo esempio mostrato nella **JavaScript**scheda.

------
#### [ .NET ]

**SDK per .NET**  
 C'è altro su GitHub. Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Utilizzo di un evento Kinesis con Lambda tramite .NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegrationSampleCode;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return;
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                throw;
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}
```

------
#### [ Go ]

**SDK per Go V2**  
 C'è dell'altro GitHub. Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Utilizzo di un evento Kinesis con Lambda tramite Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"log"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
	if len(kinesisEvent.Records) == 0 {
		log.Printf("empty Kinesis event received")
		return nil
	}

	for _, record := range kinesisEvent.Records {
		log.Printf("processed Kinesis event with EventId: %v", record.EventID)
		recordDataBytes := record.Kinesis.Data
		recordDataText := string(recordDataBytes)
		log.Printf("record data: %v", recordDataText)
		// TODO: Do interesting work based on the new data
	}
	log.Printf("successfully processed %v records", len(kinesisEvent.Records))
	return nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK per Java 2.x**  
 C'è dell'altro GitHub. Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Utilizzo di un evento Kinesis con Lambda tramite Java.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class Handler implements RequestHandler<KinesisEvent, Void> {
    @Override
    public Void handleRequest(final KinesisEvent event, final Context context) {
        LambdaLogger logger = context.getLogger();
        if (event.getRecords().isEmpty()) {
            logger.log("Empty Kinesis Event received");
            return null;
        }
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            try {
                logger.log("Processed Event with EventId: "+record.getEventID());
                String data = new String(record.getKinesis().getData().array());
                logger.log("Data:"+ data);
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex) {
                logger.log("An error occurred:"+ex.getMessage());
                throw ex;
            }
        }
        logger.log("Successfully processed:"+event.getRecords().size()+" records");
        return null;
    }

}
```

------
#### [ JavaScript ]

**SDK per JavaScript (v3)**  
 C'è altro da fare. GitHub Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda). 
Consumo di un evento Kinesis con Lambda utilizzando. JavaScript  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      throw err;
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
Consumo di un evento Kinesis con Lambda utilizzando. TypeScript  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<void> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      throw err;
    }
    logger.info(`Successfully processed ${event.Records.length} records.`);
  }
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**SDK per PHP**  
 C'è altro da fare. GitHub Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Utilizzo di un evento Kinesis con Lambda tramite PHP.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends KinesisHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleKinesis(KinesisEvent $event, Context $context): void
    {
        $this->logger->info("Processing records");
        $records = $event->getRecords();
        foreach ($records as $record) {
            $data = $record->getData();
            $this->logger->info(json_encode($data));
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK per Python (Boto3)**  
 C'è dell'altro GitHub. Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Utilizzo di un evento Kinesis con Lambda tramite Python.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):

    for record in event['Records']:
        try:
            print(f"Processed Kinesis Event - EventID: {record['eventID']}")
            record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            print(f"Record Data: {record_data}")
            # TODO: Do interesting work based on the new data
        except Exception as e:
            print(f"An error occurred {e}")
            raise e
    print(f"Successfully processed {len(event['Records'])} records.")
```

------
#### [ Ruby ]

**SDK per Ruby**  
 C'è dell'altro GitHub. Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Utilizzo di un evento Kinesis con Lambda tramite Ruby.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue => err
      $stderr.puts "An error occurred #{err}"
      raise err
    end
  end
  puts "Successfully processed #{event['Records'].length} records."
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('UTF-8')
  # Placeholder for actual async work
  # You can use Ruby's asynchronous programming tools like async/await or fibers here.
  return data
end
```

------
#### [ Rust ]

**SDK per Rust**  
 C'è altro da fare. GitHub Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Utilizzo di un evento Kinesis con Lambda tramite Rust.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    event.payload.records.iter().for_each(|record| {
        tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());

        let record_data = std::str::from_utf8(&record.kinesis.data);

        match record_data {
            Ok(data) => {
                // log the record data
                tracing::info!("Data: {}", data);
            }
            Err(e) => {
                tracing::error!("Error: {}", e);
            }
        }
    });

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

**Creazione della funzione**

1. Crea una directory per il progetto, quindi passa a quella directory.

   ```
   mkdir kinesis-tutorial
   cd kinesis-tutorial
   ```

1. Copia il JavaScript codice di esempio in un nuovo file denominato`index.js`.

1. Crea un pacchetto di implementazione.

   ```
   zip function.zip index.js
   ```

1. Creare una funzione Lambda con il comando `create-function`.

   ```
   aws lambda create-function --function-name ProcessKinesisRecords \
   --zip-file fileb://function.zip --handler index.handler --runtime nodejs24.x \
   --role arn:aws:iam::111122223333:role/lambda-kinesis-role
   ```

## Test della funzione Lambda
<a name="walkthrough-kinesis-events-adminuser-create-test-function-upload-zip-test-manual-invoke"></a>

Richiama la funzione Lambda manualmente utilizzando il comando `invoke` AWS Lambda CLI e un evento Kinesis di esempio.

**Verifica della funzione Lambda**

1. Copiare il codice JSON seguente in un file e salvarlo con nome `input.txt`. 

   ```
   {
       "Records": [
           {
               "kinesis": {
                   "kinesisSchemaVersion": "1.0",
                   "partitionKey": "1",
                   "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                   "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                   "approximateArrivalTimestamp": 1545084650.987
               },
               "eventSource": "aws:kinesis",
               "eventVersion": "1.0",
               "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
               "eventName": "aws:kinesis:record",
               "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
               "awsRegion": "us-east-2",
               "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
           }
       ]
   }
   ```

1. Utilizzare il comando `invoke` per inviare l'evento alla funzione.

   ```
   aws lambda invoke --function-name ProcessKinesisRecords \
   --cli-binary-format raw-in-base64-out \
   --payload file://input.txt outputfile.txt
   ```

   L'**cli-binary-format**opzione è obbligatoria se utilizzi la versione 2. AWS CLI Per rendere questa impostazione come predefinita, esegui `aws configure set cli-binary-format raw-in-base64-out`. Per ulteriori informazioni, consulta la pagina [AWS CLI supported global command line options](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list) nella *Guida per l'utente di AWS Command Line Interface versione 2*.

   La risposta viene salvata in `out.txt`.

## Creare un flusso Kinesis
<a name="with-kinesis-example-configure-event-source-create"></a>

Utilizzare il comando `create-stream ` per creare un flusso.

```
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
```

Eseguire il seguente comando `describe-stream` per ottenere il flusso ARN.

```
aws kinesis describe-stream --stream-name lambda-stream
```

Verrà visualizzato l’output seguente:

```
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920746074317682119384634633455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
        "StreamName": "lambda-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": 1544828156.0
    }
}
```

Nella fase successiva utilizzare l'ARN del flusso per associare il flusso alla funzione Lambda.

## Aggiungi una fonte di eventi in AWS Lambda
<a name="with-kinesis-example-configure-event-source-add-event-source"></a>

Eseguire il seguente comando AWS CLI `add-event-source`.

```
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source  arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
```

Annotare l'ID della mappatura per utilizzarlo in futuro. Mediante l'esecuzione del comando `list-event-source-mappings` è possibile ottenere un elenco di mappature delle origini eventi.

```
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
```

Nella risposta è possibile verificare che il valore dello stato sia `enabled` (attivato). Le mappature dell'origine eventi possono essere disabilitate per sospendere il polling temporaneamente senza perdere alcuni record.

## Eseguire il test della configurazione
<a name="with-kinesis-example-configure-event-source-test-end-to-end"></a>

Per testare la mappatura dell'origine eventi, aggiungere i record dell'evento al flusso Kinesis. Il valore `--data` è una stringa che il CLI codifica in base64 prima di inviarla a Kinesis. È possibile eseguire lo stesso comando più volte per aggiungere più record al flusso.

```
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
```

Lambda usa il ruolo di esecuzione per leggere i record dal flusso. Quindi richiama la funzione Lambda, passando ai batch dei record. La funzione decodifica i dati di ogni record e li registra, inviando l'output a CloudWatch Logs. Visualizza i log nella [console CloudWatch ](https://console.aws.amazon.com/cloudwatch).

## Pulizia delle risorse
<a name="cleanup"></a>

Ora è possibile eliminare le risorse create per questo tutorial, a meno che non si voglia conservarle. Eliminando AWS le risorse che non utilizzi più, eviti addebiti inutili a tuo carico. Account AWS

**Come eliminare il ruolo di esecuzione**

1. Aprire la pagina [Ruoli](https://console.aws.amazon.com/iam/home#/roles) della console IAM.

1. Selezionare il ruolo di esecuzione creato.

1. Scegliere **Elimina**.

1. Inserisci il nome del ruolo nel campo di immissione testo e seleziona **Delete** (Elimina).

**Per eliminare la funzione Lambda**

1. Aprire la pagina [Functions (Funzioni)](https://console.aws.amazon.com/lambda/home#/functions) della console Lambda.

1. Selezionare la funzione creata.

1. Scegliere **Operazioni**, **Elimina**.

1. Digita **confirm** nel campo di immissione testo e scegli **Delete** (Elimina).

**Per eliminare il flusso Kinesis**

1. [Accedi Console di gestione AWS e apri la console Kinesis all'indirizzo https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. Selezionare il flusso creato.

1. Scegli **Operazioni** > **Elimina**.

1. Inserisci **delete** nel campo di immissione del testo.

1. Scegli **Delete** (Elimina).