Criar um fluxo usando as APIs
Use as etapas a seguir para criar o fluxo de dados do Kinesis.
Criar o cliente do Kinesis Data Streams
Para trabalhar com fluxos de dados do Kinesis, é necessário criar um objeto de cliente. O seguinte código Java cria uma instância de um criador de cliente e a usa para definir a região, as credenciais e a configuração do cliente. Em seguida, ele cria um objeto do cliente.
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis client = clientBuilder.build();
Para obter mais informações, consulte Kinesis Data Streams Regions and Endpoints na Referência geral da AWS.
Criar o fluxo
Depois de criar o cliente do Kinesis Data Streams, é possível criar um fluxo no console ou de forma programática. Para criar um fluxo de forma programática, instancie um objeto CreateStreamRequest
, depois especifique um nome para o fluxo. Se desejar usar o modo provisionado, especifique o número de fragmentos para o uso do fluxo de dados.
-
Sob demanda:
CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName );
-
Provisionado:
CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName ); createStreamRequest.setShardCount( myStreamSize );
O nome do fluxo identifica o fluxo. O nome é delimitado pela conta da AWS usada pela aplicação. Ele também é delimitado por região. Ou seja, dois fluxos em duas contas diferentes da AWS, ou dois fluxos na mesma conta da AWS em duas regiões diferentes, podem ter o mesmo nome, mas não dois fluxos na mesma conta e na mesma região.
A throughput do fluxo depende do número de fragmentos. Para obter um throughput mais provisionado, mais fragmentos são necessários. Um número maior de fragmentos também aumenta o custo que a AWS cobra pelo fluxo. Para obter mais informações sobre como calcular um número apropriado de fragmentos para o aplicativo, consulte Escolher o modo de capacidade do fluxo de dados.
Depois de configurar o objeto createStreamRequest
, crie um fluxo chamando o método createStream
para o cliente. Após chamar createStream
, aguarde o fluxo alcançar o estado ACTIVE
antes de executar qualquer operação nele. Para verificar o estado do fluxo, chame o método describeStream
. Se o fluxo não existir, describeStream
lançará uma exceção, portanto, coloque a chamada a describeStream
em um bloco try/catch
.
client.createStream( createStreamRequest ); DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); long startTime = System.currentTimeMillis(); long endTime = startTime + ( 10 * 60 * 1000 ); while ( System.currentTimeMillis() < endTime ) { try { Thread.sleep(20 * 1000); } catch ( Exception e ) {} try { DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest ); String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus(); if ( streamStatus.equals( "ACTIVE" ) ) { break; } // // sleep for one second // try { Thread.sleep( 1000 ); } catch ( Exception e ) {} } catch ( ResourceNotFoundException e ) {} } if ( System.currentTimeMillis() >= endTime ) { throw new RuntimeException( "Stream " + myStreamName + " never went active" ); }