使用開發具有共用輸送量的自訂消費者 AWS SDK for Java - Amazon Kinesis Data Streams

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用開發具有共用輸送量的自訂消費者 AWS SDK for Java

開發自訂 Kinesis Data Streams 消費者的其中一個方法是使用 Amazon Kinesis Data Streams。APIs本節說明將 Kinesis Data Streams APIs 與 Java 版搭配使 AWS SDK用。本節中的 Java 範例程式碼示範如何執行基本作KDSAPI業,並按照作業類型在邏輯上劃分。

這些範例不代表可立即生產的程式碼,無法檢查出所有可能的例外狀況,也不可視為任何潛在安全或效能疑慮的原因。

您可以APIs使用其他不同的程式設計語言來呼叫 Kinesis Data Streams。如需所有可用的詳細資訊 AWS SDKs,請參閱使用 Amazon Web Services 開始開發

重要

建議開發自訂 Kinesis Data Streams 消費者並在整個期間共用的方法是使用 Kinesis 用戶端程式庫 () KCL。KCL透過處理與分散式運算相關的許多複雜工作,協助您使用和處理 Kinesis 資料串流中的資料。如需詳細資訊,請參閱使用 KCL.

從串流取得資料

Kinesis Data Streams APIs 包括可呼叫以從資料串流擷取記錄的getShardIteratorgetRecords方法。此為提取模型,將由您的程式碼直接從資料串流中的碎片取出資料記錄。

重要

我們建議您使用提供的記錄處理器支援,從資料串流擷取記錄。KCL此為推送模型,需由您實作程式碼以處理資料。會從資料串流KCL擷取資料記錄,並將其傳送至您的應用程式程式碼。此外,還KCL提供容錯移轉、復原和負載平衡功能。如需詳細資訊,請參閱使用 KCL.

但是,在某些情況下,您可能更喜歡使用 Kinesis Data Streams APIs。例如,您要實作自訂工具以用於對資料串流進行監控或偵錯。

重要

Kinesis Data Streams 支援對資料串流變更資料記錄保留期間。如需詳細資訊,請參閱變更資料保留期

使用碎片迭代器

您將以碎片為基本單位,從串流擷取記錄。針對每個碎片,以及從該碎片擷取的各個批次的記錄,您必須取得碎片疊代運算。碎片疊代運算是供 getRecordsRequest 物件用於指定從中擷取記錄的碎片位置。與碎片疊代運算相關聯的類型決定了應從碎片中的哪個點擷取記錄 (詳細資訊請參閱本節稍後說明)。在您可以使用碎片迭代器之前,您必須檢索碎片。如需詳細資訊,請參閱列出碎片

使用 getShardIterator 方法可取得初始碎片疊代運算。如欲就額外各個批次的記錄取得碎片疊代運算,請使用 getNextShardIterator 方法所傳回 getRecordsResult 物件的 getRecords 方法。碎片疊代運算的有效期為 5 分鐘。若您使用了仍在有效期內的碎片疊代運算,則會進行一次新的疊代運算。每一次碎片疊代運算將於 5 分鐘內維持有效,即便其已使用過亦然。

若要取得初始碎片疊代運算,請執行個體化 GetShardIteratorRequest 並將其傳遞給 getShardIterator 方法。若要設定請求,則指定串流和碎片 ID。如需如何取得 AWS 帳戶中串流的詳細資訊,請參閱列出串流。如需如何取得串流中各個碎片的相關資訊,請參閱列出碎片

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

此範本程式碼在取得初始碎片疊代運算時指定 TRIM_HORIZON 做為疊代運算類型。這種疊代運算類型表示應從加入至碎片的第一筆記錄開始傳回各記錄,而不是從最近加入的記錄 (又稱為頂端) 開始。可用的疊代運算類型如下:

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

如需詳細資訊,請參閱ShardIteratorType

某些疊代運算類型除了表明類型外還需要指定序號,例如:

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

一旦使用 getRecords 取得記錄後,您即可呼叫該記錄的 getSequenceNumber 方法以取得該記錄的序號。

