

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Sviluppa i consumatori con AWS SDK per Java
<a name="develop-consumers-sdk"></a>

 Puoi sviluppare consumatori personalizzati utilizzando Amazon Kinesis APIs Data Streams. Questa sezione descrive l'utilizzo di Kinesis APIs Data AWS SDK per Java Streams con.

**Importante**  
Il metodo consigliato per lo sviluppo di consumer del flusso di dati Kinesis personalizzati con condivisione integrale consiste nell'utilizzare la Kinesis Client Library (KCL). KCL ti aiuta a consumare ed elaborare i dati da un flusso di dati Kinesis occupandoti di molte delle attività complesse associate al calcolo distribuito. Per ulteriori informazioni, consulta [Sviluppa i consumatori con KCL in Java](develop-kcl-consumers-java.md).

**Topics**
+ [

# Sviluppa consumatori con produttività condivisa con AWS SDK per Java
](developing-consumers-with-sdk.md)
+ [

# Sviluppa una maggiore fidelizzazione dei consumatori con il AWS SDK per Java
](building-enhanced-consumers-api.md)
+ [

# Interagisci con i dati utilizzando lo Schema Registry AWS Glue
](building-enhanced-consumers-glue-schema-registry.md)

# Sviluppa consumatori con produttività condivisa con AWS SDK per Java
<a name="developing-consumers-with-sdk"></a>

