Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Change-Streams mit Amazon DocumentDB verwenden
Die Change-Streams-Funktion in Amazon DocumentDB (mit MongoDB-Kompatibilität) bietet eine zeitlich geordnete Abfolge von Änderungsereignissen, die in den Sammlungen Ihres Clusters auftreten. Sie können Ereignisse aus einem Change Stream lesen, um zahlreiche verschiedene Anwendungsfälle zu implementieren, einschließlich:
-
Änderungsbenachrichtigung
-
Volltextsuche mit Amazon OpenSearch Service (OpenSearch Service)
-
Analytik mit Amazon Redshift
Anwendungen können Änderungsstreams verwenden, um Datenveränderungen bei individuellen Sammlungen zu abonnieren. Die Ereignisse werden in Change Streams in der Reihenfolge angeordnet, wie sie im Cluster auftreten, und nach der Aufzeichnung des Ereignisses 3 Stunden (Standardeinstellung) gespeichert. Die Aufbewahrungsfrist kann mithilfe des change_stream_log_retention_duration
Parameters auf bis zu 7 Tage verlängert werden. Informationen zum Ändern der Aufbewahrungsdauer für Change-Stream-Protokolle finden Sie unter Aufbewahrungsdauer für Change-Stream-Protokolle ändern.
Themen
- Unterstützte Vorgänge
- Fakturierung
- Einschränkungen
- Change-Streams aktivieren
- Beispiel: Change-Streams mit Python verwenden
- Vollständige Suche nach Dokumenten
- Wiederaufnahme eines Change-Streams
- Wiederaufnahme eines Change-Streams mit startAtOperationTime
- Transaktionen in Change-Streams
- Änderung der Aufbewahrungsdauer des Change-Stream-Protokolls
- Verwendung von Change-Streams auf sekundären Instanzen
Unterstützte Vorgänge
Amazon DocumentDB unterstützt die folgenden Operationen für Change-Streams:
-
Alle Änderungsereignisse werden in der MongoDB
db.collection.watch()
db.watch()
und derclient.watch()
API unterstützt. -
Vollständige Dokumentsuche nach Aktualisierungen.
-
Aggregationsphasen:
$match
,$project
$redact
, und und$addFields
.$replaceRoot
-
Einen Change-Stream von einem Resume-Token aus fortsetzen
-
Wiederaufnahme eines Change-Streams aus einem Zeitstempel mit
startAtOperation
(gilt für Amazon DocumentDB 4.0+)
Fakturierung
Die Amazon DocumentDB DocumentDB-Funktion zum Ändern von Streams ist standardmäßig deaktiviert und es fallen keine zusätzlichen Gebühren an, bis die Funktion aktiviert ist. Die Verwendung von Change-Streams in einem Cluster verursacht zusätzliche Lese-, Schreib- IOs und Speicherkosten. Sie können den modifyChangeStreams
API-Vorgang verwenden, um diese Funktion für Ihren Cluster zu aktivieren. Weitere Informationen zur Preisgestaltung finden Sie unter Amazon DocumentDB DocumentDB-Preise
Einschränkungen
Für Change-Streams gelten in Amazon DocumentDB die folgenden Einschränkungen:
-
Auf Amazon DocumentDB 3.6 und Amazon DocumentDB 4.0 können Change-Streams nur über eine Verbindung zur primären Instance eines Amazon DocumentDB-Clusters geöffnet werden. Das Lesen aus Change-Streams auf einer Replikat-Instance wird in Amazon DocumentDB 3.6 und Amazon DocumentDB 4.0 nicht unterstützt. Beim Aufruf der API-Operation
watch()
müssen Sie die Leseeinstellungprimary
angeben, um sicherzustellen, dass alle Lesevorgänge an die primäre Instance weitergeleitet werden (siehe den Abschnitt Beispiel). -
In Amazon DocumentDB 5.0 können Change-Streams sowohl von primären Instances als auch von sekundären Instances, einschließlich globaler Cluster, geöffnet werden. Sie können eine sekundäre Lesepräferenz angeben, um die Change-Streams an sekundäre Instances umzuleiten. Verwendung von Change-Streams auf sekundären InstanzenWeitere bewährte Methoden und Einschränkungen finden Sie unter.
-
Ereignisse, die in einen Change-Stream für eine Sammlung geschrieben wurden, sind bis zu 7 Tage lang verfügbar (die Standardeinstellung ist 3 Stunden). Änderungsstream-Daten werden nach dem Zeitfenster der Protokollaufbewahrungsdauer gelöscht, auch wenn keine neuen Änderungen vorgenommen wurden.
-
Eine über längere Zeit für eine Sammlung ausgeführte Schreiboperation wie
updateMany
oderdeleteMany
kann das Schreiben von Change Stream-Ereignissen vorübergehend bis zum Abschluss der über längere Zeit ausgeführten Schreiboperation blockieren. -
Amazon DocumentDB unterstützt das MongoDB-Betriebsprotokoll ()
oplog
nicht. -
Bei Amazon DocumentDB müssen Sie Change-Streams für eine bestimmte Sammlung explizit aktivieren.
-
Wenn die Gesamtgröße eines Change Stream-Ereignisses (einschließlich der Änderungsdaten und des vollständigen Dokuments, wenn angefordert) größer als
16 MB
ist, tritt auf dem Client ein Lesefehler für die Change Streams auf. -
Der Ruby-Treiber wird derzeit nicht unterstützt, wenn
db.watch()
undclient.watch()
mit Amazon DocumentDB 3.6 verwendet wird. -
Die Ausgabe des
updateDescription
Befehls in Change-Streams unterscheidet sich in Amazon DocumentDB von der in MongoDB, wenn der aktualisierte Wert des Felds derselbe ist wie der vorherige:Amazon DocumentDB gibt kein Feld in der
updateDescription
Ausgabe zurück, wenn das angegebene Feld im$set
Befehl angegeben ist und sein Zielwert bereits dem Quellwert entspricht.MongoDB gibt das Feld in der Ausgabe zurück, auch wenn der angegebene Wert dem aktuellen Wert entspricht.
Change-Streams aktivieren
Sie können Amazon DocumentDB DocumentDB-Change-Streams für alle Sammlungen innerhalb einer bestimmten Datenbank oder nur für ausgewählte Sammlungen aktivieren. Im Folgenden finden Sie Beispiele für die Aktivierung von Change Streams für verschiedene Anwendungsfälle über die Mongo-Shell. Leere Zeichenfolgen werden bei der Angabe von Datenbank- und Sammlungsnamen als Platzhalter behandelt.
//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
//Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false});
//Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true});
//Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});
Change Streams werden für eine Sammlung aktiviert, wenn eine der folgenden Bedingungen erfüllt ist:
-
Sowohl die Datenbank als auch die Sammlung sind explizit aktiviert.
-
Die Datenbank, die die Sammlung enthält, ist aktiviert.
-
Alle Datenbanken sind aktiviert.
Wenn Sie eine Sammlung aus einer Datenbank löschen, werden Change Streams für diese Sammlung nicht deaktiviert, wenn Change Streams für die übergeordnete Datenbank auch aktiviert sind oder wenn alle Datenbanken im Cluster aktiviert sind. Wenn eine neue Sammlung mit demselben Namen wie die gelöschte Sammlung erstellt wird, werden Change Streams für diese Sammlung aktiviert.
Sie können alle aktivierten Change Streams Ihres Clusters mithilfe der $listChangeStreams
-Aggregationspipeline-Phase auflisten. Alle von Amazon DocumentDB unterstützten Aggregationsphasen können in der Pipeline für die weitere Verarbeitung verwendet werden. Wenn eine zuvor aktivierte Sammlung deaktiviert wurde, wird sie nicht in der $listChangeStreams
-Ausgabe angezeigt.
//List all databases and collections with change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}}));
//List of all databases and collections with change streams enabled { "database" : "test", "collection" : "foo" } { "database" : "bar", "collection" : "" } { "database" : "", "collection" : "" }
//Determine if the database “bar” or collection “bar.foo” have change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}, {$match: {$or: [{database: "bar", collection: "foo"}, {database: "bar", collection: ""}, {database: "", collection: ""}]}} ], cursor:{}}));
Beispiel: Change-Streams mit Python verwenden
Im Folgenden finden Sie ein Beispiel für die Verwendung eines Amazon DocumentDB DocumentDB-Change-Streams mit Python auf Sammlungsebene.
import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #While ‘Primary’ is the default read preference, here we give an example of #how to specify the required read preference when reading the change streams coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY) #Create a stream object stream = coll.watch() #Write a new document to the collection to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ #Generate a new change event by updating a document result = coll.update_one({'x': 1}, {'$set': {'x': 2}}) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf99d400000001010000000100009025'}, 'clusterTime': Timestamp(1571789268, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}} """
Das Folgende ist ein Beispiel für die Verwendung eines Amazon DocumentDB DocumentDB-Change-Streams mit Python auf Datenbankebene.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #Create a stream object stream = db.watch() coll = db.get_collection('foo') #Write a new document to the collection foo to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ coll = db.get_collection('foo1') #Write a new document to another collection to generate a change event coll.insert_one({'x': 1}) print(stream.try_next()) """ Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo1', 'db': 'bar'}, 'operationType': 'insert'} """
Vollständige Suche nach Dokumenten
Das Änderungsereignis „Aktualisierung“ enthält nicht das vollständige Dokument, sondern lediglich die ausgeführte Änderung. Wenn für Ihren Anwendungsfall das vollständige Dokument erforderlich ist, das von einer Aktualisierung betroffen ist, können Sie beim Öffnen des Datenstroms die vollständige Dokumentsuche aktivieren.
Das fullDocument
-Dokument für ein Update-Change Stream-Ereignis stellt die neueste Version des aktualisierten Dokuments zum Zeitpunkt der Dokumentsuche dar. Wenn zwischen der Aktualisierungsoperation und der fullDocument
-Suche Änderungen ausgeführt wurden, besitzt das fullDocument
-Dokument möglicherweise nicht den Dokumentstatus zur Aktualisierungszeit.
Verwenden Sie dieses Beispiel, um ein Stream-Objekt mit aktivierter Aktualisierungssuche zu erstellen:
stream = coll.watch(full_document='updateLookup') #Generate a new change event by updating a document result = coll.update_one({'x': 2}, {'$set': {'x': 3}}) stream.try_next()
Die Ausgabe des Stream-Objekts sieht ungefähr so aus:
{'_id': {'_data': '015daf9b7c00000001010000000100009025'},
'clusterTime': Timestamp(1571789692, 1),
'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')},
'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3},
'ns': {'coll': 'foo', 'db': 'bar'},
'operationType': 'update',
'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}
Wiederaufnahme eines Change-Streams
Sie können einen Change Stream zu einem späteren Zeitpunkt mithilfe eines Fortsetzungs-Tokens fortsetzen, das dem Feld _id
des zuletzt abgerufenen Änderungsereignisdokuments entspricht.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem', retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() token = event['_id'] print(token) """ Output: This is the resume token that we will later us to resume the change stream {'_data': '015daf9c5b00000001010000000100009025'} """ #Python provides a nice shortcut for getting a stream’s resume token print(stream.resume_token) """ Output {'_data': '015daf9c5b00000001010000000100009025'} """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) #Generate another change event by inserting a document result = coll.insert_one({'y': 5}) #Open a stream starting after the selected resume token stream = db.watch(full_document='updateLookup', resume_after=token) #Our first change event is the update with the specified _id print(stream.try_next()) """ #Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5} {'_id': {'_data': '015f7e8f0c000000060100000006000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602129676, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')}, 'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ #Followed by the insert print(stream.try_next()) """ #Output: {'_id': {'_data': '015f7e8f0c000000070100000007000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602129676, 7), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')}, 'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}} """
Wiederaufnahme eines Change-Streams mit startAtOperationTime
Sie können einen Änderungsstream zu einem späteren Zeitpunkt ab einem bestimmten Zeitstempel wieder aufnehmen, indem Sie startAtOperationTime
Anmerkung
Die Fähigkeit zur Verwendung startAtOperationTime
ist in Amazon DocumentDB 4.0+ verfügbar. Bei Verwendung startAtOperationTime
gibt der Change-Stream-Cursor nur Änderungen zurück, die zu oder nach dem angegebenen Zeitstempel vorgenommen wurden. Die resumeAfter
Befehle startAtOperationTime
und schließen sich gegenseitig aus und können daher nicht zusammen verwendet werden.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-root-ca-2020.pem',retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() timestamp = event['clusterTime'] print(timestamp) """ Output Timestamp(1602129114, 4) """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) result = coll.insert_one({'y': 5}) #Generate another change event by inserting a document #Open a stream starting after specified time stamp stream = db.watch(start_at_operation_time=timestamp) print(stream.try_next()) """ #Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event {'_id': {'_data': '015f7e941a000000030100000003000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602130970, 3), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')}, 'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: The second event will be the subsequent update operation (x:5) {'_id': {'_data': '015f7e9502000000050100000005000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602131202, 5), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: And finally the last event will be the insert operation (y:5) {'_id': {'_data': '015f7e9502000000060100000006000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602131202, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')}, 'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}} """
Transaktionen in Change-Streams
Change-Stream-Ereignisse enthalten keine Ereignisse, die auf Transaktionen zurückzuführen sind, für die kein Commitment abgeschlossen wurde und/oder abgebrochen wurde. Wenn Sie beispielsweise eine Transaktion mit einem Vorgang und einem INSERT
Vorgang starten und Ihr UPDATE
Vorgang erfolgreich ist, der INSERT
UPDATE
Vorgang jedoch fehlschlägt, wird die Transaktion zurückgesetzt. Da diese Transaktion zurückgesetzt wurde, enthält Ihr Change-Stream keine Ereignisse für diese Transaktion.
Änderung der Aufbewahrungsdauer des Change-Stream-Protokolls
Sie können die Aufbewahrungsdauer für das Change-Stream-Protokoll so ändern, dass sie zwischen 1 Stunde und 7 Tagen liegt, indem Sie AWS Management Console oder die verwenden AWS CLI.
Anmerkung
Bei der Aufbewahrung von Change-Stream-Protokollen werden Protokolle, die älter als der konfigurierte change_stream_log_retention_duration
Wert sind, erst gelöscht, wenn die Protokollgröße größer als (>) 51.200 MB ist.
Verwendung von Change-Streams auf sekundären Instanzen
Um mit der Verwendung von Change Stream auf sekundären Instances zu beginnen, öffnen Sie den Change-Stream-Cursor readPreference
als sekundäre Instanz.
Sie können einen Change-Stream-Cursor öffnen, um nach Änderungsereignissen in einer bestimmten Sammlung oder allen Sammlungen in einem Cluster oder einer Datenbank Ausschau zu halten. Sie können einen Change-Stream-Cursor auf jeder Amazon DocumentDB DocumentDB-Instance öffnen und Change-Stream-Dokumente sowohl von Writer- als auch von Reader-Instances abrufen. Sie können Change-Stream-Tokens (wie resumeToken
oderstartOperationTime
) für verschiedene Change-Stream-Cursors, die auf einer Writer- und Reader-Instance geöffnet sind, gemeinsam nutzen.
Beispiel
import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Your password> clusterendpoint = "DocumentDBClusterEndpoint" client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] # Make sure to use SECONDARY to redirect cursor reads from secondary instances coll = db.get_collection('foo', read_preference=ReadPreference.SECONDARY) # Create a stream object on RO. The token needs to generated from PRIMARY. stream = coll.watch(resumeAfter=token) for event in stream: print(event)
Richtlinien und Einschränkungen für Change-Streams auf sekundären Instanzen
Change-Stream-Ereignisse müssen von der primären Instance auf die sekundären Instances repliziert werden. Sie können die Verzögerung anhand der
DBInstanceReplicaLag
Metrik in Amazon überwachen CloudWatch.Zeitstempel auf sekundären Instances stimmen möglicherweise nicht immer mit denen der primären Instance überein. In diesem Fall müssen Sie mit Verzögerungen beim Zeitstempel der sekundären Instanz rechnen, damit diese aufholen kann. Als bewährte Methode empfehlen wir, die Überwachung auf der sekundären Instanz
resumeToken
zu verwendenstartAtOperationTime
oder zu starten.Auf sekundären Instanzen kann es zu einem geringeren Durchsatz kommen als auf der primären Instanz, wenn Ihr Dokument groß ist, Sie dies tun
fullDocumentLookup
, und auf der primären Instanz eine hohe Arbeitslast beim gleichzeitigen Schreiben anfällt. Als bewährte Methode empfehlen wir, die Trefferquote im Puffercache auf der sekundären Ebene zu überwachen und sicherzustellen, dass die Trefferquote im Puffercache hoch ist.