Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Tutorial: Esegui le operazioni di base di Kinesis Data Streams utilizzando il AWS CLI
Questa sezione descrive l'utilizzo di base di un flusso di dati Kinesis dalla riga di comando utilizzando la AWS CLI. Assicurati di avere familiarità con i concetti discussi in Amazon Kinesis Data Streams Terminologia e concetti.
Nota
Dopo aver creato uno stream, sul tuo account vengono addebitati costi nominali per l'utilizzo di Kinesis Data Streams perché Kinesis Data Streams non è idoneo per il piano gratuito. AWS Al termine di questo tutorial, elimina le tue AWS risorse per evitare di incorrere in addebiti. Per ulteriori informazioni, consulta Fase 4: pulizia.
Argomenti
Passaggio 1: crea uno stream
Per prima cosa devi creare un flusso e verificare che sia stato creato correttamente. Utilizza il seguente comando per creare un flusso denominato "Foo":
aws kinesis create-stream --stream-name Foo
Quindi, invia il comando seguente per verificare l'avanzamento della creazione del flusso:
aws kinesis describe-stream-summary --stream-name Foo
Dovresti ottenere un output simile a quello dell'esempio seguente:
{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }
In questo esempio, lo stream ha uno statoCREATING, il che significa che non è ancora pronto per l'uso. Ricontrolla dopo alcuni minuti e dovresti visualizzare un output simile a quello dell'esempio seguente:
{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }
In questo output ci sono informazioni che non ti servono per questo tutorial. Le informazioni importanti per ora sono "StreamStatus": "ACTIVE"
le informazioni che indicano che lo stream è pronto per essere utilizzato e le informazioni sul singolo shard richiesto. Puoi inoltre verificare l'esistenza del tuo nuovo flusso utilizzando il comando list-streams
, come mostrato qui:
aws kinesis list-streams
Output:
{
"StreamNames": [
"Foo"
]
}
Passaggio 2: inserisci un record
Ora che disponi di un flusso attivo, puoi iniziare ad aggiungere dati. Per questo tutorial, utilizzerai il comando più semplice, put-record
, che aggiunge un singolo record di dati contenente il testo "testdata" nel flusso:
aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
Questo comando, se utilizzato con esito positivo, genera un output simile al seguente:
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49546986683135544286507457936321625675700192471156785154"
}
Congratulazioni, hai appena aggiunto dati a un flusso! Nella fase successiva scoprirai come estrarre dati dal flusso.
Passaggio 3: ottieni il record
GetShardIterator
Prima di poter ottenere dati dallo stream, devi procurarti lo shard iterator per lo shard che ti interessa. Un iteratore di shard definisce la posizione del flusso e lo shard da cui il consumer (il comando get-record
in questo caso) effettuerà la lettura. Utilizzerai il get-shard-iterator
comando come segue:
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo
Ricorda che i aws kinesis
comandi sono accompagnati da un Kinesis API Data Streams, quindi se sei curioso di conoscere uno dei parametri mostrati, puoi leggerli GetShardIterator
APInell'argomento di riferimento. L'esecuzione corretta produrrà un output simile al seguente esempio:
{
"ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg="
}
La lunga stringa di caratteri apparentemente casuali è l'iteratore di shard (il tuo sarà diverso). È necessario copiare/incollare l'iteratore shard nel comando get, mostrato di seguito. Gli iteratori di shard hanno un ciclo di vita di 300 secondi, che dovrebbe essere sufficiente per consentirti di copiare e incollare l'iteratore di shard nel comando successivo. È necessario rimuovere tutte le nuove righe dall'iteratore shard prima di incollarle al comando successivo. Se ricevi un messaggio di errore che indica che lo shard iterator non è più valido, esegui nuovamente il comando. get-shard-iterator
GetRecords
Il get-records
comando ottiene i dati dallo stream e si risolve in una chiamata GetRecords
a Kinesis Data Streams. API L'iteratore di shard specifica la posizione nello shard da cui desideri iniziare a leggere i record di dati in sequenza. Se non sono disponibili record nella porzione dello shard a cui l'iteratore punta, GetRecords
restituisce un elenco vuoto. Potrebbero essere necessarie più chiamate per accedere a una parte dello shard che contiene i record.
Nel seguente esempio del get-records
comando:
aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=
Se state eseguendo questo tutorial da un processore di comandi di tipo Unix come bash, potete automatizzare l'acquisizione dell'iteratore shard usando un comando annidato, come questo:
SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR
Se state eseguendo questo tutorial da un sistema che lo supporta PowerShell, potete automatizzare l'acquisizione dello shard iterator usando un comando come questo:
aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])
Il risultato positivo del get-records
comando richiederà i record dal vostro stream per lo shard che avete specificato quando avete ottenuto lo shard iterator, come nell'esempio seguente:
{
"Records":[ {
"Data":"dGVzdGRhdGE=",
"PartitionKey":"123”,
"ApproximateArrivalTimestamp": 1.441215410867E9,
"SequenceNumber":"49544985256907370027570885864065577703022652638596431874"
} ],
"MillisBehindLatest":24000,
"NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
}
Nota che get-records
è descritta sopra come una richiesta, il che significa che potresti ricevere zero o più record anche se ci sono record nel tuo stream. I record restituiti potrebbero non rappresentare tutti i record attualmente presenti nel tuo stream. Questo è normale e il codice di produzione cercherà i record nello stream a intervalli appropriati. Questa velocità di polling varierà in base ai requisiti di progettazione specifici dell'applicazione.
Nel tuo record in questa parte del tutorial, noterai che i dati sembrano spazzatura e non sono il testo testdata
chiaro che abbiamo inviato. Ciò è dovuto al modo in cui put-record
utilizza la codifica Base64 per consentirti di inviare dati binari. Tuttavia, il supporto AWS CLI di Kinesis Data Streams non fornisce la decodifica Base64 perché la decodifica Base64 su contenuto binario non elaborato stampato su stdout può portare a comportamenti indesiderati e potenziali problemi di sicurezza su determinate piattaforme e terminali. Se utilizzi un decoder Base64 (ad esempio, https://www.base64decode.org/dGVzdGRhdGE=
, osserverai che in realtà si tratta di testdata
. Questo è sufficiente ai fini di questo tutorial perché, in pratica, viene usato raramente per consumare dati. AWS CLI Più spesso, viene utilizzato per monitorare lo stato dello stream e ottenere informazioni, come mostrato in precedenza (describe-stream
elist-streams
). Per ulteriori informazioni suKCL, vedere Sviluppo di consumatori personalizzati con utilizzo KCL di un throughput condiviso.
get-records
non sempre restituisce tutti i record nello stream/shard specificato. Se ciò si verifica, utilizza NextShardIterator
dall'ultimo risultato per ottenere il set di record successivo. Se nel flusso venissero inseriti più dati, che è la situazione normale nelle applicazioni di produzione, è possibile continuare a eseguire il polling dei dati utilizzati ogni volta. get-records
Tuttavia, se non chiamate get-records
utilizzando lo shard iterator successivo entro la durata di 300 secondi dello shard iterator, verrà visualizzato un messaggio di errore e sarà necessario utilizzare il get-shard-iterator
comando per ottenere un nuovo iteratore di shard.
In questo output viene MillisBehindLatest
fornito anche il numero di millisecondi in cui la risposta dell'GetRecordsoperazione proviene dall'estremità del flusso, che indica il ritardo dell'utente rispetto all'ora corrente. Un valore di zero indica che l'elaborazione dei record è aggiornata e che non sono presenti nuovi record da elaborare in questo momento. Nel nostro caso, potresti riscontrare un numero piuttosto elevato se hai dedicato diverso tempo ad acquisire familiarità con le fasi di questo tutorial. Per impostazione predefinita, i record di dati rimangono in un flusso per 24 ore in attesa che tu li recuperi. Questo intervallo di tempo viene chiamato periodo di conservazione ed è configurabile fino a 365 giorni.
Un get-records
risultato positivo avrà sempre un valore NextShardIterator
anche se non ci sono altri record attualmente nello stream. Si tratta di un modello di polling che presume che un producer stia potenzialmente immettendo più record nel flusso in un dato momento. Sebbene sia possibile creare le proprie routine di sondaggio, se si utilizzano quelle sopra menzionate KCL per lo sviluppo di applicazioni destinate ai consumatori, questi sondaggi verranno eseguiti automaticamente.
Se chiami get-records
finché non ci sono più record nello stream e nello shard da cui stai estraendo, vedrai un output con record vuoti simile al seguente esempio:
{
"Records": [],
"NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk="
}
Fase 4: pulizia
Elimina lo stream per liberare risorse ed evitare addebiti involontari sul tuo account. Esegui questa operazione ogni volta che hai creato uno stream e non lo utilizzerai, perché i costi maturano per ogni stream, indipendentemente dal fatto che tu stia inserendo e ricevendo dati con esso o meno. Il comando di pulizia è il seguente:
aws kinesis delete-stream --stream-name Foo
Il successo non produce alcun risultato. Usa describe-stream
per controllare lo stato di avanzamento dell'eliminazione:
aws kinesis describe-stream-summary --stream-name Foo
Se esegui questo comando immediatamente dopo il comando delete, vedrai un output simile al seguente esempio:
{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",
Quando il flusso è stato completamente eliminato, describe-stream
genererà un errore di tipo "non trovato":
A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation:
Stream Foo under account 123456789012 not found.