record.getSequenceNumber()

此外,加入記錄至資料串流的程式碼也可透過對 getSequenceNumber 的結果呼叫 putRecord 取得所加入某一筆記錄的序號。

lastSequenceNumber = putRecordResult.getSequenceNumber();

使用序號可以保證各個記錄的順序嚴格遞增。如需詳細資訊,請參閱 PutRecord例子所提供的程式碼範例。

使用 GetRecords

取得碎片疊代運算之後,執行個體化 GetRecordsRequest 物件。使用 setShardIterator 方法指定請求所用的疊代運算。

您也可以使用 setLimit 方法,選擇性設定欲擷取的記錄數目。由 getRecords 傳回的記錄數目一定等於或少於此限制。如果您未指定此限制,getRecords 將傳回已擷取的 10 MB 記錄。以下範本程式碼將此限制設為 25 筆記錄。

如果沒有傳回任何記錄,即表示此碎片在碎片疊代運算所參考的序號位置目前無資料記錄可用。在這種情況下,您的應用程式應該等待一段適當時間處理串流的資料來源。接著再次使用前次呼叫 getRecords 所傳回的碎片疊代運算,嘗試從碎片取得資料。

getRecordsRequest 傳遞給 getRecords 方法,並且擷取 getRecordsResult 物件形式的傳回值。若要取得資料記錄,請對 getRecords 物件呼叫 getRecordsResult 方法。

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

為了準備對 getRecords 進行另一次呼叫,請透過 getRecordsResult 取得下一碎片疊代運算。

shardIterator = getRecordsResult.getNextShardIterator();

為了獲得最佳結果,逐次呼叫 getRecords 間隔期間應至少休眠 1 秒 (1,000 毫秒) 以免超出 getRecords 頻率的限制。

try { Thread.sleep(1000); } catch (InterruptedException e) {}

一般而言,您應於迴圈中呼叫 getRecords,即便是在測試情況下擷取單一記錄亦然。僅呼叫一次 getRecords 可能會傳回空的記錄清單,就算是碎片在後續的序號位置包含更多記錄也不免如此。若發生這種情況,便會傳回 NextShardIterator 且空的記錄清單將參考碎片中的某一後續序號,因而連續呼叫 getRecords 最終將傳回記錄。以下範例示範迴圈的用法。

範例:getRecords

以下程式碼範例反映了本節所述的 getRecords 技巧,包括在迴圈中發出呼叫。

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

如果您是使用 Kinesis Client Library,可能要進行多次呼叫後才會傳回資料。此行為是經過設計的,並不表示KCL或資料有問題。

適應一個雷碎片

如果 getRecordsResult.getNextShardIterator 傳回 null,則表示發生了涉及此碎片的碎片分割或合併。此碎片現在處於 CLOSED 狀態,並且您已從此碎片讀取所有可用的資料記錄。

在這個案例中,您可以使用 getRecordsResult.childShards 來了解分割或合併所建立之正在處理之碎片的新子碎片。如需詳細資訊,請參閱ChildShard

若是發生分割,兩個新碎片的 parentShardId 將與您先前處理的碎片其碎片 ID 相同。該兩個碎片的 adjacentParentShardId 值皆為 null

若是發生合併,經由合併所建立的單個新碎片其 parentShardId 將等於其中一個父碎片的碎片 ID,且 adjacentParentShardId 等於另一個父碎片的碎片 ID。您的應用程式已從上述其中一個碎片讀取了所有資料。對於該碎片,getRecordsResult.getNextShardIterator 已傳回 null。如果資料的順序對您的應用程式很重要,請確保您亦已從另一個父碎片讀取了所有資料,而後再從經由合併所建立的子碎片讀取任何新資料。

若您使用多個處理器從串流擷取資料 (假設每個碎片各一個處理器),則一旦發生碎片分割或合併時,請調升或調降處理器數目以適應碎片數目的變更。

如需重新分片的詳細資訊,包括碎片狀態 (如 CLOSED) 方面的討論,請參閱重新碎片流