Schreiben Sie mit der KPL in Ihren Kinesis-Datenstream - Amazon Kinesis Data Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Schreiben Sie mit der KPL in Ihren Kinesis-Datenstream

In den folgenden Abschnitten wird Beispielcode in einem Schritt vom einfachsten Producer-Code bis hin zum vollständig asynchronen Code dargestellt.

Barebones-Produzentencode

Der folgende Code reicht für die Entwicklung eines einfachen Produzenten aus. Die Benutzerdatensätze der Kinesis Producer Library (KPL) werden im Hintergrund verarbeitet.

// KinesisProducer gets credentials automatically like // DefaultAWSCredentialsProviderChain. // It also gets region automatically from the EC2 metadata service. KinesisProducer kinesis = new KinesisProducer(); // Put some records for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block kinesis.addUserRecord("myStream", "myPartitionKey", data); } // Do other stuff ...

Reagieren Sie synchron auf Ergebnisse

Im vorherigen Beispiel wird vom Code nicht geprüft, ob die KPL-Benutzerdatensätze erfolgreich verarbeitet wurden. Die KPL führt alle Wiederholungen durch, die notwendig sind, um Ausfälle zu kompensieren. Wenn Sie jedoch die Ergebnisse prüfen möchten, können Sie sich diese, wie im folgenden Beispiel gezeigt, mit den Future-Objekten ansehen, die von addUserRecord zurückgegeben werden (vorheriges Beispiel aus Kontextgründen erneut aufgeführt).

KinesisProducer kinesis = new KinesisProducer(); // Put some records and save the Futures List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); for (int i = 0; i < 100; i++) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block putFutures.add( kinesis.addUserRecord("myStream", "myPartitionKey", data)); } // Wait for puts to finish and check the results for (Future<UserRecordResult> f : putFutures) { UserRecordResult result = f.get(); // this does block if (result.isSuccessful()) { System.out.println("Put record into shard " + result.getShardId()); } else { for (Attempt attempt : result.getAttempts()) { // Analyze and respond to the failure } } }

Reagieren Sie asynchron auf Ergebnisse

Im vorherigen Beispiel wird ein get() Future Objekt aufgerufen, das die Laufzeit blockiert. Wenn Sie die Laufzeit nicht blockieren möchten, können Sie einen asynchronen Callback verwenden, wie im folgenden Beispiel gezeigt:

KinesisProducer kinesis = new KinesisProducer(); FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() { @Override public void onFailure(Throwable t) { /* Analyze and respond to the failure */ }; @Override public void onSuccess(UserRecordResult result) { /* Respond to the success */ }; }; for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data); // If the Future is complete by the time we call addCallback, the callback will be invoked immediately. Futures.addCallback(f, myCallback); }