通过 API 创建流 - Amazon Kinesis Data Streams

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

通过 API 创建流

请使用以下步骤创建 Kinesis 数据流。

构建 Kinesis Data Streams 客户端

必须先构建客户端对象,然后才能处理 Kinesis 数据流。以下 Java 代码实例化一个客户端生成器,并使用它来设置区域、凭据和客户端配置。然后,它会构建一个客户端对象。

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis client = clientBuilder.build();

有关更多信息,请参阅《AWS 一般参考》中的 Kinesis Data Streams Regions and Endpoints

创建流

现在您已创建 Kinesis Data Streams 客户端,接下来可创建要处理的流。您可通过 Kinesis Data Streams 控制台或以编程方式来创建流。要以编程方式创建流,请实例化 CreateStreamRequest 对象并指定流名称(如要使用预置模式),以及流要使用的分片数量。

  • 按需

    CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName );
  • 预置

    CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName ); createStreamRequest.setShardCount( myStreamSize );

流名称用于标识流。该名称的作用域仅限于应用程序使用的 AWS 帐户。它还受区域限制。也就是说,两个不同 AWS 账户中的两个直播可以有相同的名称,同一个 AWS 账户中但位于两个不同区域的两个直播可以有相同的名称,但不能有两个直播在同一个账户和同一个区域中。

流的吞吐量是分片数量的函数;配置更大的吞吐量需要更多分片。分片越多还会增加对直播 AWS 收取的费用。有关计算适合您的应用程序的分片数量的更多信息,请参阅选择数据流容量模式

在配置 createStreamRequest 对象之后,通过对客户端调用 createStream 方法来创建流。在调用 createStream 之后,应等待流达到 ACTIVE 状态,然后再对流执行任何操作。要查看流的状态,请调用 describeStream 方法。但是,如果流不存在,describeStream 将引发异常。因此,请将 describeStream 调用包括在 try/catch 块中。

client.createStream( createStreamRequest ); DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); long startTime = System.currentTimeMillis(); long endTime = startTime + ( 10 * 60 * 1000 ); while ( System.currentTimeMillis() < endTime ) { try { Thread.sleep(20 * 1000); } catch ( Exception e ) {} try { DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest ); String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus(); if ( streamStatus.equals( "ACTIVE" ) ) { break; } // // sleep for one second // try { Thread.sleep( 1000 ); } catch ( Exception e ) {} } catch ( ResourceNotFoundException e ) {} } if ( System.currentTimeMillis() >= endTime ) { throw new RuntimeException( "Stream " + myStreamName + " never went active" ); }