本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 APIs 创建流
请使用以下步骤创建 Kinesis 数据流。
构建 Kinesis Data Streams 客户端
必须先构建客户端对象,然后才能处理 Kinesis 数据流。以下 Java 代码实例化一个客户端生成器,并使用它来设置区域、凭据和客户端配置。然后,它会构建一个客户端对象。
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis client = clientBuilder.build();
有关更多信息,请参阅《AWS 一般参考》中的 Kinesis Data Streams Regions and Endpoints。
创建流
现在您已创建 Kinesis Data Streams 客户端,接下来可使用控制台或以编程方式来创建流。要以编程方式创建流,请实例化 CreateStreamRequest
对象并指定流名称。若要使用预置模式,请为数据流指定分片数量。
-
按需:
CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName );
-
预置:
CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName ); createStreamRequest.setShardCount( myStreamSize );
流名称用于标识流。该名称的作用域仅限于应用程序使用的 AWS 帐户。它还受区域限制。也就是说,两个不同 AWS 账户中的两个直播可以有相同的名称,同一个 AWS 账户中但位于两个不同区域的两个直播可以有相同的名称,但不能有两个直播在同一个账户和同一个区域中。
流的吞吐量是分片数量的函数。要获得更高的预置吞吐量,您需要更多的分片。分片越多还会增加对直播 AWS 收取的费用。有关计算适合您的应用程序的分片数量的更多信息,请参阅选择数据流容量模式。
配置 createStreamRequest
对象后,应通过对客户端调用 createStream
方法来创建流。在调用 createStream
之后,应等待流达到 ACTIVE
状态,然后再对流执行任何操作。要查看流的状态,请调用 describeStream
方法。但是,如果流不存在,describeStream
将引发异常。因此,请将 describeStream
调用包括在 try/catch
块中。
client.createStream( createStreamRequest ); DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); long startTime = System.currentTimeMillis(); long endTime = startTime + ( 10 * 60 * 1000 ); while ( System.currentTimeMillis() < endTime ) { try { Thread.sleep(20 * 1000); } catch ( Exception e ) {} try { DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest ); String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus(); if ( streamStatus.equals( "ACTIVE" ) ) { break; } // // sleep for one second // try { Thread.sleep( 1000 ); } catch ( Exception e ) {} } catch ( ResourceNotFoundException e ) {} } if ( System.currentTimeMillis() >= endTime ) { throw new RuntimeException( "Stream " + myStreamName + " never went active" ); }