Tutorial: Realización de operaciones básicas de Kinesis Data Streams con la AWS CLI - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Tutorial: Realización de operaciones básicas de Kinesis Data Streams con la AWS CLI

En esta sección se describe el uso básico de un flujo de datos de Kinesis desde la línea de comandos mediante la AWS CLI. Asegúrese de estar familiarizado con los conceptos que se abordan en Terminología y conceptos de Amazon Kinesis Data Streams.

nota

Una vez creado un flujo, su cuenta generará gastos nominales por el uso de Kinesis Data Streams, ya que Kinesis Data Streams no está disponible en el nivel gratuito de AWS. Cuando haya terminado con este tutorial, elimine sus recursos de AWS para dejar de incurrir en cargos. Para obtener más información, consulte Paso 4: Limpiar.

Paso 1: Crear un flujo

El primer paso es crear una secuencia y verificar que se haya creado correctamente. Utilice el siguiente comando para crear una secuencia llamada "Foo":

aws kinesis create-stream --stream-name Foo

A continuación, escriba el siguiente comando para comprobar el progreso de creación de la secuencia:

aws kinesis describe-stream-summary --stream-name Foo

Debería obtener un resultado similar al siguiente ejemplo:

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

En este ejemplo, el flujo tiene el estado CREATING, lo que significa que aún no está listo para su uso. Compruébelo de nuevo en unos minutos, y debería ver un resultado parecido al siguiente ejemplo:

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

Este resultado contiene información que no necesita para este tutorial. La información importante, por el momento, es "StreamStatus": "ACTIVE", que le indica que el flujo está listo para ser utilizado, y la información en la partición única que ha solicitado. También puede verificar la existencia de su nuevo secuencia mediante el comando list-streams, tal y como se muestra aquí:

aws kinesis list-streams

Salida:

{ "StreamNames": [ "Foo" ] }

Paso 2: Insertar un registro

Ahora que ya tiene una secuencia activa, está listo para insertar algunos datos. En este tutorial, utilizará el comando más sencillo posible, put-record, que inserta un único registro de datos que contiene el texto "testdata" en la secuencia:

aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata

Este comando, si se ejecuta correctamente, dará como resultado algo similar a lo del siguiente ejemplo:

{ "ShardId": "shardId-000000000000", "SequenceNumber": "49546986683135544286507457936321625675700192471156785154" }

¡Enhorabuena, acaba de agregar datos a una secuencia! A continuación verá cómo obtener datos a partir de la secuencia.

Paso 3: Obtener el registro

GetShardIterator

Antes de poder obtener datos del flujo, necesita obtener el iterador de fragmentos para el fragmento que le interese. Un iterador de fragmentos representa la posición de la secuencia y el fragmento a partir de la cual realizará la lectura el consumidor (en este caso, el comando get-record). Utilizará el comando get-shard-iterator, tal y como se indica a continuación:

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo

Recuerde que los comandos de aws kinesis utilizan la API de Kinesis Data Streams, por lo que si le interesa alguno de los parámetros que se muestran, puede obtener más información sobre ellos en el tema de referencia de la API GetShardIterator. La ejecución correcta dará como resultado algo similar a lo del siguiente ejemplo:

{ "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=" }

Esta larga cadena de caracteres aparentemente aleatorios es el iterador de fragmentos (el suyo será diferente). Tendrá que copiar y pegar el iterador de particiones en el comando "get" que se muestra a continuación. Los iteradores de fragmentos tienen una vida útil de 300 segundos, un tiempo que debería ser suficiente para que pueda copiar y pegar el iterador de fragmentos en el siguiente comando. Tendrá que eliminar las líneas nuevas de su iterador de particiones antes de pegarlo en el siguiente comando. Si recibe un mensaje de error que informa de que el iterador de particiones ya no es válido, solo tiene que ejecutar de nuevo el comando get-shard-iterator.

GetRecords

El comando get-records obtiene los datos del flujo y llama a GetRecords en la API de Kinesis Data Streams. El iterador de fragmentos especifica la posición del fragmento desde la que quiera empezar a leer los registros de datos de forma secuencial. Si no hay registros disponibles en la parte del fragmento a la que señala el iterador, GetRecords devolverá una lista vacía. Puede necesitar varias llamadas para dar con una parte de la partición que contenga registros.

En el siguiente comando get-records de ejemplo:

aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=

Si ejecuta este tutorial a partir de un procesador de comandos de tipo Unix, como bash, puede automatizar la adquisición del iterador de particiones utilizando un comando anidado, como este:

SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR

Si ejecuta este tutorial a partir de un sistema compatible con PowerShell, puede automatizar la adquisición del iterador de particiones utilizando un comando como este:

aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])