Uno dei metodi per sviluppare utenti Kinesis Data Streams personalizzati con condivisione integrale consiste nell'utilizzare Amazon APIs Kinesis Data Streams con. AWS SDK per Java Questa sezione descrive l'utilizzo di Kinesis APIs Data AWS SDK per Java Streams con. È possibile chiamare Kinesis APIs Data Streams utilizzando altri linguaggi di programmazione diversi. Per ulteriori informazioni su tutto ciò che è disponibile AWS SDKs, consulta [Inizia a sviluppare con Amazon Web Services](https://aws.amazon.com/developers/getting-started/). 

Il codice di esempio Java in questa sezione mostra come eseguire le operazioni di base dell'API Kinesis Data Streams ed è suddiviso logicamente per tipo di operazione. Questi esempi non rappresentano il codice pronto per la produzione. Non verificano la presenza di eccezioni o di account per tutte le possibili considerazioni di sicurezza o di prestazione. 

**Topics**
+ [

## Ottieni dati da un flusso
](#kinesis-using-sdk-java-get-data)
+ [

## Usa gli iteratori shard
](#kinesis-using-sdk-java-get-data-shard-iterators)
+ [

## Usa GetRecords
](#kinesis-using-sdk-java-get-data-getrecords)
+ [

## Adattarsi a un reshard
](#kinesis-using-sdk-java-get-data-reshard)

## Ottieni dati da un flusso
<a name="kinesis-using-sdk-java-get-data"></a>

I Kinesis APIs Data Streams `getShardIterator` includono i metodi e che `getRecords` è possibile richiamare per recuperare i record da un flusso di dati. Si tratta di un modello pull, dove il codice disegna i record di dati direttamente dalle partizioni del flusso di dati.

**Importante**  
È consigliabile utilizzare il supporto del processore di record fornito dalla KCL per recuperare i record dati flussi di dati. Si tratta di un modello push, dove è possibile implementare il codice che elabora i dati. La KCL recupera i record di dati dal flusso di dati e li fornisce al tuo codice di applicazione. Inoltre, la KCL fornisce le funzionalità di failover, ripristino e bilanciamento del carico. Per ulteriori informazioni, consulta [Sviluppo di consumer personalizzati con velocità di trasmissione effettiva condivisa tramite KCL](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html).

Tuttavia, in alcuni casi potresti preferire utilizzare Kinesis APIs Data Streams. Ad esempio, per implementare strumenti personalizzati per il monitoraggio o il debug dei tuoi flussi di dati.

**Importante**  
Il flusso di dati Kinesis supporta le modifiche al periodo di conservazione dei record di dati del tuo flusso di dati. Per ulteriori informazioni, consulta [Modifica il periodo di conservazione dei dati](kinesis-extended-retention.md).

## Usa gli iteratori shard
<a name="kinesis-using-sdk-java-get-data-shard-iterators"></a>

Puoi recuperare i record dal flusso per shard. Per ogni shard e per ogni batch di record recuperati da quello shard, è necessario ottenere un *iteratore shard*. L'iteratore shard viene utilizzato nell'oggetto `getRecordsRequest` per specificare lo shard da cui devono essere recuperati i dati. Il tipo associato all'iteratore shard determina il punto nello shard da cui devono essere recuperati i record (vedi più avanti in questa sezione per ulteriori dettagli). Prima di poter lavorare con lo shard iterator, è necessario recuperare lo shard. Per ulteriori informazioni, consulta [Elenca i frammenti](kinesis-using-sdk-java-list-shards.md).

Ottieni l'iteratore shard iniziale utilizzando il metodo `getShardIterator`. Ottieni iteratori di shard per ulteriori batch di record utilizzando il metodo `getNextShardIterator` dell'oggetto `getRecordsResult` restituito dal metodo `getRecords`. Un iteratore shard è valido per 5 minuti. Se utilizzi un iteratore shard mentre è valido, puoi ottenerne uno nuovo. Ogni iteratore shard rimane valido per 5 minuti, anche dopo il suo utilizzo.

Per ottenere l'iteratore shard iniziale, crea un'istanza `GetShardIteratorRequest` e passala al metodo `getShardIterator`. Per configurare la richiesta, specifica il flusso e l'ID dello shard. Per informazioni su come ottenere gli stream del tuo account, consulta. AWS [Visualizzazione dell'elenco di flussi](kinesis-using-sdk-java-list-streams.md) Per informazioni su come ottenere gli shard in un flusso, consulta [Elenca i frammenti](kinesis-using-sdk-java-list-shards.md).

```
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(myStreamName);
getShardIteratorRequest.setShardId(shard.getShardId());
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");

GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
```

Questo codice di esempio specifica `TRIM_HORIZON` come il tipo di iteratore che si utilizza per ottenere l'iteratore shard iniziale. Questo tipo di iterazione significa che i record devono essere restituiti a partire dal primo record aggiunto alla partizione anziché a partire dal record aggiunto più di recente, noto anche come *estremità*. I possibili tipi di iteratore sono i seguenti:
+ `AT_SEQUENCE_NUMBER`
+ `AFTER_SEQUENCE_NUMBER`
+ `AT_TIMESTAMP`
+ `TRIM_HORIZON`
+ `LATEST`

Per ulteriori informazioni, consulta [ShardIteratorType](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType).

Per alcuni tipi di iteratore è necessario specificare un numero di sequenza in aggiunta al tipo, ad esempio:

```
getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER");
getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);
```

Dopo aver ottenuto un record utilizzando `getRecords`, è possibile ottenere il numero di sequenza per il record chiamando il metodo `getSequenceNumber` del record. 

```
record.getSequenceNumber()
```

Inoltre, il codice che aggiunge record per il flusso di dati è in grado di ottenere il numero di sequenza per un record aggiunto chiamando `getSequenceNumber` sul risultato di `putRecord`. 

```
lastSequenceNumber = putRecordResult.getSequenceNumber();
```

È possibile utilizzare i numeri di sequenza per garantire che l'ordine dei record sia rigorosamente ascendente. Per ulteriori informazioni, consulta il codice di esempio in [PutRecord esempio](developing-producers-with-sdk.md#kinesis-using-sdk-java-putrecord-example).

## Usa GetRecords
<a name="kinesis-using-sdk-java-get-data-getrecords"></a>

Dopo aver ottenuto l'iteratore shard, crea un'istanza di un oggetto `GetRecordsRequest`. Specifica l'iteratore per la richiesta utilizzando il metodo `setShardIterator`. 

Facoltativamente, puoi anche impostare il numero di record da recuperare utilizzando il metodo `setLimit`. Il numero di record restituiti da `getRecords` è sempre pari o inferiori a questo limite. Se non si specifica questo limite, `getRecords` restituisce 10 MB di record recuperati. Il seguente codice di esempio imposta questo limite a 25 record.

Se non viene restituito alcun record, ciò significa che non sono attualmente disponibili record di dati da questo shard al numero di sequenza a cui fa riferimento l'iteratore shard. In questo caso la tua applicazione deve attendere per un periodo di tempo appropriato per le origini dati per il flusso. Di seguito, cerca di ottenere di nuovo dati dallo shard utilizzando l'iteratore shard restituito dalla chiamata precedente a `getRecords`. 

Passa la `getRecordsRequest` al metodo `getRecords` e acquisisci il valore restituito come un oggetto `getRecordsResult`. Per ottenere i record di dati, chiama il metodo `getRecords` nell'oggetto `getRecordsResult`. 

```
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(25);

GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
```

Per prepararti a un'altra chiamata a `getRecords`, ottieni l'iteratore shard successivo da `getRecordsResult`. 

```
shardIterator = getRecordsResult.getNextShardIterator();
```

Per ottenere migliori risultati, sospendi l'attività per almeno 1 secondo (1.000 millisecondi) tra le chiamate a `getRecords` per evitare di superare il limite di frequenza di `getRecords`. 

```
try {
  Thread.sleep(1000);
}
catch (InterruptedException e) {}
```

In genere, si deve effettuare la chiamata `getRecords` in un loop, anche quando si esegue il recupero di un singolo record in uno scenario di test. Una singola chiamata a `getRecords` può restituire un elenco di record vuoto, anche quando lo shard contiene più record in numeri di sequenza successivi. In questo caso, il `NextShardIterator` restituito con l'elenco vuoto fa riferimento a un numero di sequenza successivo nello shard e, in conclusione, le chiamate successive a `getRecords` restituiscono i record. L'esempio seguente dimostra l'uso di un loop.

**Esempio: getRecords**  
Il seguente codice di esempio riflette i suggerimenti in merito a `getRecords` in questa sezione, tra cui l'esecuzione di chiamate in un loop.

```
// Continuously read data records from a shard
List<Record> records;
    
while (true) {
   
  // Create a new getRecordsRequest with an existing shardIterator 
  // Set the maximum records to return to 25
  
  GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
  getRecordsRequest.setShardIterator(shardIterator);
  getRecordsRequest.setLimit(25); 

  GetRecordsResult result = client.getRecords(getRecordsRequest);
  
  // Put the result into record list. The result can be empty.
  records = result.getRecords();
  
  try {
    Thread.sleep(1000);
  } 
  catch (InterruptedException exception) {
    throw new RuntimeException(exception);
  }
  
  shardIterator = result.getNextShardIterator();
}
```

Se si sta utilizzando la Kinesis Client Library, potrebbe effettuare più chiamate prima di restituire i dati. Questo comportamento è pianificato e non indica un problema con la KCL o con i tuoi dati.

## Adattarsi a un reshard
<a name="kinesis-using-sdk-java-get-data-reshard"></a>

 Se `getRecordsResult.getNextShardIterator` restituisce `null`, indica che si è verificata una divisione o un'unione della partizione che ha interessato questa partizione. Questa partizione si trova ora nello stato `CLOSED` e hai letto tutti i record di dati disponibili da questa partizione. 

 In questo scenario, è possibile utilizzare `getRecordsResult.childShards` per conoscere le nuove partizioni secondarie della partizione in fase di elaborazione che sono state create dalla divisione o dall'unione. Per ulteriori informazioni, consulta [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).

 Nel caso di un frazionamento, i due nuovi shard hanno entrambi un `parentShardId` identico all'ID dello shard che si stava elaborando in precedenza. Il valore di `adjacentParentShardId` per entrambi gli shard è `null`. 

 Nel caso di una fusione, il singolo nuovo shard creato mediante la fusione ha un `parentShardId` identico all'ID di uno degli shard principali e un `adjacentParentShardId` identico all'ID dell'altro shard principale. La tua applicazione ha già letto tutti i dati provenienti da uno di questi shard. Questo è lo shard per cui `getRecordsResult.getNextShardIterator` ha restituito `null`. Se l'ordine dei dati è importante per la tua applicazione, assicurati che quest'ultima legga anche tutti i dati dall'altro shard principale prima di leggere qualsiasi nuovo dato dallo shard secondario creato dalla fusione. 

 Se sta utilizzando più processori per recuperare dati dal flusso (ad esempio, un processore per shard) e si verifica una frammentazione o una fusione di shard, devi aumentare o diminuire il numero di processori per adattarti alla variazione nel numero di shard. 

 Per ulteriori informazioni sul resharding, tra cui una discussione in merito agli stati degli shard - ad esempio `CLOSED` - consulta [Condividi nuovamente uno stream](kinesis-using-sdk-java-resharding.md). 

# Sviluppa una maggiore fidelizzazione dei consumatori con il AWS SDK per Java
<a name="building-enhanced-consumers-api"></a>

Il *fan-out avanzato* è una funzionalità del flusso di dati Amazon Kinesis che consente ai consumer di ricevere dati da un flusso di dati con velocità di trasmissione effettiva dedicata fino a 2 MiB di dati al secondo per partizione. Un'applicazione consumer che utilizza il fan-out avanzato non è in competizione con altre applicazioni che ricevono dati dal flusso. Per ulteriori informazioni, consulta [Sviluppa consumatori con fan-out migliorati con un throughput dedicato](enhanced-consumers.md).

È possibile utilizzare le operazioni API per creare un'applicazione consumer che utilizza il fan-out avanzato nel flusso di dati Kinesis.

**Registrazione di un'applicazione consumer con il fan-out avanzato mediante l'API del flusso di dati Kinesis**

1. Chiama [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)per registrare la tua candidatura come consumatore che utilizza un fan-out avanzato. Il flusso di dati Kinesis genera un nome della risorsa Amazon (ARN) per il consumer e lo restituisce nella risposta.

1. Per iniziare ad ascoltare uno shard specifico, trasmetti l'ARN del consumatore in una chiamata a. [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) Kinesis Data Streams inizia quindi a inviare all'utente i record da quello shard, sotto forma di eventi [SubscribeToShardEvent](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html)di tipo su una connessione HTTP/2. La connessione rimane aperta per un massimo di 5 minuti. Chiama di [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)nuovo se desideri continuare a ricevere i record dallo shard dopo il completamento `future` normale o eccezionale della chiamata. [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)
**Nota**  
L'API `SubscribeToShard` restituisce anche l'elenco delle partizioni secondarie della partizione corrente quando viene raggiunta la fine della partizione corrente. 

1. Per annullare la registrazione di un consumatore che utilizza il fan-out avanzato, chiama. [DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)

Il seguente codice è un esempio di come è possibile sottoscrivere l'applicazione consumer a uno shard, rinnovare la sottoscrizione periodicamente e gestire gli eventi.

```
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
     
    import java.util.concurrent.CompletableFuture;
     
    /**
     * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java
     * for complete code and more examples.
     */
    public class SubscribeToShardSimpleImpl {
     
        private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737";
        private static final String SHARD_ID = "shardId-000000000000";
     
        public static void main(String[] args) {
     
            KinesisAsyncClient client = KinesisAsyncClient.create();
     
            SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                    .consumerARN(CONSUMER_ARN)
                    .shardId(SHARD_ID)
                    .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
     
            // Call SubscribeToShard iteratively to renew the subscription periodically.
            while(true) {
                // Wait for the CompletableFuture to complete normally or exceptionally.
                callSubscribeToShardWithVisitor(client, request).join();
            }
     
            // Close the connection before exiting.
            // client.close();
        }
     
     
        /**
         * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface.
         */
        private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) {
            SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() {
                @Override
                public void visit(SubscribeToShardEvent event) {
                    System.out.println("Received subscribe to shard event " + event);
                }
            };
            SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                    .builder()
                    .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                    .subscriber(visitor)
                    .build();
            return client.subscribeToShard(request, responseHandler);
        }
    }
```

 Se `event.ContinuationSequenceNumber` restituisce `null`, indica che si è verificata una divisione o un'unione della partizione che ha interessato questa partizione. Questa partizione si trova ora nello stato `CLOSED` e hai letto tutti i record di dati disponibili da questa partizione. In questo scenario, è possibile utilizzare `event.childShards` per conoscere le nuove partizioni secondarie della partizione in fase di elaborazione che sono state create dalla divisione o dall'unione. Per ulteriori informazioni, consulta [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).

# Interagisci con i dati utilizzando lo Schema Registry AWS Glue
<a name="building-enhanced-consumers-glue-schema-registry"></a>

Puoi integrare i tuoi flussi di dati Kinesis con lo Schema Registry. AWS Glue Lo AWS Glue Schema Registry ti consente di scoprire, controllare ed evolvere centralmente gli schemi, garantendo al contempo che i dati prodotti siano convalidati continuamente da uno schema registrato. Uno schema definisce la struttura e il formato di un registro di dati. Uno schema è una specifica con versioni per la pubblicazione, il consumo o l'archiviazione dei dati in modo affidabile. Lo AWS Glue Schema Registry consente di migliorare la qualità e la governance end-to-end dei dati all'interno delle applicazioni di streaming. Per ulteriori informazioni, consulta [Registro degli schemi di AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Uno dei modi per configurare questa integrazione è tramite l'API `GetRecords` Kinesis Data Streams disponibile AWS in Java SDK. 

Per istruzioni dettagliate su come configurare l'integrazione di Kinesis Data Streams con Schema Registry `GetRecords` utilizzando Kinesis Data Streams, consulta la sezione «Interazione con i dati utilizzando APIs Kinesis Data Streams» [in Caso d'uso: integrazione di Amazon](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) Kinesis Data APIs Streams con il registro dello schema Glue. AWS 