本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 建立串流 APIs
使用下列步驟來建立 Kinesis 資料串流。
建置 Kinesis Data Streams 用戶端
在可以使用 Kinesis 資料串流之前,必須建置用戶端物件。以下 Java 程式碼會將用戶端建置器執行個體化,並使用它來設定區域、登入資料和用戶端組態。接著會建置一個用戶端物件。
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis client = clientBuilder.build();
如需詳細資訊,請參閱 AWS 一般參考 中的 Kinesis Data Streams 區域與端點。
建立串流
現在您已建立 Kinesis Data Streams 用戶端,您可以使用主控台或以程式設計方式建立串流。若要以程式設計方式建立串流,請執行個體化CreateStreamRequest
物件並指定串流的名稱。如果您想要使用佈建模式,請指定串流要使用的碎片數量。
-
隨需:
CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName );
-
佈建:
CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName ); createStreamRequest.setShardCount( myStreamSize );
串流名稱可識別串流。該名稱範圍為應用程式所使用的 AWS 帳戶。也受限於區域。也就是說,兩個不同 AWS 帳戶中的兩個串流可以具有相同的名稱,而相同 AWS 帳戶中的兩個串流,但兩個不同區域中的串流可以具有相同的名稱,但同一個帳戶和相同區域中的兩個串流不能具有相同的名稱。
串流的輸送量是碎片數量的函數。若要提高佈建輸送量,您需要更多碎片。更多碎片也會增加串流 AWS 的費用。如需有關計算應用程式的適當碎片數量的詳細資訊,請參閱選擇資料串流容量模式。
設定createStreamRequest
物件之後,在用戶端上呼叫 createStream
方法來建立串流。呼叫 createStream
之後,等候串流達到 ACTIVE
狀態,之後再對串流執行任何操作。若要查看串流的狀態,請呼叫 describeStream
方法。不過,如果串流不存在,describeStream
會擲出例外狀況。因此,請將 describeStream
呼叫含括在 try/catch
區塊中。
client.createStream( createStreamRequest ); DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); long startTime = System.currentTimeMillis(); long endTime = startTime + ( 10 * 60 * 1000 ); while ( System.currentTimeMillis() < endTime ) { try { Thread.sleep(20 * 1000); } catch ( Exception e ) {} try { DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest ); String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus(); if ( streamStatus.equals( "ACTIVE" ) ) { break; } // // sleep for one second // try { Thread.sleep( 1000 ); } catch ( Exception e ) {} } catch ( ResourceNotFoundException e ) {} } if ( System.currentTimeMillis() >= endTime ) { throw new RuntimeException( "Stream " + myStreamName + " never went active" ); }