Suscribirse a Amazon Kinesis Data Streams - AWS SDK for Java 2.x

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Suscribirse a Amazon Kinesis Data Streams

Los siguientes ejemplos muestran cómo recuperar y procesar datos de flujos de datos de Amazon Kinesis con el método subscribeToShard. Kinesis Data Streams ahora emplea la característica de distribución ramificada mejorada y una API de recuperación de datos de HTTP/2 de baja latencia, que permite a los desarrolladores ejecutar fácilmente varias aplicaciones de baja latencia y de alto desempeño en la misma secuencia de datos de Kinesis.

Configuración

En primer lugar, cree un cliente de Kinesis asíncrono y un objeto SubscribeToShardRequest. Estos objetos se utilizan en cada uno de los siguientes ejemplos para suscribirse a eventos de Kinesis.

Importaciones

import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

Código

Region region = Region.US_EAST_1; KinesisAsyncClient client = KinesisAsyncClient.builder() .region(region) .build(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId("arn:aws:kinesis:us-east-1:111122223333:stream/StockTradeStream") .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();

Usar la interfaz del creador

Puede utilizar el método builder para simplificar la creación de SubscribeToShardResponseHandler.

Con el creador, puede configurar cada devolución de llamada de ciclo de vida con una llamada de método en lugar de implementar la interfaz completa.

Código

private static CompletableFuture<Void> responseHandlerBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .onComplete(() -> System.out.println("All records stream successfully")) // Must supply some type of subscriber .subscriber(e -> System.out.println("Received event - " + e)) .build(); return client.subscribeToShard(request, responseHandler); }

Para tener más control sobre publicador, puede usar el método publisherTransformer para personalizar el publicador.

Código

private static CompletableFuture<Void> responseHandlerBuilderPublisherTransformer(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .publisherTransformer(p -> p.filter(e -> e instanceof SubscribeToShardEvent).limit(100)) .subscriber(e -> System.out.println("Received event - " + e)) .build(); return client.subscribeToShard(request, responseHandler); }

Consulte el ejemplo completo en GitHub.

Usa un controlador de respuesta personalizado

Para tener un control completo del suscriptor y el publicador, implemente la interfaz de SubscribeToShardResponseHandler.

En este ejemplo, se implementa el método onEventStream, lo que le permite el acceso completo al publicador. Esto demuestra cómo transformar el publicador en registros de eventos para su impresión por parte del suscriptor.

Código

private static CompletableFuture<Void> responseHandlerBuilderClassic(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = new SubscribeToShardResponseHandler() { @Override public void responseReceived(SubscribeToShardResponse response) { System.out.println("Receieved initial response"); } @Override public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) { publisher // Filter to only SubscribeToShardEvents .filter(SubscribeToShardEvent.class) // Flat map into a publisher of just records .flatMapIterable(SubscribeToShardEvent::records) // Limit to 1000 total records .limit(1000) // Batch records into lists of 25 .buffer(25) // Print out each record batch .subscribe(batch -> System.out.println("Record Batch - " + batch)); } @Override public void complete() { System.out.println("All records stream successfully"); } @Override public void exceptionOccurred(Throwable throwable) { System.err.println("Error during stream - " + throwable.getMessage()); } }; return client.subscribeToShard(request, responseHandler); }

Consulte el ejemplo completo en GitHub.

Usa la interfaz de visitante

Puede utilizar un objeto Visitor para suscribirse a eventos específicos que le interese ver.

Código

private static CompletableFuture<Void> responseHandlerBuilderVisitorBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = SubscribeToShardResponseHandler.Visitor .builder() .onSubscribeToShardEvent(e -> System.out.println("Received subscribe to shard event " + e)) .build(); SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); }

Consulte el ejemplo completo en GitHub.

Usa un suscriptor personalizado

También puede implementar su propio suscriptor personalizado para suscribirse a la secuencia.

Este fragmento de código muestra un ejemplo de suscriptor.

Código

private static class MySubscriber implements Subscriber<SubscribeToShardEventStream> { private Subscription subscription; private AtomicInteger eventCount = new AtomicInteger(0); @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; this.subscription.request(1); } @Override public void onNext(SubscribeToShardEventStream shardSubscriptionEventStream) { System.out.println("Received event " + shardSubscriptionEventStream); if (eventCount.incrementAndGet() >= 100) { // You can cancel the subscription at any time if you wish to stop receiving events. subscription.cancel(); } subscription.request(1); } @Override public void onError(Throwable throwable) { System.err.println("Error occurred while stream - " + throwable.getMessage()); } @Override public void onComplete() { System.out.println("Finished streaming all events"); } }

Puede pasar el suscriptor personalizado al método subscribe, como se muestra en el siguiente fragmento de código.

Código

private static CompletableFuture<Void> responseHandlerBuilderSubscriber(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(MySubscriber::new) .build(); return client.subscribeToShard(request, responseHandler); }

Consulte el ejemplo completo en GitHub.

Escribir registros de datos en una secuencia de datos de Kinesis

Puede utilizar el objeto KinesisClient para escribir registros de datos en una secuencia de datos de Kinesis mediante el método putRecords. Para invocar correctamente este método, cree un objeto PutRecordsRequest. Debe pasar el nombre de la secuencia de datos al método streamName. También debe pasar los datos mediante el método putRecords (como se muestra en el siguiente ejemplo de código).

Importaciones

import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;

En el siguiente ejemplo de código Java, observe que el objeto StockTrade se utiliza como los datos para escribir en la secuencia de datos de Kinesis. Antes de ejecutar este ejemplo, asegúrese de haber creado la secuencia de datos.

Código

import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class StockTradesWriter { public static void main(String[] args) { final String usage = """ Usage: <streamName> Where: streamName - The Amazon Kinesis data stream to which records are written (for example, StockTradeStream) """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String streamName = args[0]; Region region = Region.US_EAST_1; KinesisClient kinesisClient = KinesisClient.builder() .region(region) .build(); // Ensure that the Kinesis Stream is valid. validateStream(kinesisClient, streamName); setStockData(kinesisClient, streamName); kinesisClient.close(); } public static void setStockData(KinesisClient kinesisClient, String streamName) { try { // Repeatedly send stock trades with a 100 milliseconds wait in between. StockTradeGenerator stockTradeGenerator = new StockTradeGenerator(); // Put in 50 Records for this example. int index = 50; for (int x = 0; x < index; x++) { StockTrade trade = stockTradeGenerator.getRandomTrade(); sendStockTrade(trade, kinesisClient, streamName); Thread.sleep(100); } } catch (KinesisException | InterruptedException e) { System.err.println(e.getMessage()); System.exit(1); } System.out.println("Done"); } private static void sendStockTrade(StockTrade trade, KinesisClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by // the Jackson JSON library. if (bytes == null) { System.out.println("Could not get JSON bytes for stock trade"); return; } System.out.println("Putting trade: " + trade); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in // the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request); } catch (KinesisException e) { System.err.println(e.getMessage()); } } private static void validateStream(KinesisClient kinesisClient, String streamName) { try { DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build(); DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest); if (!describeStreamResponse.streamDescription().streamStatus().toString().equals("ACTIVE")) { System.err.println("Stream " + streamName + " is not active. Please wait a few moments and try again."); System.exit(1); } } catch (KinesisException e) { System.err.println("Error found while describing the stream " + streamName); System.err.println(e); System.exit(1); } } }

Consulte el ejemplo completo en GitHub.

Usar una biblioteca de terceros

Puede utilizar otras bibliotecas de terceros en lugar de implementar un suscriptor personalizado. Este ejemplo muestra el uso de la implementación RxJava, pero puede utilizar cualquier biblioteca que implemente las interfaces de secuencias reactivas. Consulte la página wiki de RxJava en Github para obtener más información acerca de dicha biblioteca.

Para utilizar la biblioteca, añádala como una dependencia. Si está utilizando Maven, el ejemplo muestra el fragmento de POM que se debe utilizar.

Entrada de POM

<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.1.14</version> </dependency>

Importaciones

import java.net.URI; import java.util.concurrent.CompletableFuture; import io.reactivex.Flowable; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.SdkHttpConfigurationOption; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.StartingPosition; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import software.amazon.awssdk.utils.AttributeMap;

En este ejemplo se utiliza RxJava en el método de ciclo de vida onEventStream. Esto le ofrece acceso completo al publicador, que se puede utilizar para crear un Rx Flowable.

Código

SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .onEventStream(p -> Flowable.fromPublisher(p) .ofType(SubscribeToShardEvent.class) .flatMapIterable(SubscribeToShardEvent::records) .limit(1000) .buffer(25) .subscribe(e -> System.out.println("Record batch = " + e))) .build();

También puede usar el método publisherTransformer con el publicador Flowable. Debe adaptar el publicador Flowable a un SdkPublisher, tal y como se muestra en el siguiente ejemplo.

Código

SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .publisherTransformer(p -> SdkPublisher.adapt(Flowable.fromPublisher(p).limit(100))) .build();

Consulte el ejemplo completo en GitHub.

Más información