

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Usare un argomento di Kafka come destinazione in caso di errore
<a name="kafka-on-failure-destination"></a>

È possibile configurare un argomento di Kafka come destinazione in caso di errore per le mappature delle sorgenti degli eventi Kafka. Quando Lambda non è in grado di elaborare i record dopo estenuanti tentativi o quando i record superano l'età massima, Lambda invia i record non riusciti all'argomento Kafka specificato per un'elaborazione successiva. Quando configuri sia [tentativi infiniti](kafka-retry-configurations.md) che una destinazione in caso di errore, Lambda applica automaticamente un massimo di 10 tentativi.

## Come funziona una destinazione Kafka in caso di errore
<a name="kafka-ofd-overview"></a>

Quando si configura un argomento di Kafka come destinazione in caso di errore, Lambda funge da produttore Kafka e scrive i record non riusciti nell'argomento di destinazione. Questo crea un pattern DLT (Dead Letter Topic) all'interno dell'infrastruttura Kafka.
+ **Stesso requisito del cluster**: l'argomento di destinazione deve esistere nello stesso cluster Kafka degli argomenti di origine.
+ **Contenuto effettivo dei record**: le destinazioni Kafka ricevono i record effettivamente non riusciti insieme ai metadati di errore.
+ **Prevenzione della ricorsione**: Lambda previene i loop infiniti bloccando le configurazioni in cui gli argomenti di origine e destinazione coincidono.

## Configurazione di una destinazione Kafka in caso di errore
<a name="kafka-ofd-configure"></a>

È possibile configurare un argomento di Kafka come destinazione in caso di errore durante la creazione o l'aggiornamento di una mappatura dell'origine degli eventi Kafka.

### Configurazione di una destinazione Kafka (console)
<a name="kafka-ofd-console"></a>

**Configurare un argomento di Kafka come destinazione in caso di errore (console)**

