Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Mise en route avec Kinesis Data Streams pour Amazon DynamoDB
Cette section explique comment utiliser les tables Kinesis Data Streams pour Amazon DynamoDB avec la console Amazon DynamoDB, le () et le. AWS Command Line Interface AWS CLI API
Création d'un flux de données Amazon Kinesis actif
Tous ces exemples utilisent la table DynamoDB Music
créée dans le cadre du tutoriel Mise en route avec DynamoDB.
Pour en savoir plus sur la façon de créer des consommateurs et de connecter votre flux de données Kinesis à d'autres services AWS , consultez Lecture de données à partir de Kinesis Data Streams dans le Guide du développeur Amazon Kinesis Data Streams.
Lorsque vous utilisez des KDS partitions pour la première fois, nous vous recommandons de les redimensionner à la hausse ou à la baisse en fonction des modèles d'utilisation. Une fois que vous aurez accumulé davantage de données sur les modèles d'utilisation, vous pourrez ajuster les partitions de votre flux en conséquence.
- Console
-
-
Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à l'adresse. https://console.aws.amazon.com/kinesis/
-
Choisissez Create data stream (Créer un flux de données), puis suivez les instructions pour créer un flux appelé samplestream
.
-
Ouvrez la console DynamoDB à l'adresse. https://console.aws.amazon.com/dynamodb/
-
Dans le volet de navigation sur le côté gauche de la console, choisissez Tables.
-
Choisissez la table Music.
-
Choisissez l'onglet Exportations et flux.
-
(Facultatif) Dans les détails du flux de données Amazon Kinesis, vous pouvez modifier la précision de l'horodatage des enregistrements de la microseconde (par défaut) à la milliseconde.
-
Choisissez samplestream dans la liste déroulante.
-
Cliquez sur le bouton Activer.
- AWS CLI
-
-
Créez un flux de données Kinesis nommé samplestream
à l'aide de la commande create-stream.
aws kinesis create-stream --stream-name samplestream --shard-count 3
Consultez Considérations relatives à la gestion des partitions pour Kinesis Data Streams avant de définir le nombre de partitions du flux de données Kinesis.
-
Vérifiez que le flux Kinesis est actif et prêt pour utilisation à l'aide de la commande describe-stream.
aws kinesis describe-stream --stream-name samplestream
-
Activez le streaming Kinesis sur la table DynamoDB à l'aide de la commande DynamoDB enable-kinesis-streaming-destination
. Remplacez la valeur stream-arn
par celle que la commande describe-stream
a renvoyée à l'étape précédente. Activez éventuellement le streaming avec une précision plus précise (microseconde) des valeurs d'horodatage renvoyées sur chaque enregistrement.
Activez le streaming avec une précision d'horodatage de l'ordre de la microseconde :
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
--enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
Ou activez le streaming avec la précision d'horodatage par défaut (milliseconde) :
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
-
Vérifiez si le streaming Kinesis est actif sur la table à l'aide de la commande DynamoDB describe-kinesis-streaming-destination
.
aws dynamodb describe-kinesis-streaming-destination --table-name Music
-
Ecrivez des données dans la table DynamoDB à l'aide de la commande put-item
, comme décrit dans le Manuel du développeur DynamoDB.
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
-
Utilisez la CLI commande Kinesis get-records pour récupérer le contenu du flux Kinesis. Ensuite, utilisez l'extrait de code suivant pour désérialiser le contenu du flux.
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we want to fetch the value
* of this attribute from the new item image. The following code fetches this value.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
- Java
-
-
Suivez les instructions du Manuel du développeur Kinesis Data Streams pour créer un flux de données Kinesis nommé samplestream
à l'aide de Java.
Consultez Considérations relatives à la gestion des partitions pour Kinesis Data Streams avant de définir le nombre de partitions pour le flux de données Kinesis.
-
Utilisez l'extrait de code suivant pour activer le streaming Kinesis sur la table DynamoDB. Activez éventuellement le streaming avec une précision plus précise (microseconde) des valeurs d'horodatage renvoyées sur chaque enregistrement.
Activez le streaming avec une précision d'horodatage de l'ordre de la microseconde :
EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder()
.approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND)
.build();
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.enableKinesisStreamingConfiguration(enableKdsConfig)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
Ou activez le streaming avec la précision d'horodatage par défaut (milliseconde) :
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
-
Suivez les instructions du Guide du développeur Kinesis Data Streams pour lire le flux de données créé.
-
Utilisez l'extrait de code suivant pour désérialiser le contenu du flux
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value
* of this attribute from the new item image, the below code would fetch this.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
Apporter des modifications à un flux de données Amazon Kinesis actif
Cette section décrit comment apporter des modifications à une configuration active de Kinesis Data Streams pour DynamoDB à l'aide de la console et du. AWS CLI API
AWS Management Console
AWS CLI
-
Appelez describe-kinesis-streaming-destination
pour confirmer que le stream existeACTIVE
.
-
AppelezUpdateKinesisStreamingDestination
, comme dans cet exemple :
aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
-
Appelez describe-kinesis-streaming-destination
pour confirmer que le stream existeUPDATING
.
-
Appelez describe-kinesis-streaming-destination
régulièrement jusqu'à ce que le statut de diffusion soit ACTIVE
rétabli. Il faut généralement jusqu'à 5 minutes pour que les mises à jour de précision de l'horodatage prennent effet. Une fois ce statut mis à jour, cela indique que la mise à jour est terminée et que la nouvelle valeur de précision sera appliquée aux futurs enregistrements.
-
Écrivez dans le tableau en utilisantputItem
.
-
Utilisez la get-records
commande Kinesis pour obtenir le contenu du stream.
-
Vérifiez que ApproximateCreationDateTime
les écritures ont la précision souhaitée.
Java API
-
Fournissez un extrait de code qui construit une UpdateKinesisStreamingDestination
demande et une réponse. UpdateKinesisStreamingDestination
-
Fournissez un extrait de code qui construit une DescribeKinesisStreamingDestination
requête et un. DescribeKinesisStreamingDestination response
-
Appelez describe-kinesis-streaming-destination
régulièrement jusqu'à ce que l'état de diffusion soit ACTIVE
rétabli, indiquant que la mise à jour est terminée et que la nouvelle valeur de précision sera appliquée aux futurs enregistrements.
-
Effectuez des écritures sur la table.
-
Lisez le contenu du flux et désérialisez le contenu du flux.
-
Vérifiez que ApproximateCreationDateTime
les écritures ont la précision souhaitée.