Seleziona le tue preferenze relative ai cookie

Utilizziamo cookie essenziali e strumenti simili necessari per fornire il nostro sito e i nostri servizi. Utilizziamo i cookie prestazionali per raccogliere statistiche anonime in modo da poter capire come i clienti utilizzano il nostro sito e apportare miglioramenti. I cookie essenziali non possono essere disattivati, ma puoi fare clic su \"Personalizza\" o \"Rifiuta\" per rifiutare i cookie prestazionali.

Se sei d'accordo, AWS e le terze parti approvate utilizzeranno i cookie anche per fornire utili funzionalità del sito, ricordare le tue preferenze e visualizzare contenuti pertinenti, inclusa la pubblicità pertinente. Per continuare senza accettare questi cookie, fai clic su \"Continua\" o \"Rifiuta\". Per effettuare scelte più dettagliate o saperne di più, fai clic su \"Personalizza\".

Nozioni di base su Kinesis Data Streams per Amazon DynamoDB

Modalità Focus
Nozioni di base su Kinesis Data Streams per Amazon DynamoDB - Amazon DynamoDB

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Questa sezione descrive come utilizzare Kinesis Data Streams per le tabelle Amazon DynamoDB con la console Amazon DynamoDB, il () e il. AWS Command Line Interface AWS CLI API

Creazione di un flusso di dati Amazon Kinesis attivo

Tutti questi esempi utilizzano la tabella DynamoDB Music che è stata creata come parte del tutorial Nozioni di base su DynamoDB.

Per ulteriori informazioni su come creare consumatori e connettere il flusso di dati Kinesis ad altri servizi AWS , consulta Leggere i dati dal flusso di dati Amazon Kinesis, nella Guida per gli sviluppatori del flusso di dati Amazon Kinesis.

Nota

Quando utilizzi KDS gli shard per la prima volta, ti consigliamo di impostarli in modo da scalarli verso l'alto e verso il basso in base ai modelli di utilizzo. Dopo aver accumulato ulteriori dati sui modelli di utilizzo, è possibile adattare le partizioni nel flusso di conseguenza.

Console
  1. Accedi AWS Management Console e apri la console Kinesis all'indirizzo. https://console.aws.amazon.com/kinesis/

  2. Scegli Crea flusso di dati e segui le istruzioni per creare un flusso denominato samplestream.

  3. Apri la console DynamoDB all'indirizzo. https://console.aws.amazon.com/dynamodb/

  4. Nel riquadro di navigazione sul lato sinistro della console scegli Tables (Tabelle).

  5. Seleziona la tabella Music.

  6. Scegli la scheda Exports and streams (Esportazioni e flussi).

  7. (Facoltativo) Nella sezione Dettagli del flusso di dati di Amazon Kinesis, puoi modificare la precisione del timestamp del record da microsecondi (impostazione predefinita) a millisecondi.

  8. Scegli samplestream dall'elenco a discesa.

  9. Scegli il pulsante Attiva.