El resultado satisfactorio del comando get-records solicitará registros de su secuencia para la partición especificada al obtener el iterador de particiones, como en el siguiente ejemplo:

{ "Records":[ { "Data":"dGVzdGRhdGE=", "PartitionKey":"123”, "ApproximateArrivalTimestamp": 1.441215410867E9, "SequenceNumber":"49544985256907370027570885864065577703022652638596431874" } ], "MillisBehindLatest":24000, "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is=" }

Tenga en cuenta que get-records se describe anteriormente como una solicitud, lo que significa que puede recibir cero o más registros incluso cuando haya registros en el flujo. Los registros devueltos podrían no representar todos los registros actualmente en el flujo. Esto es normal y el código de producción sondeará el flujo en busca de registros a los intervalos apropiados. Esta velocidad de sondeo variará según los requisitos específicos del diseño de la aplicación.

En esta parte del tutorial, lo primero de lo que se dará cuenta acerca de su registro es que los datos parecen ser inútiles: no se tratará de un texto claro como el testdata que le enviamos. Esto se debe a la forma en la que put-record utiliza la codificación Base64 para permitirle enviar datos binarios. Sin embargo, la compatibilidad con Kinesis Data Streams de la AWS CLI no incluye la descodificación Base64, porque la descodificación Base64 aplicada a contenido binario sin procesar que se envía a stdout puede producir comportamientos no deseados y problemas de seguridad potenciales en determinadas plataformas y terminales. Si utiliza un descodificador Base64 (por ejemplo, https://www.base64decode.org/) para descodificar manualmente dGVzdGRhdGE= verá que, en realidad, es testdata. Esto es suficiente para este tutorial porque, en la práctica, rara vez se usa la AWS CLI para consumir datos. Con más frecuencia, se usa para monitorear el estado del flujo y obtener información, como se mostró anteriormente (describe-stream y list-streams). Para más información acerca de KCL, consulte Desarrollo de consumidores personalizados con rendimiento compartido mediante KCL.

get-records no siempre devolverá todos los registros en la secuencia/partición especificada. Si ocurre esto, use el NextShardIterator del último resultado para obtener el siguiente conjunto de registros. Si se estaban insertando más datos en el flujo, la situación normal en aplicaciones de producción, podría mantener el sondeo de datos cada vez con get-records. Sin embargo, si no llama a get-records con el siguiente iterador de particiones dentro del plazo de vida útil del iterador (300 segundos), obtendrá un mensaje de error y tendrá que utilizar el comando get-shard-iterator para obtener un nuevo iterador de particiones.

Además, en este resultado también se incluye MillisBehindLatest, que es el número de milisegundos a los que se encuentra la respuesta de la operación GetRecords del extremo del flujo, lo que indica el retraso de la aplicación consumidora con respecto al momento actual. Un valor de cero indica que el procesamiento de registros está actualizado y que no hay nuevos registros para procesar en este momento. En el caso de este tutorial, es posible que vea un número bastante grande si se ha ido tomando el tiempo de ir leyendo sobre la marcha. De manera predeterminada, los registros de datos permanecen en un flujo durante 24 horas, y podrá recuperarlos. Este periodo de tiempo se denomina periodo de retención y se puede configurar para durar hasta 365 días.

Un resultado satisfactorio de get-records siempre tendrá un NextShardIterator, aunque no haya más registros actualmente en la secuencia. Este es un modelo de sondeo que asume que un productor puede insertar más registros en la secuencia en cualquier momento determinado. Aunque puede escribir sus propias rutinas de sondeo, si utiliza la KCL mencionada anteriormente para el desarrollo de aplicaciones consumidoras, este sondeo se realizará automáticamente.

Si llama a get-records hasta que no haya más registros en la secuencia y el fragmento a los que está recurriendo, verá resultados con registros vacíos similares a los del siguiente ejemplo:

{ "Records": [], "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk=" }

Paso 4: Limpiar

Elimine su flujo para liberar recursos y evitar cargos no deseados en su cuenta. Haga esto cada vez que haya creado un flujo y no lo vaya a usar, ya que los cargos se acumulan por cada flujo, independientemente de si inserta o extrae datos de él o no. El comando de limpieza es el siguiente:

aws kinesis delete-stream --stream-name Foo

El éxito no da ningún resultado Se debe utilizar describe-stream para comprobar el progreso de la eliminación:

aws kinesis describe-stream-summary --stream-name Foo

Si ejecuta este comando inmediatamente después del comando de eliminación, probablemente verá un resultado parecido al siguiente ejemplo:

{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",

Tras eliminar por completo la secuencia, describe-stream devolverá un error del tipo "no encontrado":

A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation: Stream Foo under account 123456789012 not found.