Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Utilisation de flux de modifications avec Amazon DocumentDB
La fonctionnalité de flux de modifications d'Amazon DocumentDB (compatible avec MongoDB) fournit une séquence chronologique des événements de modification qui se produisent dans les collections de votre cluster. Vous pouvez lire des événements à partir d'un flux de modifications afin d'implémenter de nombreux cas d'utilisation différents, notamment les suivants :
-
Notification de modification
-
Recherche en texte intégral avec Amazon OpenSearch Service (OpenSearch Service)
-
Analyses avec Amazon Redshift
Les applications peuvent utiliser les flux de modifications pour souscrire à tous les changements de données dans des collections individuelles. Les événements de flux de modifications sont ordonnés au fur et à mesure qu'ils se produisent sur le cluster et sont stockés pendant trois heures (par défaut) après l'enregistrement de l'événement. La période de conservation peut être prolongée jusqu'à 7 jours en utilisant le change_stream_log_retention_duration
paramètre. Pour modifier la période de conservation du flux de modifications, consultez la section Modification de la durée de conservation du journal du flux de modifications.
Rubriques
- Opérations prises en charge
- Facturation
- Limites
- Activer les flux de changement
- Exemple : utilisation de flux de modifications avec Python
- Recherche de documents complets
- Reprise d'un flux de modifications
- Reprise d'un flux de modifications avec startAtOperationTime
- Transactions dans les flux de changement
- Modification de la durée de conservation du journal du flux de modifications
- Utilisation des flux de modifications sur les instances secondaires
Opérations prises en charge
Amazon DocumentDB prend en charge les opérations suivantes pour les flux de modifications :
-
Tous les événements de changement pris en charge dans MongoDB
db.collection.watch()
, etdb.watch()
.client.watch()
API -
Recherche complète de documents pour les mises à jour.
-
Étapes d'agrégation :
$match
$project
$redact
,,$addFields
et$replaceRoot
. -
Reprise d'un flux de modifications à partir d'un jeton de CV
-
Reprise d'un flux de modifications à partir d'un horodatage en utilisant
startAtOperation
(applicable à Amazon DocumentDB 4.0+)
Facturation
La fonctionnalité des flux de modifications d'Amazon DocumentDB est désactivée par défaut et n'entraîne aucun frais supplémentaire tant qu'elle n'est pas activée. L'utilisation de flux de modifications dans un cluster entraîne des coûts de lecture, d'écriture IOs et de stockage supplémentaires. Vous pouvez utiliser cette modifyChangeStreams
API opération pour activer cette fonctionnalité pour votre cluster. Pour plus d'informations sur la tarification, consultez la tarification d'Amazon DocumentDB.
Limites
Les flux de modifications présentent les limites suivantes dans Amazon DocumentDB :
-
Sur Amazon DocumentDB 3.6 et Amazon DocumentDB 4.0, les flux de modifications ne peuvent être ouverts qu'à partir d'une connexion à l'instance principale d'un cluster Amazon DocumentDB. La lecture à partir de flux de modifications sur une instance de réplique n'est pas prise en charge sur Amazon DocumentDB 3.6 et Amazon DocumentDB 4.0. Lorsque vous appelez l'
watch()
APIopération, vous devez spécifier une préférence deprimary
lecture pour garantir que toutes les lectures sont dirigées vers l'instance principale (voir la section Exemple). -
Sur Amazon DocumentDB 5.0, les flux de modifications peuvent être ouverts à la fois depuis l'instance principale et les instances secondaires, y compris les clusters globaux. Vous pouvez spécifier une préférence de lecture secondaire pour rediriger les flux de modifications vers des instances secondaires. Consultez Utilisation des flux de modifications sur les instances secondaires les meilleures pratiques et limitations supplémentaires.
-
Les événements écrits dans un flux de modifications pour une collection sont disponibles pendant 7 jours maximum (la valeur par défaut est de 3 heures). Les données du flux de modifications sont supprimées après la fenêtre de durée de conservation du journal, même si aucune nouvelle modification n'est survenue.
-
Une opération d'écriture longue en cours d'exécution sur une collection comme
updateMany
oudeleteMany
peut bloquer temporairement l'écriture des événements de flux de modifications jusqu'à ce que l'opération d'écriture longue en cours d'exécution soit terminée. -
Amazon DocumentDB ne prend pas en charge le journal des opérations MongoDB ().
oplog
-
Avec Amazon DocumentDB, vous devez activer explicitement les flux de modifications sur une collection donnée.
-
Si la taille totale d'un événement de flux de modifications (y compris les données de modification et le document complet, le cas échéant) est supérieure à
16 MB
, le client rencontre un échec de lecture sur les flux de modification. -
Le pilote Ruby n'est actuellement pas pris en charge lors de l'utilisation
db.watch()
etclient.watch()
avec Amazon DocumentDB 3.6. -
La sortie de la
updateDescription
commande dans les flux de modification est différente dans Amazon DocumentDB par rapport à MongoDB lorsque la valeur mise à jour du champ est identique à la valeur précédente :Amazon DocumentDB ne renvoie aucun champ dans la
updateDescription
sortie si le champ fourni est spécifié dans la$set
commande et que sa valeur cible est déjà égale à la valeur source.MongoDB renvoie le champ dans la sortie, même si la valeur spécifiée est égale à la valeur actuelle.
Activer les flux de changement
Vous pouvez activer les flux de modifications Amazon DocumentDB pour toutes les collections d'une base de données donnée, ou uniquement pour certaines collections. Voici des exemples de la façon d'activer les flux de changement pour différents cas d'utilisation en utilisant le shell mongo. Les chaînes vides sont traitées comme des caractères génériques lors de la spécification des noms de base de données et de collections.
//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
//Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false});
//Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true});
//Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});
Les flux de modifications seront activés pour une collection si l'un des éléments suivants est vrai :
-
La base de données et la collection sont explicitement activées.
-
La base de données contenant la collection est activée.
-
Toutes les bases de données sont activées.
Le fait de supprimer une collection d'une base de données ne désactive pas les flux de modification pour cette collection si la base de données parent a également activé les flux de modification ou si toutes les bases de données du cluster sont activées. Si une nouvelle collection est créée avec le même nom que la collection supprimée, les flux de modifications seront activés pour cette collection.
Vous pouvez répertorier tous les flux de modifications activés de votre cluster à l'aide de l'étape du pipeline d'agrégation $listChangeStreams
. Toutes les étapes d'agrégation prises en charge par Amazon DocumentDB peuvent être utilisées dans le pipeline pour un traitement supplémentaire. Si une collection précédemment activée a été désactivée, elle n'apparaît pas dans la sortie $listChangeStreams
.
//List all databases and collections with change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}}));
//List of all databases and collections with change streams enabled { "database" : "test", "collection" : "foo" } { "database" : "bar", "collection" : "" } { "database" : "", "collection" : "" }
//Determine if the database “bar” or collection “bar.foo” have change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}, {$match: {$or: [{database: "bar", collection: "foo"}, {database: "bar", collection: ""}, {database: "", collection: ""}]}} ], cursor:{}}));
Exemple : utilisation de flux de modifications avec Python
Voici un exemple d'utilisation d'un flux de modifications Amazon DocumentDB avec Python au niveau de la collection.
import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #While ‘Primary’ is the default read preference, here we give an example of #how to specify the required read preference when reading the change streams coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY) #Create a stream object stream = coll.watch() #Write a new document to the collection to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ #Generate a new change event by updating a document result = coll.update_one({'x': 1}, {'$set': {'x': 2}}) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf99d400000001010000000100009025'}, 'clusterTime': Timestamp(1571789268, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}} """
Voici un exemple d'utilisation d'un flux de modifications Amazon DocumentDB avec Python au niveau de la base de données.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #Create a stream object stream = db.watch() coll = db.get_collection('foo') #Write a new document to the collection foo to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ coll = db.get_collection('foo1') #Write a new document to another collection to generate a change event coll.insert_one({'x': 1}) print(stream.try_next()) """ Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo1', 'db': 'bar'}, 'operationType': 'insert'} """
Recherche de documents complets
L'événement de modification de mise à jour n'inclut pas le document complet, mais uniquement la modification qui a été effectuée. Si votre cas d'utilisation nécessite le document complet affecté par une mise à jour, vous pouvez activer la recherche complète de documents lors de l'ouverture du flux.
Le document fullDocument
d'un événement de flux de modification de mise à jour représente la version la plus récente du document mis à jour au moment de la recherche de document. Si des modifications se sont produites entre l'opération de mise à jour et la recherche fullDocument
, le document fullDocument
peut ne pas représenter l'état du document au moment de la mise à jour.
Pour créer un objet de flux avec la recherche de mise à jour activée, utilisez cet exemple :
stream = coll.watch(full_document='updateLookup') #Generate a new change event by updating a document result = coll.update_one({'x': 2}, {'$set': {'x': 3}}) stream.try_next()
La sortie de l'objet stream ressemblera à ceci :
{'_id': {'_data': '015daf9b7c00000001010000000100009025'},
'clusterTime': Timestamp(1571789692, 1),
'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')},
'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3},
'ns': {'coll': 'foo', 'db': 'bar'},
'operationType': 'update',
'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}
Reprise d'un flux de modifications
Vous pouvez reprendre un flux de modifications ultérieurement à l'aide d'un jeton de reprise, qui est égal au champ _id
du dernier document d'événement de modification récupéré.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem', retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() token = event['_id'] print(token) """ Output: This is the resume token that we will later us to resume the change stream {'_data': '015daf9c5b00000001010000000100009025'} """ #Python provides a nice shortcut for getting a stream’s resume token print(stream.resume_token) """ Output {'_data': '015daf9c5b00000001010000000100009025'} """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) #Generate another change event by inserting a document result = coll.insert_one({'y': 5}) #Open a stream starting after the selected resume token stream = db.watch(full_document='updateLookup', resume_after=token) #Our first change event is the update with the specified _id print(stream.try_next()) """ #Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5} {'_id': {'_data': '015f7e8f0c000000060100000006000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602129676, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')}, 'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ #Followed by the insert print(stream.try_next()) """ #Output: {'_id': {'_data': '015f7e8f0c000000070100000007000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602129676, 7), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')}, 'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}} """
Reprise d'un flux de modifications avec startAtOperationTime
Vous pouvez reprendre un flux de modifications ultérieurement à partir d'un horodatage spécifique en utilisantstartAtOperationTime
.
Note
La possibilité d'utilisation startAtOperationTime
est disponible dans Amazon DocumentDB 4.0+. Lors de son utilisationstartAtOperationTime
, le curseur du flux de modifications renvoie uniquement les modifications survenues à ou après l'horodatage spécifié. Les resumeAfter
commandes startAtOperationTime
et s'excluent mutuellement et ne peuvent donc pas être utilisées ensemble.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-root-ca-2020.pem',retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() timestamp = event['clusterTime'] print(timestamp) """ Output Timestamp(1602129114, 4) """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) result = coll.insert_one({'y': 5}) #Generate another change event by inserting a document #Open a stream starting after specified time stamp stream = db.watch(start_at_operation_time=timestamp) print(stream.try_next()) """ #Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event {'_id': {'_data': '015f7e941a000000030100000003000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602130970, 3), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')}, 'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: The second event will be the subsequent update operation (x:5) {'_id': {'_data': '015f7e9502000000050100000005000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602131202, 5), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: And finally the last event will be the insert operation (y:5) {'_id': {'_data': '015f7e9502000000060100000006000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602131202, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')}, 'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}} """
Transactions dans les flux de changement
Les événements du flux de modifications ne contiendront pas d'événements provenant de transactions non validées et/ou abandonnées. Par exemple, si vous démarrez une transaction avec une INSERT
opération et une UPDATE
opération et. Si votre INSERT
opération réussit, mais qu'elle échoue, la transaction sera annulée. UPDATE
Cette transaction ayant été annulée, votre flux de modifications ne contiendra aucun événement lié à cette transaction.
Modification de la durée de conservation du journal du flux de modifications
Vous pouvez modifier la durée de conservation du journal des flux de modifications pour qu'elle soit comprise entre 1 heure et 7 jours à l'aide du AWS Management Console ou du AWS CLI.
Note
La conservation des journaux du flux de modifications ne supprime pas les journaux antérieurs à la change_stream_log_retention_duration
valeur configurée tant que la taille du journal n'est pas supérieure à (>) 51 200 Mo.
Utilisation des flux de modifications sur les instances secondaires
Pour commencer à utiliser le flux de modifications sur les instances secondaires, ouvrez le curseur du flux de modifications readPreference
en le désignant comme secondaire.
Vous pouvez ouvrir un curseur de flux de modifications pour surveiller les événements de modification concernant une collection spécifique ou toutes les collections d'un cluster ou d'une base de données. Vous pouvez ouvrir un curseur de flux de modifications sur n'importe quelle instance Amazon DocumentDB et récupérer les documents du flux de modifications à partir des instances de rédacteur et de lecteur. Vous pouvez partager des jetons de flux de modifications (tels que resumeToken
oustartOperationTime
) entre différents curseurs de flux de modifications ouverts sur une instance de rédacteur et de lecteur.
Exemple
import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Your password> clusterendpoint = "DocumentDBClusterEndpoint" client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] # Make sure to use SECONDARY to redirect cursor reads from secondary instances coll = db.get_collection('foo', read_preference=ReadPreference.SECONDARY) # Create a stream object on RO. The token needs to generated from PRIMARY. stream = coll.watch(resumeAfter=token) for event in stream: print(event)
Directives et limites relatives aux flux de modifications sur les instances secondaires
Les événements du flux de modifications doivent être répliqués de l'instance principale vers les instances secondaires. Vous pouvez surveiller le décalage par rapport à la
DBInstanceReplicaLag
métrique sur Amazon CloudWatch.Les horodatages des instances secondaires ne sont pas toujours synchronisés avec ceux de l'instance principale. Dans ce cas, attendez-vous à des retards sur l'horodatage de l'instance secondaire afin qu'elle puisse rattraper son retard. Il est recommandé d'utiliser
startAtOperationTime
ou deresumeToken
démarrer la montre sur l'instance secondaire.Le débit des instances secondaires peut être inférieur à celui de l'instance principale si la taille de votre document est importante, c'est le cas
fullDocumentLookup
, et si la charge de travail d'écriture simultanée est élevée sur l'instance principale. Il est recommandé de surveiller le taux de réussite de votre cache tampon sur le cache secondaire et de vous assurer que le taux de réussite du cache tampon est élevé.