

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Sviluppa una maggiore fidelizzazione dei consumatori con il AWS SDK per Java
<a name="building-enhanced-consumers-api"></a>

Il *fan-out avanzato* è una funzionalità del flusso di dati Amazon Kinesis che consente ai consumer di ricevere dati da un flusso di dati con velocità di trasmissione effettiva dedicata fino a 2 MiB di dati al secondo per partizione. Un'applicazione consumer che utilizza il fan-out avanzato non è in competizione con altre applicazioni che ricevono dati dal flusso. Per ulteriori informazioni, consulta [Sviluppa consumatori con fan-out migliorati con un throughput dedicato](enhanced-consumers.md).

È possibile utilizzare le operazioni API per creare un'applicazione consumer che utilizza il fan-out avanzato nel flusso di dati Kinesis.

**Registrazione di un'applicazione consumer con il fan-out avanzato mediante l'API del flusso di dati Kinesis**

1. Chiama [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)per registrare la tua candidatura come consumatore che utilizza un fan-out avanzato. Il flusso di dati Kinesis genera un nome della risorsa Amazon (ARN) per il consumer e lo restituisce nella risposta.

1. Per iniziare ad ascoltare uno shard specifico, trasmetti l'ARN del consumatore in una chiamata a. [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) Kinesis Data Streams inizia quindi a inviare all'utente i record da quello shard, sotto forma di eventi [SubscribeToShardEvent](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html)di tipo su una connessione HTTP/2. La connessione rimane aperta per un massimo di 5 minuti. Chiama di [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)nuovo se desideri continuare a ricevere i record dallo shard dopo il completamento `future` normale o eccezionale della chiamata. [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)
**Nota**  
L'API `SubscribeToShard` restituisce anche l'elenco delle partizioni secondarie della partizione corrente quando viene raggiunta la fine della partizione corrente. 

1. Per annullare la registrazione di un consumatore che utilizza il fan-out avanzato, chiama. [DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)

Il seguente codice è un esempio di come è possibile sottoscrivere l'applicazione consumer a uno shard, rinnovare la sottoscrizione periodicamente e gestire gli eventi.

```
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
     
    import java.util.concurrent.CompletableFuture;
     
    /**
     * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java
     * for complete code and more examples.
     */
    public class SubscribeToShardSimpleImpl {
     
        private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737";
        private static final String SHARD_ID = "shardId-000000000000";
     
        public static void main(String[] args) {
     
            KinesisAsyncClient client = KinesisAsyncClient.create();
     
            SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                    .consumerARN(CONSUMER_ARN)
                    .shardId(SHARD_ID)
                    .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
     
            // Call SubscribeToShard iteratively to renew the subscription periodically.
            while(true) {
                // Wait for the CompletableFuture to complete normally or exceptionally.
                callSubscribeToShardWithVisitor(client, request).join();
            }
     
            // Close the connection before exiting.
            // client.close();
        }
     
     
        /**
         * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface.
         */
        private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) {
            SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() {
                @Override
                public void visit(SubscribeToShardEvent event) {
                    System.out.println("Received subscribe to shard event " + event);
                }
            };
            SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                    .builder()
                    .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                    .subscriber(visitor)
                    .build();
            return client.subscribeToShard(request, responseHandler);
        }
    }
```

 Se `event.ContinuationSequenceNumber` restituisce `null`, indica che si è verificata una divisione o un'unione della partizione che ha interessato questa partizione. Questa partizione si trova ora nello stato `CLOSED` e hai letto tutti i record di dati disponibili da questa partizione. In questo scenario, è possibile utilizzare `event.childShards` per conoscere le nuove partizioni secondarie della partizione in fase di elaborazione che sono state create dalla divisione o dall'unione. Per ulteriori informazioni, consulta [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).