1. Aprire la pagina [Funzioni](https://console.aws.amazon.com/lambda/home#/functions) della console Lambda.

1. Scegliete il nome della funzione.

1. Esegui una delle seguenti operazioni:
   + Per aggiungere un nuovo trigger Kafka, in **Panoramica delle funzioni**, scegli **Aggiungi** trigger.
   + **Per modificare un trigger Kafka esistente, scegli il trigger, quindi scegli Modifica.**

1. **In **Impostazioni aggiuntive**, per **Destinazione in caso di errore**, scegli l'argomento Kafka.**

1. Per **Nome argomento**, inserisci il nome dell'argomento Kafka a cui desideri inviare i record non riusciti.

1. **Scegli **Aggiungi o Salva**.**

### Configurazione di una destinazione Kafka ()AWS CLI
<a name="kafka-ofd-cli"></a>

Utilizzate il `kafka://` prefisso per specificare un argomento di Kafka come destinazione in caso di errore.

#### Creazione di una mappatura della sorgente degli eventi con la destinazione Kafka
<a name="kafka-ofd-cli-create"></a>

L'esempio seguente crea una mappatura dell'origine degli eventi Amazon MSK con un argomento Kafka come destinazione in caso di errore:

```
aws lambda create-event-source-mapping \
  --function-name my-kafka-function \
  --topics AWSKafkaTopic \
  --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \
  --starting-position LATEST \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \
  --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
```

Per Kafka autogestito, usa la stessa sintassi:

```
aws lambda create-event-source-mapping \
  --function-name my-kafka-function \
  --topics AWSKafkaTopic \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \
  --starting-position LATEST \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \
  --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
```

#### Aggiornamento di una destinazione Kafka
<a name="kafka-ofd-cli-update"></a>

Usa il `update-event-source-mapping` comando per aggiungere o modificare una destinazione Kafka:

```
aws lambda update-event-source-mapping \
  --uuid 12345678-1234-1234-1234-123456789012 \
  --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
```

## Formato di registrazione per una destinazione Kafka
<a name="kafka-ofd-record-format"></a>

Quando Lambda invia record non riusciti a un argomento di Kafka, ogni messaggio contiene sia metadati sull'errore che il contenuto effettivo del record.

### Metadati di errore
<a name="kafka-ofd-metadata"></a>

I metadati includono informazioni sul motivo per cui il record ha avuto esito negativo e dettagli sul batch originale:

```
{
  "requestContext": {
    "requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e",
    "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST",
    "condition": "RetriesExhausted",
    "approximateInvokeCount": 3
  },
  "responseContext": {
    "statusCode": 200,
    "executedVersion": "$LATEST",
    "functionError": "Unhandled"
  },
  "version": "1.0",
  "timestamp": "2019-11-14T18:16:05.568Z",
  "KafkaBatchInfo": {
    "batchSize": 1,
    "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123",
    "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098",
    "payloadSize": 1162,
    "recordInfo": {
      "offset": "49601189658422359378836298521827638475320189012309704722",
      "timestamp": "2019-11-14T18:16:04.835Z"
    }
  },
  "payload": {
    "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098",
    "eventSource": "aws:kafka",
    "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123",
    "records": {
      "my-topic-0": [
        {
          "headers": [],
          "key": "dGVzdC1rZXk=",
          "offset": 100,
          "partition": 0,
          "timestamp": 1749116692330,
          "timestampType": "CREATE_TIME",
          "topic": "my-topic",
          "value": "dGVzdC12YWx1ZQ=="
        }
      ]
    }
  }
}
```

### Comportamento delle chiavi di partizione
<a name="kafka-ofd-partitioning"></a>

Lambda utilizza la stessa chiave di partizione del record originale per la produzione sull'argomento di destinazione. Se il record originale non aveva alcuna chiave, Lambda utilizza il partizionamento round-robin predefinito di Kafka su tutte le partizioni disponibili nell'argomento di destinazione.

## Requisiti e limitazioni
<a name="kafka-ofd-requirements"></a>
+ È **richiesta la modalità di provisioning**: una destinazione Kafka in caso di errore è disponibile solo per le mappature delle sorgenti degli eventi con la modalità provisioning abilitata.
+ **Solo stesso cluster**: l'argomento di destinazione deve esistere nello stesso cluster Kafka degli argomenti di origine.
+ **Autorizzazioni per gli argomenti**: la mappatura dei sorgenti degli eventi deve disporre delle autorizzazioni di scrittura sull'argomento di destinazione. Esempio:

  ```
  {
      "Version": "2012-10-17",		 	 	 
      "Statement": [
          {
              "Sid": "ClusterPermissions",
              "Effect": "Allow",
              "Action": [
                  "kafka-cluster:Connect",
                  "kafka-cluster:DescribeCluster",
                  "kafka-cluster:DescribeTopic",
                  "kafka-cluster:WriteData",
                  "kafka-cluster:ReadData"
              ],
              "Resource": [
                  "arn:aws:kafka:*:*:cluster/*"
              ]
          },
          {
              "Sid": "TopicPermissions",
              "Effect": "Allow",
              "Action": [
                  "kafka-cluster:DescribeTopic",
                  "kafka-cluster:WriteData",
                  "kafka-cluster:ReadData"
              ],
              "Resource": [
                  "arn:aws:kafka:*:*:topic/*/*"
              ]
          },
          {
              "Effect": "Allow",
              "Action": [
                  "kafka:DescribeCluster",
                  "kafka:GetBootstrapBrokers",
                  "kafka:Produce"
              ],
              "Resource": "arn:aws:kafka:*:*:cluster/*"
          },
          {
              "Effect": "Allow",
              "Action": [
                  "ec2:CreateNetworkInterface",
                  "ec2:DescribeNetworkInterfaces",
                  "ec2:DeleteNetworkInterface",
                  "ec2:DescribeSubnets",
                  "ec2:DescribeSecurityGroups"
              ],
              "Resource": "*"
          }
      ]
  }
  ```
+ **Nessuna ricorsione**: il nome dell'argomento di destinazione non può essere lo stesso dei nomi degli argomenti di origine.