Uso de flujos de cambios con Amazon DocumentDB
La función de flujos de cambios de Amazon DocumentDB (con compatibilidad con MongoDB) brinda una secuencia en orden cronológico de los eventos de actualización que se producen dentro de las colecciones de su clúster. Puede leer eventos de una secuencia de cambios para implementar muchos casos de uso diferentes, incluidos los siguientes:
-
Notificación de cambio
-
Búsqueda de texto completo con Amazon OpenSearch Service (OpenSearch Service)
-
Análisis con Amazon Redshift
Las aplicaciones pueden usar los flujos de cambios para suscribirse a los cambios de datos en colecciones individuales. Los eventos de flujos de cambios se ordenan a medida que se producen en el clúster y se almacenan durante 3 horas (valor predeterminado) desde el momento de registro del evento. El período de retención se puede ampliar hasta 7 días utilizando el parámetro change_stream_log_retention_duration
. Para modificar el período de retención del flujo de cambios, consulte Modificación de la duración de retención del registro del flujo de cambios.
Temas
- Operaciones admitidas
- Facturación
- Limitaciones
- Habilitación de flujos de cambios
- Ejemplo: uso de flujos de cambios con Python
- Búsqueda completa de documentos
- Reanudación de un flujo de cambios
- Reanudación de un flujo de cambios con startAtOperationTime
- Transacciones en flujos de cambios
- Modificación de la duración de la retención del registro de flujos de cambios
- Uso de flujos de cambios en instancias secundarias
Operaciones admitidas
Amazon DocumentDB admite las siguientes operaciones para flujos de cambio:
-
Todos los eventos de cambio admitidos en la API
db.collection.watch()
,db.watch()
yclient.watch()
de MongoDB. -
Búsqueda completa de documentos para actualizaciones.
-
Etapas de agregación:
$match
,$project
,$redact
,$addFields
y$replaceRoot
. -
Reanudación de un flujo de cambios desde un token de reanudación
-
Reanudación de un flujo de cambios a partir de una marca de tiempo mediante
startAtOperation
(aplicable a Amazon DocumentDB 4.0 y versiones posteriores)
Facturación
La característica de secuencias de cambios de Amazon DocumentDB está desactivada de forma predeterminada y no genera ningún cargo adicional hasta que se habilita y se utiliza. El uso de secuencias de cambios en un clúster conlleva costos adicionales de lectura y escritura de IOPS y almacenamiento. Puede utilizar la operación modifyChangeStreams
de la API con el fin de habilitar esta característica para las colecciones del clúster. Para obtener más información acerca de los precios, consulte Precios de Amazon DocumentDB
Limitaciones
Las secuencias de cambio tienen las siguientes limitaciones en Amazon DocumentDB:
-
En Amazon DocumentDB 3.6. y Amazon DocumentDB 4.0, los flujos de cambios solo se pueden abrir desde una conexión a la instancia principal de un clúster de Amazon DocumentDB. Amazon DocumentDB 3.6 y Amazon DocumentDB 4.0 no admiten la lectura de flujos de cambios en una instancia de réplica. Al invocar la operación de la API
watch()
, debe especificar una preferencia de lecturaprimary
para asegurarse de que todas las lecturas se dirigen a la instancia principal (consulte la sección Ejemplo). -
En Amazon DocumentDB 5.0, los flujos de cambios se pueden abrir tanto desde la instancia principal como desde la secundaria, incluidos los clústeres globales. Puede especificar una preferencia de lectura secundaria para redirigir los flujos de cambios a instancias secundarias. Consulte Uso de flujos de cambios en instancias secundarias para ver las limitaciones y prácticas recomendadas adicionales.
-
Los eventos escritos en una secuencia de cambios de una colección están disponibles hasta 7 días (el valor predeterminado es 3 horas). Los datos de flujos de cambios se eliminan después del periodo de tiempo de conservación de los registros, aunque no se hayan realizado cambios.
-
Una operación de escritura de larga duración en una colección como
updateMany
odeleteMany
puede estancar temporalmente la escritura de eventos de secuencias de cambio hasta que se completa dicha operación de escritura de larga duración. -
Amazon DocumentDB no es compatible con el registro de operaciones de MongoDB (
oplog
). -
Con Amazon DocumentDB, debe habilitar explícitamente las secuencias de cambios en una colección determinada.
-
Si el tamaño total de un evento de secuencias de cambios (incluidos los datos de cambio y el documento completo, si se solicita) es mayor que
16 MB
, el cliente experimentará un error de lectura en las secuencias de cambios. -
Actualmente, el controlador Ruby no es compatible al usar
db.watch()
yclient.watch()
con Amazon DocumentDB 3.6. -
El resultado del comando
updateDescription
en los flujos de cambios es diferente en Amazon DocumentDB que en MongoDB cuando el valor actualizado del campo es el mismo que el anterior:Amazon DocumentDB no devuelve ningún campo en la salida
updateDescription
si el campo proporcionado está especificado en el comando$set
y su valor objetivo ya es igual al valor de origen.MongoDB devuelve el campo en la salida, incluso si el valor especificado es igual al valor actual.
Habilitación de flujos de cambios
Puede habilitar las secuencias de cambios de Amazon DocumentDB para todas las colecciones contenidas en una base de datos determinada o solamente para las colecciones seleccionadas. Los siguientes ejemplos muestran cómo habilitar las secuencias de cambios para diferentes casos de uso mediante el intérprete de comandos de mongo. Las cadenas vacías se tratan como comodines al especificar los nombres de las bases de datos y de las colecciones.
//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
//Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false});
//Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true});
//Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});
Las secuencias de cambios se habilitarán para una colección si se cumple alguna de las siguientes condiciones:
-
Tanto la base de datos como la colección están habilitadas explícitamente.
-
La base de datos que contiene la colección está habilitada.
-
Todas las bases de datos están habilitadas.
La eliminación de una colección de una base de datos no deshabilita las secuencias de cambios para esa colección si la base de datos principal también tiene activadas las secuencias de cambios o si todas las bases de datos del clúster están habilitadas. Si se crea una nueva colección con el mismo nombre que la colección eliminada, las secuencias de cambios se habilitarán para esa colección.
Puede enumerar todas las secuencias de cambios habilitadas del clúster mediante la etapa de canalización de agregación $listChangeStreams
. Todas las etapas de agregación admitidas por Amazon DocumentDB se pueden utilizar en la canalización para un procesamiento adicional. Si se deshabilita una colección que antes estaba habilitada, no aparecerá en la salida de $listChangeStreams
.
//List all databases and collections with change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}}));
//List of all databases and collections with change streams enabled { "database" : "test", "collection" : "foo" } { "database" : "bar", "collection" : "" } { "database" : "", "collection" : "" }
//Determine if the database “bar” or collection “bar.foo” have change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}, {$match: {$or: [{database: "bar", collection: "foo"}, {database: "bar", collection: ""}, {database: "", collection: ""}]}} ], cursor:{}}));
Ejemplo: uso de flujos de cambios con Python
El siguiente es un ejemplo del uso de una secuencia de cambios de Amazon DocumentDB con Python a nivel de la colección.
import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #While ‘Primary’ is the default read preference, here we give an example of #how to specify the required read preference when reading the change streams coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY) #Create a stream object stream = coll.watch() #Write a new document to the collection to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ #Generate a new change event by updating a document result = coll.update_one({'x': 1}, {'$set': {'x': 2}}) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf99d400000001010000000100009025'}, 'clusterTime': Timestamp(1571789268, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}} """
El siguiente es un ejemplo del uso de una secuencia de cambios de Amazon DocumentDB con Python a nivel de la base de datos.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #Create a stream object stream = db.watch() coll = db.get_collection('foo') #Write a new document to the collection foo to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ coll = db.get_collection('foo1') #Write a new document to another collection to generate a change event coll.insert_one({'x': 1}) print(stream.try_next()) """ Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo1', 'db': 'bar'}, 'operationType': 'insert'} """
Búsqueda completa de documentos
El evento de cambio de actualización no incluye el documento completo; incluye tan solo el cambio realizado. Si su caso de uso requiere el documento completo afectado por una actualización, puede habilitar la búsqueda completa de documentos al abrir la secuencia.
El documento fullDocument
de un evento de secuencias de cambios de actualización representa la versión más reciente del documento actualizado en el momento de la búsqueda de documentos. Si se han producido cambios entre la operación de actualización y la búsqueda de fullDocument
, es posible que el documento fullDocument
no represente el estado del documento en el momento de la actualización.
Para crear un objeto de flujo con la búsqueda de actualizaciones habilitada, use este ejemplo:
stream = coll.watch(full_document='updateLookup') #Generate a new change event by updating a document result = coll.update_one({'x': 2}, {'$set': {'x': 3}}) stream.try_next()
El resultado del objeto de flujo tendrá un aspecto similar a este:
{'_id': {'_data': '015daf9b7c00000001010000000100009025'},
'clusterTime': Timestamp(1571789692, 1),
'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')},
'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3},
'ns': {'coll': 'foo', 'db': 'bar'},
'operationType': 'update',
'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}
Reanudación de un flujo de cambios
Puede reanudar una secuencia de cambios más tarde mediante un token de reanudación, que es igual al campo _id
del último documento de evento de cambio recuperado.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem', retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() token = event['_id'] print(token) """ Output: This is the resume token that we will later us to resume the change stream {'_data': '015daf9c5b00000001010000000100009025'} """ #Python provides a nice shortcut for getting a stream’s resume token print(stream.resume_token) """ Output {'_data': '015daf9c5b00000001010000000100009025'} """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) #Generate another change event by inserting a document result = coll.insert_one({'y': 5}) #Open a stream starting after the selected resume token stream = db.watch(full_document='updateLookup', resume_after=token) #Our first change event is the update with the specified _id print(stream.try_next()) """ #Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5} {'_id': {'_data': '015f7e8f0c000000060100000006000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602129676, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')}, 'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ #Followed by the insert print(stream.try_next()) """ #Output: {'_id': {'_data': '015f7e8f0c000000070100000007000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602129676, 7), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')}, 'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}} """
Reanudación de un flujo de cambios con startAtOperationTime
Puede reanudar un flujo de cambios más adelante a partir de una marca de tiempo determinada utilizando startAtOperationTime
.
nota
startAtOperationTime
se puede utilizar en Amazon DocumentDB 4.0 y versiones posteriores. Cuando se utiliza startAtOperationTime
, el cursor del flujo de cambios solo devolverá los cambios que se hayan producido en la marca de tiempo especificada o después de ella. Los comandos startAtOperationTime
y resumeAfter
se excluyen mutuamente y, por lo tanto, no se pueden usar juntos.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-root-ca-2020.pem',retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() timestamp = event['clusterTime'] print(timestamp) """ Output Timestamp(1602129114, 4) """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) result = coll.insert_one({'y': 5}) #Generate another change event by inserting a document #Open a stream starting after specified time stamp stream = db.watch(start_at_operation_time=timestamp) print(stream.try_next()) """ #Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event {'_id': {'_data': '015f7e941a000000030100000003000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602130970, 3), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')}, 'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: The second event will be the subsequent update operation (x:5) {'_id': {'_data': '015f7e9502000000050100000005000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602131202, 5), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: And finally the last event will be the insert operation (y:5) {'_id': {'_data': '015f7e9502000000060100000006000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602131202, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')}, 'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}} """
Transacciones en flujos de cambios
Los eventos de flujo de cambios no contendrán eventos de transacciones sin confirmar ni canceladas. Por ejemplo, si inicia una transacción con una operación INSERT
y una operación UPDATE
. Si la operación INSERT
se realiza correctamente, pero la operación UPDATE
no se realiza correctamente, la transacción se anulará. Como esta transacción se ha revertido, tu flujo de cambios no contendrá ningún evento de esta transacción.
Modificación de la duración de la retención del registro de flujos de cambios
Puede modificar la duración de la retención del registro de secuencias de cambios para que esté comprendida entre 1 hora y 7 días utilizando la AWS Management Console o la AWS CLI.
nota
La retención del registro de flujos de cambios no eliminará los registros anteriores al change_stream_log_retention_duration
valor configurado hasta que el tamaño del registro sea superior a (>) 51.200 MB.
Uso de flujos de cambios en instancias secundarias
Para empezar a utilizar el flujo de cambios en las instancias secundarias, abra el cursor del flujo de cambios con readPreference
como instancia secundaria.
Puede abrir un cursor de flujo de cambios para observar los cambios en una colección específica o en todas las colecciones de un clúster o base de datos. Puede abrir un cursor de flujo de cambios en cualquier instancia de Amazon DocumentDB y recuperar documentos del flujo de cambios tanto de las instancias de escritura como de lectura. Puede compartir los tokens de flujo de cambios (por ejemplo, resumeToken
o startOperationTime
) entre distintos cursores del flujo de cambios abiertos en una instancia de escritura y lectura.
Ejemplo
import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Your password> clusterendpoint = "DocumentDBClusterEndpoint" client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] # Make sure to use SECONDARY to redirect cursor reads from secondary instances coll = db.get_collection('foo', read_preference=ReadPreference.SECONDARY) # Create a stream object on RO. The token needs to generated from PRIMARY. stream = coll.watch(resumeAfter=token) for event in stream: print(event)
Directrices y limitaciones para los flujos de cambios en las instancias secundarias
Los eventos del flujo de cambios deben replicarse desde la instancia principal a las instancias secundarias. Puede supervisar el retraso de la métrica
DBInstanceReplicaLag
en Amazon CloudWatch.Es posible que las marcas de tiempo de las instancias secundarias no estén siempre sincronizadas con las de la instancia principal. En este caso, espere retrasos en la marca de tiempo de la instancia secundaria para que pueda ponerse al día. Es recomendable que utilice
startAtOperationTime
oresumeToken
para iniciar el reloj en la instancia secundaria.Es posible que obtenga un rendimiento inferior en las instancias secundarias en comparación con la instancia principal si el tamaño del documento es grande; está haciendo
fullDocumentLookup
y hay una gran carga de trabajo de escritura simultánea en la instancia principal. Como práctica recomendada, le recomendamos que supervise la proporción de aciertos de la caché del búfer en la secundaria y que se asegure de que la tasa de aciertos de la caché del búfer sea alta.