透過 API 建立串流 - Amazon Kinesis Data Streams

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

透過 API 建立串流

使用下列步驟來建立 Kinesis 資料串流。

建置 Kinesis Data Streams 用戶端

在可以使用 Kinesis 資料串流之前,必須建置用戶端物件。以下 Java 程式碼會將用戶端建置器執行個體化,並使用它來設定區域、登入資料和用戶端組態。接著會建置一個用戶端物件。

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis client = clientBuilder.build();

如需詳細資訊,請參閱 AWS 一般參考 中的 Kinesis Data Streams 區域與端點

建立串流

現在已建立 Kinesis Data Streams 用戶端,您可以建立要使用的串流,而您可以使用 Kinesis Data Streams 主控台或以程式設計方式實現。若要以程式設計方式建立串流,請將 CreateStreamRequest 物件執行個體化,並指定串流的名稱,以及 (如果您想要使用佈建模式) 串流要使用的碎片數量。

  • 隨需

    CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName );
  • 佈建

    CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName ); createStreamRequest.setShardCount( myStreamSize );

串流名稱可識別串流。名稱的範圍是應用程式所使用的 AWS 帳戶。也受限於區域。也就是說,兩個不同 AWS 帳戶中的兩個流可以具有相同的名稱,並且在同一 AWS 帳戶中但在兩個不同區域中的兩個流可以具有相同的名稱,但不能在同一帳戶和相同區域中有兩個流。

串流的輸送量為碎片數量的函數;要獲得更大的佈建輸送量需要更多的碎片。更多的碎片也會增加為流 AWS 收取費用的成本。如需有關計算應用程式的適當碎片數量的詳細資訊,請參閱選擇資料串流容量模式

設定 createStreamRequest 物件之後,呼叫用戶端上的 createStream 方法來建立串流。呼叫 createStream 之後,等候串流達到 ACTIVE 狀態,之後再對串流執行任何操作。若要查看串流的狀態,請呼叫 describeStream 方法。不過,如果串流不存在,describeStream 會擲出例外狀況。因此,請將 describeStream 呼叫含括在 try/catch 區塊中。

client.createStream( createStreamRequest ); DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); long startTime = System.currentTimeMillis(); long endTime = startTime + ( 10 * 60 * 1000 ); while ( System.currentTimeMillis() < endTime ) { try { Thread.sleep(20 * 1000); } catch ( Exception e ) {} try { DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest ); String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus(); if ( streamStatus.equals( "ACTIVE" ) ) { break; } // // sleep for one second // try { Thread.sleep( 1000 ); } catch ( Exception e ) {} } catch ( ResourceNotFoundException e ) {} } if ( System.currentTimeMillis() >= endTime ) { throw new RuntimeException( "Stream " + myStreamName + " never went active" ); }