Exemplos do Kinesis usando SDK para Java 2.x - AWS SDKExemplos de código

Há mais AWS SDK exemplos disponíveis no GitHub repositório AWS Doc SDK Examples.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Exemplos do Kinesis usando SDK para Java 2.x

Os exemplos de código a seguir mostram como realizar ações e implementar cenários comuns usando o AWS SDK for Java 2.x with Kinesis.

Ações são trechos de código de programas maiores e devem ser executadas em contexto. Embora as ações mostrem como chamar funções de serviço individuais, você pode ver as ações no contexto em seus cenários relacionados.

Cada exemplo inclui um link para o código-fonte completo, onde você pode encontrar instruções sobre como configurar e executar o código no contexto.

Ações

O código de exemplo a seguir mostra como usar CreateStream.

SDKpara Java 2.x
nota

Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS.

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class CreateDataStream { public static void main(String[] args) { final String usage = """ Usage: <streamName> Where: streamName - The Amazon Kinesis data stream (for example, StockTradeStream). """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String streamName = args[0]; Region region = Region.US_EAST_1; KinesisClient kinesisClient = KinesisClient.builder() .region(region) .build(); createStream(kinesisClient, streamName); System.out.println("Done"); kinesisClient.close(); } public static void createStream(KinesisClient kinesisClient, String streamName) { try { CreateStreamRequest streamReq = CreateStreamRequest.builder() .streamName(streamName) .shardCount(1) .build(); kinesisClient.createStream(streamReq); } catch (KinesisException e) { System.err.println(e.getMessage()); System.exit(1); } } }
  • Para API obter detalhes, consulte CreateStreamem AWS SDK for Java 2.x APIReferência.

O código de exemplo a seguir mostra como usar DeleteStream.

SDKpara Java 2.x
nota

Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS.

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class DeleteDataStream { public static void main(String[] args) { final String usage = """ Usage: <streamName> Where: streamName - The Amazon Kinesis data stream (for example, StockTradeStream) """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String streamName = args[0]; Region region = Region.US_EAST_1; KinesisClient kinesisClient = KinesisClient.builder() .region(region) .build(); deleteStream(kinesisClient, streamName); kinesisClient.close(); System.out.println("Done"); } public static void deleteStream(KinesisClient kinesisClient, String streamName) { try { DeleteStreamRequest delStream = DeleteStreamRequest.builder() .streamName(streamName) .build(); kinesisClient.deleteStream(delStream); } catch (KinesisException e) { System.err.println(e.getMessage()); System.exit(1); } } }
  • Para API obter detalhes, consulte DeleteStreamem AWS SDK for Java 2.x APIReferência.

O código de exemplo a seguir mostra como usar GetRecords.

SDKpara Java 2.x
nota

Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS.

