使用 API 创建流 - Amazon Kinesis Data Streams

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 API 创建流

请使用以下步骤创建 Kinesis 数据流。

构建 Kinesis Data Streams 客户端

必须先构建客户端对象,然后才能处理 Kinesis 数据流。以下 Java 代码实例化一个客户端生成器,并使用它来设置区域、凭据和客户端配置。然后,它会构建一个客户端对象。

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis client = clientBuilder.build();

有关更多信息,请参阅《AWS 一般参考》中的 Kinesis Data Streams Regions and Endpoints

创建流

现在您已创建 Kinesis Data Streams 客户端,接下来可使用控制台或以编程方式来创建流。要以编程方式创建流,请实例化 CreateStreamRequest 对象并指定流名称。若要使用预置模式,请为数据流指定分片数量。

  • 按需

    CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName );
  • 预置

    CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName ); createStreamRequest.setShardCount( myStreamSize );

流名称用于标识流。此名称的使用范围限定在应用程序使用的 AWS 账户内。它还受区域限制。也就是说,两个不同 AWS 账户中的两个流可具有相同的名称,不同区域的相同 AWS 账户中的两个流可具有相同的名称,但相同区域的相同账户中的两个流不能具有相同的名称。

流的吞吐量是分片数量的函数。要获得更高的预置吞吐量,您需要更多的分片。分片数量增加也会增加 AWS 针对流收取的费用。有关计算适合您的应用程序的分片数量的更多信息,请参阅选择数据流容量模式

配置 createStreamRequest 对象后,应通过对客户端调用 createStream 方法来创建流。在调用 createStream 之后,应等待流达到 ACTIVE 状态,然后再对流执行任何操作。要查看流的状态,请调用 describeStream 方法。但是,如果流不存在,describeStream 将引发异常。因此,请将 describeStream 调用包括在 try/catch 块中。

client.createStream( createStreamRequest ); DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); long startTime = System.currentTimeMillis(); long endTime = startTime + ( 10 * 60 * 1000 ); while ( System.currentTimeMillis() < endTime ) { try { Thread.sleep(20 * 1000); } catch ( Exception e ) {} try { DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest ); String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus(); if ( streamStatus.equals( "ACTIVE" ) ) { break; } // // sleep for one second // try { Thread.sleep( 1000 ); } catch ( Exception e ) {} } catch ( ResourceNotFoundException e ) {} } if ( System.currentTimeMillis() >= endTime ) { throw new RuntimeException( "Stream " + myStreamName + " never went active" ); }