

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Sviluppa i produttori utilizzando l'API Amazon Kinesis Data Streams con AWS SDK per Java
<a name="developing-producers-with-sdk"></a>

Puoi sviluppare produttori utilizzando l'API Amazon Kinesis Data Streams AWS con l'SDK for Java. Se sei nuovo per il flusso di dati Kinesis, ti consigliamo di familiarizzare prima con i concetti chiave e la terminologia introdotti in [Cos'è Amazon Kinesis Data Streams?](introduction.md) e [Utilizza il AWS CLI per eseguire operazioni di Amazon Kinesis Data Streams](getting-started.md).

Questi esempi parlano dell'[API del flusso di dati Kinesis](https://docs.aws.amazon.com/kinesis/latest/APIReference/) e utilizzano l'[SDK AWS per Java](https://aws.amazon.com/sdk-for-java/) per aggiungere (inserire) dati a un flusso. Tuttavia, per la maggior parte dei casi d'uso, è preferibile fare riferimento alla libreria KPL del flusso di dati Kinesis. Per ulteriori informazioni, consulta [Sviluppa produttori utilizzando Amazon Kinesis Producer Library (KPL)](developing-producers-with-kpl.md).

Il codice di esempio Java in questo capitolo illustra come eseguire le operazioni API del flusso di dati Kinesis di base ed è suddiviso logicamente in tipo di operazioni. Questi esempi non rappresentano codici pronti per la produzione, poiché non eseguono un controllo per tutte le possibili eccezioni o spiegano tutte le possibili considerazioni relative alle prestazioni e alla sicurezza. Inoltre, è possibile chiamare l'[API di SDK](https://docs.aws.amazon.com/kinesis/latest/APIReference/) utilizzando altri linguaggi di programmazione. Per ulteriori informazioni su tutto ciò che è disponibile AWS SDKs, consulta [Inizia a sviluppare con Amazon Web Services](https://aws.amazon.com/developers/getting-started/).

Ogni attività ha dei requisiti preliminari; ad esempio, non è possibile aggiungere dati a un flusso se non si è creato un flusso, che a sua volta presuppone la creazione di un client. Per ulteriori informazioni, consulta [Crea e gestisci flussi di dati Kinesis](working-with-streams.md).

**Topics**
+ [Aggiungi dati a uno stream](#kinesis-using-sdk-java-add-data-to-stream)
+ [Interagisci con i dati utilizzando lo AWS Glue Schema Registry](kinesis-integration-glue-schema-registry.md)

## Aggiungi dati a uno stream
<a name="kinesis-using-sdk-java-add-data-to-stream"></a>

Una volta che il flusso è stato creato, è possibile aggiungere dati sotto forma di record. Un record è una struttura di dati che contiene i dati da elaborare sotto forma di un blob di dati. Una volta che i dati sono stati archiviati nel record, il flusso di dati Kinesis non li ispeziona, interpreta o modifica in alcun modo. Ogni record, inoltre, è associato a un numero di sequenza e a una chiave di partizione.

Sono disponibili due diverse operazioni nell'API del flusso di dati Kinesis che consentono di aggiungere dati a un flusso, [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) e [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html). L'operazione `PutRecords` invia più record al flusso in ogni richiesta HTTP e l'operazione unica `PutRecord` invia i record al flusso uno alla volta (ogni record necessita di una richiesta HTTP separata). È preferibile utilizzare l'operazione `PutRecords` per la maggior parte delle applicazioni perché questa raggiunge un maggiore throughput per producer dati. Per ulteriori informazioni su ciascuna di queste operazioni, vedere le sottosezioni separate di seguito.

**Topics**
+ [Aggiungi più record con PutRecords](#kinesis-using-sdk-java-putrecords)
+ [Aggiungi un singolo record con PutRecord](#kinesis-using-sdk-java-putrecord)

È necessario tenere sempre a mente che mentre l'applicazione origine aggiunge dati al flusso utilizzando l'API del flusso di dati Kinesis è probabile che ci siano una o più applicazioni consumer che elaborano simultaneamente i dati dal flusso. Per informazioni su come i consumer ottengono i dati utilizzando l'API del flusso di dati Kinesis, consulta [Ottieni dati da un flusso](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data).

**Importante**  
[Modifica il periodo di conservazione dei dati](kinesis-extended-retention.md)

### Aggiungi più record con PutRecords
<a name="kinesis-using-sdk-java-putrecords"></a>

L'operazione [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) invia più record a SDK in una singola richiesta. Utilizzando `PutRecords`, i producer possono ottenere maggiori livelli di velocità di trasmissione effettiva durante l'invio di dati al loro flusso di dati Kinesis. Ogni richiesta `PutRecords` può supportare fino a 500 record. Ciascun record nella richiesta può avere una dimensione massima pari a 1 MB, con un limite a 5 MB, per l'intera richiesta, incluse le chiavi di partizione. Come con l'operazione singola `PutRecord` descritta di seguito, `PutRecords` utilizza numeri di sequenza e chiavi di partizione. Tuttavia, il parametro `PutRecord` `SequenceNumberForOrdering` non è incluso in una chiamata `PutRecords`. L'operazione `PutRecords` tenta di elaborare tutti i record secondo l'ordine naturale della richiesta. 

Ogni record di dati dispone di un numero di sequenza univoco. Il numero di sequenza viene assegnato dal flusso di dati Kinesis dopo aver chiamato `client.putRecords` per aggiungere i record di dati al flusso. I numeri di sequenza per la stessa chiave di partizione di solito aumentano nel tempo; più è lungo il periodo di tempo tra le richieste `PutRecords`, più aumentano i numeri di sequenza.

**Nota**  
i numeri di sequenza non possono essere utilizzati come indici per set di dati all'interno dello stesso flusso. Per separare i set di dati logicamente, utilizza le chiavi di partizione o crea un flusso separato per ogni set di dati.

Una richiesta `PutRecords` può includere record con diverse chiavi di partizione. L'ambito della richiesta è un flusso; ogni richiesta può includere qualsiasi combinazione di chiavi di partizione e record fino al raggiungimento dei limiti della richiesta. Le richieste effettuate con molte diverse chiavi di partizione a flussi con diversi shard sono in genere più veloci delle richieste con un piccolo numero di chiavi di partizione su un esiguo numero di shard. Il numero di chiavi di partizione deve essere molto più grande del numero di shard per ridurre la latenza e massimizzare il throughput.

#### PutRecords esempio
<a name="kinesis-using-sdk-java-putrecords-example"></a>

Il codice seguente crea 100 record di dati con chiavi di partizione sequenziali e li mette in un flusso chiamato `DataStream`. 

```
        AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        
        clientBuilder.setRegion(regionName);
        clientBuilder.setCredentials(credentialsProvider);
        clientBuilder.setClientConfiguration(config);
        
        AmazonKinesis kinesisClient = clientBuilder.build();
 
        PutRecordsRequest putRecordsRequest  = new PutRecordsRequest();
        putRecordsRequest.setStreamName(streamName);
        List <PutRecordsRequestEntry> putRecordsRequestEntryList  = new ArrayList<>(); 
        for (int i = 0; i < 100; i++) {
            PutRecordsRequestEntry putRecordsRequestEntry  = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes()));
            putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
            putRecordsRequestEntryList.add(putRecordsRequestEntry); 
        }

        putRecordsRequest.setRecords(putRecordsRequestEntryList);
        PutRecordsResult putRecordsResult  = kinesisClient.putRecords(putRecordsRequest);
        System.out.println("Put Result" + putRecordsResult);
```

La risposta `PutRecords` include una matrice di risposta `Records`. Ogni record nella matrice di risposta è direttamente correlata a un record nella matrice di richiesta in base all'ordine naturale, dall'alto al basso della richiesta e della risposta. La matrice di risposta `Records` include sempre lo stesso numero di record della matrice di richiesta.

#### Gestisci i guasti durante l'utilizzo PutRecords
<a name="kinesis-using-sdk-java-putrecords-handling-failures"></a>

Per impostazione predefinita, l'errore nei singoli record all'interno di una richiesta non blocca l'elaborazione dei record successivi in una richiesta `PutRecords`. Ciò significa che una matrice `Records` di risposta include record elaborati e non. È necessario rilevare i record non elaborati correttamente e includerli nella chiamata successiva. 

I record elaborati correttamente includono valori `SequenceNumber` e `ShardID`, mentre quelli non elaborati correttamente includono valori `ErrorCode` e `ErrorMessage`. Il parametro `ErrorCode` riflette il tipo di errore e può coincidere con uno dei seguenti valori: `ProvisionedThroughputExceededException` o `InternalFailure`. `ErrorMessage` offre informazioni più dettagliate sull'eccezione `ProvisionedThroughputExceededException`, incluso l'ID dell'account, il nome del flusso e l'ID dello shard del record oggetto di throttling. L'esempio seguente ha tre record in una richiesta `PutRecords`. Il secondo record ha generato un errore che si riflette nella risposta. 

**Example PutRecords Sintassi della richiesta**  

```
{
    "Records": [
        {
    	"Data": "XzxkYXRhPl8w",
	    "PartitionKey": "partitionKey1"
        },
        {
    	"Data": "AbceddeRFfg12asd",
	    "PartitionKey": "partitionKey1"	
        },
        {
    	"Data": "KFpcd98*7nd1",
	    "PartitionKey": "partitionKey3"
        }
    ],
    "StreamName": "myStream"
}
```

**Example PutRecords Sintassi di risposta**  

```
{
    "FailedRecordCount”: 1,
    "Records": [
        {
	    "SequenceNumber": "21269319989900637946712965403778482371",
	    "ShardId": "shardId-000000000001"

        },
        {
	    “ErrorCode":”ProvisionedThroughputExceededException”,
	    “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."

        },
        {
	    "SequenceNumber": "21269319989999637946712965403778482985",
	    "ShardId": "shardId-000000000002"
        }
    ]
}
```

I record non elaborati correttamente possono essere inclusi nelle richieste `PutRecords` successive. In primo luogo, controlla il parametro `FailedRecordCount` in `putRecordsResult` per confermare la presenza di record non elaborati. In questo caso, ogni `putRecordsEntry` che ha un `ErrorCode` che non è `null` deve essere aggiunta a una richiesta successiva. Per un esempio di questo tipo di gestore, fai riferimento al seguente codice.

**Example PutRecords gestore degli errori**  

```
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(myStreamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int j = 0; j < 100; j++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes()));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);

while (putRecordsResult.getFailedRecordCount() > 0) {
    final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>();
    final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords();
    for (int i = 0; i < putRecordsResultEntryList.size(); i++) {
        final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i);
        final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i);
        if (putRecordsResultEntry.getErrorCode() != null) {
            failedRecordsList.add(putRecordRequestEntry);
        }
    }
    putRecordsRequestEntryList = failedRecordsList;
    putRecordsRequest.setRecords(putRecordsRequestEntryList);
    putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);
}
```

### Aggiungi un singolo record con PutRecord
<a name="kinesis-using-sdk-java-putrecord"></a>

Ogni chiamata a [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) opera su un singolo record. È preferibile usare l'operazione `PutRecords` descritta in [Aggiungi più record con PutRecords](#kinesis-using-sdk-java-putrecords) a meno che l'applicazione in uso non abbia specificamente bisogno di inviare sempre record singoli per ogni richiesta, o nel caso in cui per altre ragioni l'operazione `PutRecords` non può essere utilizzata.

Ogni record di dati dispone di un numero di sequenza univoco. Il numero di sequenza viene assegnato dal flusso di dati Kinesis dopo aver chiamato `client.putRecord` per aggiungere i record di dati al flusso. I numeri di sequenza per la stessa chiave di partizione di solito aumentano nel tempo; più è lungo il periodo di tempo tra le richieste `PutRecord`, più aumentano i numeri di sequenza.

 Quando le immissioni si verificano in rapida successione, non è garantito che i numeri di sequenza restituiti aumentino perché le operazioni di introduzione appaiono sostanzialmente come simultanee al flusso di dati Kinesis. Per garantire un aumento rigoroso dei numeri di sequenza per la stessa chiave di partizione, utilizzare il parametro `SequenceNumberForOrdering`, come mostrato nel codice di esempio [PutRecord esempio](#kinesis-using-sdk-java-putrecord-example). 

 Che venga utilizzato o meno il `SequenceNumberForOrdering`, i record che il flusso di dati Kinesis riceve tramite una chiamata `GetRecords` sono rigorosamente ordinati in base al numero di sequenza. 

**Nota**  
i numeri di sequenza non possono essere utilizzati come indici per set di dati all'interno dello stesso flusso. Per separare i set di dati logicamente, utilizza le chiavi di partizione o crea un flusso separato per ogni set di dati.

Per raggruppare i dati nel flusso viene utilizzata una chiave di partizione. Un record di dati viene assegnato a un shard all'interno del flusso in base alla sua chiave di partizione. Nello specifico, il flusso di dati Kinesis utilizza la chiave di partizione come input a una funzione hash che mappi la chiave di partizione (e i dati associati) a una determinata partizione.

 Come conseguenza di questo meccanismo di hashing, tutti i record di dati con la stessa chiave di partizione vengono mappati allo stesso shard all'interno del flusso. Tuttavia, se il numero di chiavi di partizione supera il numero di shard, alcuni shard necessariamente conterranno record con chiavi di partizione diverse. Dal punto di vista di progettazione, per assicurare che tutti gli shard siano ben utilizzati, il numero di shard (specificato dal metodo `setShardCount` di `CreateStreamRequest`) deve essere notevolmente inferiore al numero di chiavi di partizione univoche e la quantità di flusso di dati verso un'unica chiave di partizione deve essere notevolmente inferiore alla capacità dello shard. 

#### PutRecord esempio
<a name="kinesis-using-sdk-java-putrecord-example"></a>

Il codice seguente crea dieci record di dati, distribuiti su due chiavi di partizione, e li mette in un flusso chiamato `myStreamName`.

```
for (int j = 0; j < 10; j++) 
{
  PutRecordRequest putRecordRequest = new PutRecordRequest();
  putRecordRequest.setStreamName( myStreamName );
  putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() ));
  putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 ));  
  putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord );
  PutRecordResult putRecordResult = client.putRecord( putRecordRequest );
  sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();
}
```

Il codice di esempio precedente usa `setSequenceNumberForOrdering` per garantire un ordinamento in aumento rigoroso all'interno di ciascuna chiave di partizione. Per usare questo parametro efficacemente, impostare il `SequenceNumberForOrdering` del record corrente (record *n*) sul numero di sequenza del record precedente (record *n-1*). Per ottenere il numero di sequenza di un record che è stato aggiunto al flusso, chiamare `getSequenceNumber` sul risultato di `putRecord`.

Il parametro `SequenceNumberForOrdering` garantisce garantisce un numero di sequenza strettamente crescente per la stessa chiave di partizione. `SequenceNumberForOrdering` non fornisce ordinazione di record su più chiavi di partizione. 

# Interagisci con i dati utilizzando lo AWS Glue Schema Registry
<a name="kinesis-integration-glue-schema-registry"></a>

Puoi integrare i tuoi flussi di dati Kinesis con lo Schema Registry. AWS Glue Lo AWS Glue Schema Registry ti consente di scoprire, controllare ed evolvere centralmente gli schemi, garantendo al contempo che i dati prodotti siano convalidati continuamente da uno schema registrato. Uno schema definisce la struttura e il formato di un registro di dati. Uno schema è una specifica con versioni per la pubblicazione, il consumo o l'archiviazione dei dati in modo affidabile. Lo AWS Glue Schema Registry consente di migliorare la qualità e la governance end-to-end dei dati all'interno delle applicazioni di streaming. Per ulteriori informazioni, consulta [Registro degli schemi di AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Uno dei modi per configurare questa integrazione è tramite `PutRecord` Kinesis APIs Data Streams disponibile AWS in Java SDK. `PutRecords` 

Per istruzioni dettagliate su come configurare l'integrazione di Kinesis Data Streams con il registro dello schema PutRecords utilizzando Kinesis Data Streams, consulta PutRecord la sezione «Interazione con i dati utilizzando APIs Kinesis Data Streams» [in Caso d'uso: integrazione di Amazon](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) Kinesis Data APIs Streams con il registro dello schema Glue. AWS 