import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import java.util.ArrayList; import java.util.List; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class GetRecords { public static void main(String[] args) { final String usage = """ Usage: <streamName> Where: streamName - The Amazon Kinesis data stream to read from (for example, StockTradeStream). """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String streamName = args[0]; Region region = Region.US_EAST_1; KinesisClient kinesisClient = KinesisClient.builder() .region(region) .build(); getStockTrades(kinesisClient, streamName); kinesisClient.close(); } public static void getStockTrades(KinesisClient kinesisClient, String streamName) { String shardIterator; String lastShardId = null; DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build(); List<Shard> shards = new ArrayList<>(); DescribeStreamResponse streamRes; do { streamRes = kinesisClient.describeStream(describeStreamRequest); shards.addAll(streamRes.streamDescription().shards()); if (shards.size() > 0) { lastShardId = shards.get(shards.size() - 1).shardId(); } } while (streamRes.streamDescription().hasMoreShards()); GetShardIteratorRequest itReq = GetShardIteratorRequest.builder() .streamName(streamName) .shardIteratorType("TRIM_HORIZON") .shardId(lastShardId) .build(); GetShardIteratorResponse shardIteratorResult = kinesisClient.getShardIterator(itReq); shardIterator = shardIteratorResult.shardIterator(); // Continuously read data records from shard. List<Record> records; // Create new GetRecordsRequest with existing shardIterator. // Set maximum records to return to 1000. GetRecordsRequest recordsRequest = GetRecordsRequest.builder() .shardIterator(shardIterator) .limit(1000) .build(); GetRecordsResponse result = kinesisClient.getRecords(recordsRequest); // Put result into record list. Result may be empty. records = result.records(); // Print records for (Record record : records) { SdkBytes byteBuffer = record.data(); System.out.printf("Seq No: %s - %s%n", record.sequenceNumber(), new String(byteBuffer.asByteArray())); } } }
  • Para API obter detalhes, consulte GetRecordsem AWS SDK for Java 2.x APIReferência.

O código de exemplo a seguir mostra como usar PutRecord.

SDKpara Java 2.x
nota

Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS.

import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class StockTradesWriter { public static void main(String[] args) { final String usage = """ Usage: <streamName> Where: streamName - The Amazon Kinesis data stream to which records are written (for example, StockTradeStream) """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String streamName = args[0]; Region region = Region.US_EAST_1; KinesisClient kinesisClient = KinesisClient.builder() .region(region) .build(); // Ensure that the Kinesis Stream is valid. validateStream(kinesisClient, streamName); setStockData(kinesisClient, streamName); kinesisClient.close(); } public static void setStockData(KinesisClient kinesisClient, String streamName) { try { // Repeatedly send stock trades with a 100 milliseconds wait in between. StockTradeGenerator stockTradeGenerator = new StockTradeGenerator(); // Put in 50 Records for this example. int index = 50; for (int x = 0; x < index; x++) { StockTrade trade = stockTradeGenerator.getRandomTrade(); sendStockTrade(trade, kinesisClient, streamName); Thread.sleep(100); } } catch (KinesisException | InterruptedException e) { System.err.println(e.getMessage()); System.exit(1); } System.out.println("Done"); } private static void sendStockTrade(StockTrade trade, KinesisClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by // the Jackson JSON library. if (bytes == null) { System.out.println("Could not get JSON bytes for stock trade"); return; } System.out.println("Putting trade: " + trade); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in // the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request); } catch (KinesisException e) { System.err.println(e.getMessage()); } } private static void validateStream(KinesisClient kinesisClient, String streamName) { try { DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build(); DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest); if (!describeStreamResponse.streamDescription().streamStatus().toString().equals("ACTIVE")) { System.err.println("Stream " + streamName + " is not active. Please wait a few moments and try again."); System.exit(1); } } catch (KinesisException e) { System.err.println("Error found while describing the stream " + streamName); System.err.println(e); System.exit(1); } } }
  • Para API obter detalhes, consulte PutRecordem AWS SDK for Java 2.x APIReferência.

Exemplos sem servidor

O exemplo de código a seguir mostra como implementar uma função do Lambda que recebe um evento acionado pelo recebimento de mensagens de um stream do Kinesis. A função recupera a carga útil do Kinesis, decodifica do Base64 e registra o conteúdo do registro em log.

SDKpara Java 2.x
nota

Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

Consumir um evento do Kinesis com o Lambda usando Java.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package example; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.LambdaLogger; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; public class Handler implements RequestHandler<KinesisEvent, Void> { @Override public Void handleRequest(final KinesisEvent event, final Context context) { LambdaLogger logger = context.getLogger(); if (event.getRecords().isEmpty()) { logger.log("Empty Kinesis Event received"); return null; } for (KinesisEvent.KinesisEventRecord record : event.getRecords()) { try { logger.log("Processed Event with EventId: "+record.getEventID()); String data = new String(record.getKinesis().getData().array()); logger.log("Data:"+ data); // TODO: Do interesting work based on the new data } catch (Exception ex) { logger.log("An error occurred:"+ex.getMessage()); throw ex; } } logger.log("Successfully processed:"+event.getRecords().size()+" records"); return null; } }

O exemplo de código a seguir mostra como implementar uma resposta parcial em lote para funções do Lambda que recebem eventos de um stream do Kinesis. A função relata as falhas do item em lote na resposta, sinalizando para o Lambda tentar novamente essas mensagens posteriormente.

SDKpara Java 2.x
nota

Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

Relatar falhas de itens em lote do Kinesis com o Lambda usando Java.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> { @Override public StreamsEventResponse handleRequest(KinesisEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) { try { //Process your record KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis(); curRecordSequenceNumber = kinesisRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(batchItemFailures); } }