As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Tutorial: executar operações básicas do Kinesis Data Streams usando a AWS CLI
Esta seção descreve o uso básico de um fluxo de dados do Kinesis na linha de comando usando a AWS CLI. Certifique-se de estar familiarizado com os conceitos discutidos em Terminologia e conceitos do Amazon Kinesis Data Streams.
nota
Depois de criar um fluxo, sua conta incorre em cobranças nominais pelo uso do Kinesis Data Streams, pois esse serviço não está qualificado para o nível gratuito da AWS. Ao concluir este tutorial, exclua seus recursos da AWS para parar de gerar cobranças. Para obter mais informações, consulte Etapa 4: limpar.
Etapa 1: criar um fluxo
A primeira etapa é criar um fluxo e verificar se ele foi criado com êxito. Use o comando a seguir para criar um fluxo denominado "Foo":
aws kinesis create-stream --stream-name Foo
Em seguida, emita o comando a seguir para verificar o andamento da criação do fluxo:
aws kinesis describe-stream-summary --stream-name Foo
É necessário obter uma saída semelhante ao exemplo a seguir:
{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }
Neste exemplo, o fluxo tem o status CREATING, o que significa que ainda não está pronto para uso. Verifique novamente em alguns instantes para ver uma saída semelhante ao exemplo a seguir:
{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }
Há informações nessa saída que não são necessárias neste tutorial. A informação importante agora é "StreamStatus": "ACTIVE"
, que mostra que o fluxo está pronto para uso, e as informações sobre o único fragmento solicitado. Também é possível verificar a existência do novo fluxo usando o comando list-streams
, como mostrado aqui:
aws kinesis list-streams
Saída:
{
"StreamNames": [
"Foo"
]
}
Etapa 2: colocar um registro
Agora que há um fluxo ativo, é possível colocar alguns dados. Neste tutorial, será usado o comando mais simples possível, put-record
, que coloca um único registro de dados contendo o texto "testdata" no fluxo:
aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
Se bem-sucedido, esse comando gerará uma saída semelhante ao seguinte exemplo:
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49546986683135544286507457936321625675700192471156785154"
}
Parabéns, você adicionou dados a um fluxo! Em seguida, veja como obter dados do fluxo.
Etapa 3: obter o registro
GetShardIterator
Antes de obter dados do fluxo, é necessário obter o iterador referente ao fragmento do seu interesse. Um iterador de fragmentos representa a posição do fluxo e do fragmento da qual o consumidor (neste caso, o comando get-record
) lerá. Use o comando get-shard-iterator
da seguinte forma:
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo
Lembre-se de que os comandos aws kinesis
têm o suporte de uma API do Kinesis Data Streams, portanto, caso tenha curiosidade sobre algum parâmetro mostrado, leia o tópico de referência da API GetShardIterator
. A execução bem-sucedida resultará em uma saída semelhante ao seguinte exemplo:
{
"ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg="
}
A string longa de caracteres aparentemente aleatórios é o iterador de fragmentos (a sua será diferente). É necessário copiar/colar o iterador de fragmentos no comando get, mostrado a seguir. Os iteradores de fragmentos têm um ciclo de vida de 300 segundos, tempo que deve ser suficiente para copiar/colar o iterador de fragmentos no próximo comando. É necessário remover quaisquer novas linhas do seu iterador de fragmentos antes de colá-lo no próximo comando. Ao receber uma mensagem de erro de que o iterador de fragmentos não é mais válido, execute o comando get-shard-iterator
novamente.
GetRecords
O comando get-records
recebe dados do fluxo e resulta em uma chamada a GetRecords
na API do Kinesis Data Streams. O iterador de fragmentos especifica a posição, no fragmento, de onde iniciará a leitura dos registros de dados em sequência. Se não houver registros disponíveis na parte do fragmento para onde o iterador aponta, GetRecords
retornará uma lista vazia. Poderá demorar várias chamadas para se chegar a uma parte do fragmento que contenha registros.
No exemplo do comando get-records
a seguir:
aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=
Se estiver executando este tutorial a partir de um processador de comando do tipo Unix, como bash, poderá automatizar a aquisição de iterador de fragmentos usando um comando aninhado, como este:
SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR
Se estiver executando este tutorial a partir de um sistema compatível com PowerShell, poderá automatizar a aquisição do iterador de fragmentos usando um comando como este:
aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])
O resultado bem-sucedido do comando get-records
solicitará registros do seu fluxo para o fragmento especificado quando o iterador de fragmentos foi obtido, como no exemplo a seguir:
{
"Records":[ {
"Data":"dGVzdGRhdGE=",
"PartitionKey":"123”,
"ApproximateArrivalTimestamp": 1.441215410867E9,
"SequenceNumber":"49544985256907370027570885864065577703022652638596431874"
} ],
"MillisBehindLatest":24000,
"NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
}
Observe que get-records
é descrito acima como uma solicitação, ou seja, pode-se receber zero ou mais registros mesmo que haja registros em seu fluxo. Os registros retornados podem não representar todos os registros atualmente em seu fluxo. Isso é normal, e o código de produção pesquisará o fluxo em busca de registros em intervalos apropriados. Essa velocidade de pesquisa variará dependendo dos requisitos específicos de design do aplicativo.
Em seu registro nesta parte do tutorial, perceba que os dados parecem ser lixo e não o texto limpo testdata
que foi enviado. Isso ocorre devido ao modo como put-record
usa a codificação Base64 para permitir o envio de dados binários. No entanto, o suporte do Kinesis Data Streams na AWS CLI não fornece decodificação Base64, pois essa decodificação para conteúdo binário bruto impresso em stdout pode causar comportamento indesejado e possíveis problemas de segurança em algumas plataformas e terminais. Ao usar um decodificador Base64 (por exemplo, https://www.base64decode.org/dGVzdGRhdGE=
manualmente, verá que ele é, na verdade, testdata
. Isso é suficiente para fins deste tutorial porque, na prática, a AWS CLI raramente é usada para consumir dados. Mais frequentemente, ela é usada para monitorar o estado do fluxo e obter informações, conforme mostrado anteriormente (describe-stream
e list-streams
). Para obter mais informações sobre a KCL, consulte Developing Custom Consumers with Shared Throughput Using KCL.
Nem sempre get-records
retornará todos os registros no fluxo/fragmento especificado. Quando isso acontecer, use o NextShardIterator
a partir do último resultado para obter o próximo conjunto de registros. Se mais dados estavam sendo colocados no fluxo, o que é a situação normal em aplicativos de produção, pode-se continuar sondando dados usando get-records
todas as vezes. No entanto, se get-records
não for chamado usando o próximo iterador de fragmentos dentro do tempo de vida de 300 segundos do iterador de fragmentos, uma mensagem de erro será gerada, e o comando get-shard-iterator
deverá ser usado para obter um novo iterador de fragmentos.
Essa saída também fornece MillisBehindLatest
, que é o número de milissegundos entre a resposta da operação GetRecords e a extremidade do fluxo, indicando o atraso do consumidor em relação ao horário atual. Um valor zero indica que o processamento de registros foi alcançado e não há nenhum registro novo para processar no momento. No caso deste tutorial, se o meterial estiver sendo lido conforme durante o progresso, um número muito grande pode ser visto. Por padrão, os registros de dados permanecerão em um fluxo por 24 horas aguardando serem recuperados. Esse período, chamado de período de retenção, pode ser configurado para até 365 dias.
Um resultado bem-sucedido de get-records
sempre terá um NextShardIterator
, mesmo que não haja mais nenhum registro atualmente no fluxo. Este é um modelo de sondagem que assume que um produtor colocará mais registros no fluxo em um determinado momento. É possível criar rotinas de sondagem próprias. Porém, ao usar a KCL mencionada anteriormente para desenvolver aplicativos de consumidor, ela se encarregará dessa sondagem.
Ao chamar get-records
até que não haja mais registros no fluxo e o fragmento do qual se esteja sondando, se obterá uma saída com registros vazios, semelhante ao exemplo a seguir:
{
"Records": [],
"NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk="
}
Etapa 4: limpar
Exclua seu fluxo para liberar recursos e evitar cobranças indesejadas à sua conta. Faça isso sempre que tiver criado um fluxo que não será usado, pois as cobranças incidem por fluxo mesmo que ele não seja usado para colocar e obter dados. O comando de limpeza é o seguinte:
aws kinesis delete-stream --stream-name Foo
O êxito do comando não gera saída. Use describe-stream
para verificar o andamento da exclusão:
aws kinesis describe-stream-summary --stream-name Foo
Ao executar esse comando imediatamente após o comando de exclusão, haverá uma saída semelhante ao exemplo a seguir:
{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",
Após a exclusão total do fluxo, describe-stream
gerará um erro "não encontrado":
A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation:
Stream Foo under account 123456789012 not found.