Scrivi nel tuo flusso di dati Kinesis utilizzando KPL - Flusso di dati Amazon Kinesis

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Scrivi nel tuo flusso di dati Kinesis utilizzando KPL

Le sezioni seguenti mostrano un esempio di codice in una progressione dal produttore più semplice al codice completamente asincrono.

Codice del produttore Barebones

Il codice seguente è tutto quello che serve per scrivere un producer che lavora al minimo. I record utente della Kinesis Producer Library (KPL) vengono elaborati in background.

// KinesisProducer gets credentials automatically like // DefaultAWSCredentialsProviderChain. // It also gets region automatically from the EC2 metadata service. KinesisProducer kinesis = new KinesisProducer(); // Put some records for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block kinesis.addUserRecord("myStream", "myPartitionKey", data); } // Do other stuff ...

Rispondi ai risultati in modo sincrono

Nell'esempio precedente, il codice non ha controllato se i record utente della KPL sono riusciti. La KPL esegue qualsiasi tentativo necessario per tenere conto degli errori. Tuttavia, se desideri controllare i risultati, puoi esaminarli utilizzando gli oggetti Future restituiti da addUserRecord, come nell'esempio seguente (esempio precedente mostrato per contesto):

KinesisProducer kinesis = new KinesisProducer(); // Put some records and save the Futures List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); for (int i = 0; i < 100; i++) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block putFutures.add( kinesis.addUserRecord("myStream", "myPartitionKey", data)); } // Wait for puts to finish and check the results for (Future<UserRecordResult> f : putFutures) { UserRecordResult result = f.get(); // this does block if (result.isSuccessful()) { System.out.println("Put record into shard " + result.getShardId()); } else { for (Attempt attempt : result.getAttempts()) { // Analyze and respond to the failure } } }

Rispondi ai risultati in modo asincrono

L'esempio precedente è la chiamata get() a un Future oggetto, che blocca il runtime. Se non vuoi bloccare il runtime, puoi usare un callback asincrono, come mostrato nell'esempio seguente:

KinesisProducer kinesis = new KinesisProducer(); FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() { @Override public void onFailure(Throwable t) { /* Analyze and respond to the failure */ }; @Override public void onSuccess(UserRecordResult result) { /* Respond to the success */ }; }; for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data); // If the Future is complete by the time we call addCallback, the callback will be invoked immediately. Futures.addCallback(f, myCallback); }