AWS CLI
  1. Crea un flusso di dati Kinesis denominato samplestream utilizzando il comando create-stream.

    aws kinesis create-stream --stream-name samplestream --shard-count 3

    Prima di configurare il numero di partizioni per il flusso di dati Kinesis, consulta Considerazioni sulla gestione delle partizioni per Kinesis Data Streams.

  2. Verificare che il flusso Kinesis sia attivo e pronto per l'uso utilizzando il comando describe-stream.

    aws kinesis describe-stream --stream-name samplestream
  3. Abilitare lo streaming di Kinesis sulla tabella DynamoDB utilizzando il comando DynamoDB enable-kinesis-streaming-destination. Sostituisci il valore stream-arn con quello restituito da describe-stream nella fase precedente. Facoltativamente, abilita lo streaming con una precisione più granulare (microsecondi) dei valori di timestamp restituiti su ogni record.

    Abilita lo streaming con una precisione di timestamp in microsecondi:

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream --enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND

    Oppure abilita lo streaming con la precisione del timestamp predefinita (millisecondi):

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
  4. Abilitare lo streaming di Kinesis sulla tabella DynamoDB utilizzando il comando describe-kinesis-streaming-destination DynamoDB.

    aws dynamodb describe-kinesis-streaming-destination --table-name Music
  5. Scrivere i dati nella tabella DynamoDB utilizzando il comando put-item, come descritto nella Guida per gli sviluppatori di DynamoDB.

    aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}' aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
  6. Utilizzate il CLI comando Kinesis get-records per recuperare il contenuto del flusso Kinesis. Quindi utilizzare il seguente frammento di codice per deserializzare il contenuto del flusso.

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we want to fetch the value * of this attribute from the new item image. The following code fetches this value. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision"); if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) { return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS); } return Instant.ofEpochMilli(timestampJson.longValue()); }
Java
  1. Seguire le istruzioni contenute nella guida per gli sviluppatori di Kinesis Data Streams per creare un flusso di dati Kinesis denominato samplestream tramite Java.

    Prima di configurare il numero di partizioni per il flusso di dati Kinesis, consulta Considerazioni sulla gestione delle partizioni per Kinesis Data Streams.

  2. Utilizzo il seguente frammento di codice per abilitare lo streamingKinesis sulla tabella DynamoDB. Facoltativamente, abilita lo streaming con una precisione più granulare (microsecondi) dei valori di timestamp restituiti su ogni record.

    Abilita lo streaming con una precisione di timestamp in microsecondi:

    EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder() .approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND) .build(); EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .enableKinesisStreamingConfiguration(enableKdsConfig) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);

    Oppure abilita lo streaming con la precisione del timestamp predefinita (millisecondi):

    EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
  3. Segui le istruzioni contenute nella guida per gli sviluppatori di Kinesis Data Streams per leggere dal flusso di dati creato.

  4. Utilizza il seguente frammento di codice per deserializzare il contenuto del flusso.

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value * of this attribute from the new item image, the below code would fetch this. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision"); if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) { return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS); } return Instant.ofEpochMilli(timestampJson.longValue()); }
  1. Accedi AWS Management Console e apri la console Kinesis all'indirizzo. https://console.aws.amazon.com/kinesis/

  2. Scegli Crea flusso di dati e segui le istruzioni per creare un flusso denominato samplestream.

  3. Apri la console DynamoDB all'indirizzo. https://console.aws.amazon.com/dynamodb/

  4. Nel riquadro di navigazione sul lato sinistro della console scegli Tables (Tabelle).

  5. Seleziona la tabella Music.

  6. Scegli la scheda Exports and streams (Esportazioni e flussi).

  7. (Facoltativo) Nella sezione Dettagli del flusso di dati di Amazon Kinesis, puoi modificare la precisione del timestamp del record da microsecondi (impostazione predefinita) a millisecondi.

  8. Scegli samplestream dall'elenco a discesa.

  9. Scegli il pulsante Attiva.

Apportare modifiche a un flusso di dati Amazon Kinesis attivo

Questa sezione descrive come apportare modifiche a una configurazione attiva di Kinesis Data Streams for DynamoDB utilizzando la console e il. AWS CLI API

AWS Management Console

  1. Apri la console DynamoDB all'indirizzo https://console.aws.amazon.com/dynamodb/

  2. Vai al tuo tavolo.

  3. Scegli Esportazioni e flussi.

AWS CLI

  1. Chiama describe-kinesis-streaming-destination per confermare che lo stream sia ACTIVE attivo.

  2. ChiamaUpdateKinesisStreamingDestination, come in questo esempio:

    aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
  3. Chiama describe-kinesis-streaming-destination per confermare che lo stream sia attivoUPDATING.

  4. Chiama describe-kinesis-streaming-destination periodicamente fino al ripristino dello stato dello ACTIVE streaming. In genere occorrono fino a 5 minuti prima che gli aggiornamenti con precisione del timestamp abbiano effetto. Una volta aggiornato lo stato, ciò indica che l'aggiornamento è completo e il nuovo valore di precisione verrà applicato ai record futuri.

  5. Scrivi nella tabella usandoputItem.

  6. Usa il get-records comando Kinesis per ottenere i contenuti dello stream.

  7. Verifica che le ApproximateCreationDateTime scritture abbiano la precisione desiderata.

Java API

  1. Fornisci un frammento di codice che costruisca una UpdateKinesisStreamingDestination richiesta e una risposta. UpdateKinesisStreamingDestination

  2. Fornisci un frammento di codice che costruisce una richiesta e un. DescribeKinesisStreamingDestination DescribeKinesisStreamingDestination response

  3. Chiama describe-kinesis-streaming-destination periodicamente fino al ripristino dello stato dello streaming, a indicare che l'aggiornamento è completo e il nuovo valore di precisione verrà applicato ai record futuri. ACTIVE

  4. Esegue le scritture sulla tabella.

  5. Leggi dallo stream e deserializza il contenuto dello stream.

  6. Verifica che le ApproximateCreationDateTime scritture abbiano la precisione desiderata.

PrivacyCondizioni del sitoPreferenze cookie
© 2025, Amazon Web Services, Inc. o società affiliate. Tutti i diritti riservati.