本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用開發具有共用輸送量的自訂消費者 AWS SDK for Java
開發自訂 Kinesis Data Streams 消費者的其中一個方法是使用 Amazon Kinesis Data Streams。APIs本節說明將 Kinesis Data Streams APIs 與 Java 版搭配使 AWS SDK用。本節中的 Java 範例程式碼示範如何執行基本作KDSAPI業,並按照作業類型在邏輯上劃分。
這些範例不代表可立即生產的程式碼,無法檢查出所有可能的例外狀況,也不可視為任何潛在安全或效能疑慮的原因。
您可以APIs使用其他不同的程式設計語言來呼叫 Kinesis Data Streams。如需所有可用的詳細資訊 AWS SDKs,請參閱使用 Amazon Web Services 開始開發
重要
建議開發自訂 Kinesis Data Streams 消費者並在整個期間共用的方法是使用 Kinesis 用戶端程式庫 () KCL。KCL透過處理與分散式運算相關的許多複雜工作,協助您使用和處理 Kinesis 資料串流中的資料。如需詳細資訊,請參閱使用 KCL.
從串流取得資料
Kinesis Data Streams APIs 包括可呼叫以從資料串流擷取記錄的getShardIterator
和getRecords
方法。此為提取模型,將由您的程式碼直接從資料串流中的碎片取出資料記錄。
重要
我們建議您使用提供的記錄處理器支援,從資料串流擷取記錄。KCL此為推送模型,需由您實作程式碼以處理資料。會從資料串流KCL擷取資料記錄,並將其傳送至您的應用程式程式碼。此外,還KCL提供容錯移轉、復原和負載平衡功能。如需詳細資訊,請參閱使用 KCL.
但是,在某些情況下,您可能更喜歡使用 Kinesis Data Streams APIs。例如,您要實作自訂工具以用於對資料串流進行監控或偵錯。
重要
Kinesis Data Streams 支援對資料串流變更資料記錄保留期間。如需詳細資訊,請參閱變更資料保留期。
使用碎片迭代器
您將以碎片為基本單位,從串流擷取記錄。針對每個碎片,以及從該碎片擷取的各個批次的記錄,您必須取得碎片疊代運算。碎片疊代運算是供 getRecordsRequest
物件用於指定從中擷取記錄的碎片位置。與碎片疊代運算相關聯的類型決定了應從碎片中的哪個點擷取記錄 (詳細資訊請參閱本節稍後說明)。在您可以使用碎片迭代器之前,您必須檢索碎片。如需詳細資訊,請參閱列出碎片。
使用 getShardIterator
方法可取得初始碎片疊代運算。如欲就額外各個批次的記錄取得碎片疊代運算,請使用 getNextShardIterator
方法所傳回 getRecordsResult
物件的 getRecords
方法。碎片疊代運算的有效期為 5 分鐘。若您使用了仍在有效期內的碎片疊代運算,則會進行一次新的疊代運算。每一次碎片疊代運算將於 5 分鐘內維持有效,即便其已使用過亦然。
若要取得初始碎片疊代運算,請執行個體化 GetShardIteratorRequest
並將其傳遞給 getShardIterator
方法。若要設定請求,則指定串流和碎片 ID。如需如何取得 AWS 帳戶中串流的詳細資訊,請參閱列出串流。如需如何取得串流中各個碎片的相關資訊,請參閱列出碎片。
String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();
此範本程式碼在取得初始碎片疊代運算時指定 TRIM_HORIZON
做為疊代運算類型。這種疊代運算類型表示應從加入至碎片的第一筆記錄開始傳回各記錄,而不是從最近加入的記錄 (又稱為頂端) 開始。可用的疊代運算類型如下:
-
AT_SEQUENCE_NUMBER
-
AFTER_SEQUENCE_NUMBER
-
AT_TIMESTAMP
-
TRIM_HORIZON
-
LATEST
如需詳細資訊,請參閱ShardIteratorType。
某些疊代運算類型除了表明類型外還需要指定序號,例如:
getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);
一旦使用 getRecords
取得記錄後,您即可呼叫該記錄的 getSequenceNumber
方法以取得該記錄的序號。
record.getSequenceNumber()
此外,加入記錄至資料串流的程式碼也可透過對 getSequenceNumber
的結果呼叫 putRecord
取得所加入某一筆記錄的序號。
lastSequenceNumber = putRecordResult.getSequenceNumber();
使用序號可以保證各個記錄的順序嚴格遞增。如需詳細資訊,請參閱 PutRecord例子所提供的程式碼範例。
使用 GetRecords
取得碎片疊代運算之後,執行個體化 GetRecordsRequest
物件。使用 setShardIterator
方法指定請求所用的疊代運算。
您也可以使用 setLimit
方法,選擇性設定欲擷取的記錄數目。由 getRecords
傳回的記錄數目一定等於或少於此限制。如果您未指定此限制,getRecords
將傳回已擷取的 10 MB 記錄。以下範本程式碼將此限制設為 25 筆記錄。
如果沒有傳回任何記錄,即表示此碎片在碎片疊代運算所參考的序號位置目前無資料記錄可用。在這種情況下,您的應用程式應該等待一段適當時間處理串流的資料來源。接著再次使用前次呼叫 getRecords
所傳回的碎片疊代運算,嘗試從碎片取得資料。
將 getRecordsRequest
傳遞給 getRecords
方法,並且擷取 getRecordsResult
物件形式的傳回值。若要取得資料記錄,請對 getRecords
物件呼叫 getRecordsResult
方法。
GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();
為了準備對 getRecords
進行另一次呼叫,請透過 getRecordsResult
取得下一碎片疊代運算。
shardIterator = getRecordsResult.getNextShardIterator();
為了獲得最佳結果,逐次呼叫 getRecords
間隔期間應至少休眠 1 秒 (1,000 毫秒) 以免超出 getRecords
頻率的限制。
try { Thread.sleep(1000); } catch (InterruptedException e) {}
一般而言,您應於迴圈中呼叫 getRecords
,即便是在測試情況下擷取單一記錄亦然。僅呼叫一次 getRecords
可能會傳回空的記錄清單,就算是碎片在後續的序號位置包含更多記錄也不免如此。若發生這種情況,便會傳回 NextShardIterator
且空的記錄清單將參考碎片中的某一後續序號,因而連續呼叫 getRecords
最終將傳回記錄。以下範例示範迴圈的用法。
範例:getRecords
以下程式碼範例反映了本節所述的 getRecords
技巧,包括在迴圈中發出呼叫。
// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }
如果您是使用 Kinesis Client Library,可能要進行多次呼叫後才會傳回資料。此行為是經過設計的,並不表示KCL或資料有問題。
適應一個雷碎片
如果 getRecordsResult.getNextShardIterator
傳回 null
,則表示發生了涉及此碎片的碎片分割或合併。此碎片現在處於 CLOSED
狀態,並且您已從此碎片讀取所有可用的資料記錄。
在這個案例中,您可以使用 getRecordsResult.childShards
來了解分割或合併所建立之正在處理之碎片的新子碎片。如需詳細資訊,請參閱ChildShard。
若是發生分割,兩個新碎片的 parentShardId
將與您先前處理的碎片其碎片 ID 相同。該兩個碎片的 adjacentParentShardId
值皆為 null
。
若是發生合併,經由合併所建立的單個新碎片其 parentShardId
將等於其中一個父碎片的碎片 ID,且 adjacentParentShardId
等於另一個父碎片的碎片 ID。您的應用程式已從上述其中一個碎片讀取了所有資料。對於該碎片,getRecordsResult.getNextShardIterator
已傳回 null
。如果資料的順序對您的應用程式很重要,請確保您亦已從另一個父碎片讀取了所有資料,而後再從經由合併所建立的子碎片讀取任何新資料。
若您使用多個處理器從串流擷取資料 (假設每個碎片各一個處理器),則一旦發生碎片分割或合併時,請調升或調降處理器數目以適應碎片數目的變更。
如需重新分片的詳細資訊,包括碎片狀態 (如 CLOSED
) 方面的討論,請參閱重新碎片流。