本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 寫入 Kinesis 資料串流 KPL
以下各節顯示從最基本生產者到完全非同步程式碼的進度範例程式碼。
Barebones 生產者程式碼
以下是撰寫最低限度能夠運作的生產者所需的全部程式碼。Kinesis Producer Library (KPL) 使用者記錄會在背景處理。
// KinesisProducer gets credentials automatically like // DefaultAWSCredentialsProviderChain. // It also gets region automatically from the EC2 metadata service. KinesisProducer kinesis = new KinesisProducer(); // Put some records for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block kinesis.addUserRecord("myStream", "myPartitionKey", data); } // Do other stuff ...
同步回應結果
前述範例的程式碼並未檢查 KPL 使用者記錄是否成功。KPL 會就失敗狀況執行任何必要的重試。然而若您想要檢查結果,則可使用 Future
所傳回的 addUserRecord
物件進行檢查,如以下範例所示 (另顯示前述範例以供對照):
KinesisProducer kinesis = new KinesisProducer(); // Put some records and save the Futures List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); for (int i = 0; i < 100; i++) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block putFutures.add( kinesis.addUserRecord("myStream", "myPartitionKey", data)); } // Wait for puts to finish and check the results for (Future<UserRecordResult> f : putFutures) { UserRecordResult result = f.get(); // this does block if (result.isSuccessful()) { System.out.println("Put record into shard " + result.getShardId()); } else { for (Attempt attempt : result.getAttempts()) { // Analyze and respond to the failure } } }
非同步回應結果
上一個範例是在Future
物件get()
上呼叫 ,這會封鎖執行時間。如果您不想封鎖執行時間,可以使用非同步回呼,如下列範例所示:
KinesisProducer kinesis = new KinesisProducer(); FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() { @Override public void onFailure(Throwable t) { /* Analyze and respond to the failure */ }; @Override public void onSuccess(UserRecordResult result) { /* Respond to the success */ }; }; for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data); // If the Future is complete by the time we call addCallback, the callback will be invoked immediately. Futures.addCallback(f, myCallback); }