As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Usar fluxos de alterações com o Amazon DocumentDB
O atributo de fluxos de alterações do Amazon DocumentDB (compativel com MongoDB) fornece uma sequência ordenada por tempo das alterações que ocorrem nas coleções do cluster. É possível ler eventos de um fluxo de alterações para implementar muitos casos de uso diferentes, incluindo o seguinte:
-
Notificação de alterações
-
Pesquisa de texto completo com o Amazon OpenSearch Service (OpenSearch Service)
-
Analytics com Amazon Redshift
As aplicações podem usar os fluxos de alterações para assinar as alterações de dados em coleções individuais. Os eventos dos fluxos de alterações são ordenados à medida que ocorrem no cluster e são armazenados por 3 horas (por padrão) após a gravação do evento. O período de retenção pode ser estendido até 7 dias usando o parâmetro change_stream_log_retention_duration
. Para modificar o período de retenção do fluxo de alterações, consulte Modificação da duração da retenção do log do fluxo de alterações.
Tópicos
- Operações compatíveis
- Faturamento
- Limitações
- Ativar fluxos de alterações
- Exemplo: usar fluxos de alterações com Python
- Pesquisa completa de documentos
- Retomar um fluxo de alterações
- Retomar um fluxo de alterações com startAtOperationTime
- Transações em fluxos de alterações
- Modificar a duração da retenção do log do fluxo de alterações
- Usar fluxos de alterações em instâncias secundárias
Operações compatíveis
O Amazon DocumentDB oferece suporte às seguintes operações para fluxos de alterações:
-
Todos os eventos de alteração compatíveis na API
db.collection.watch()
,db.watch()
eclient.watch()
do MongoDB. -
Pesquisa completa de documentos para atualizações.
-
Estágios de agregação:
$match
,$project
,$redact
,$addFields
e$replaceRoot
. -
Retomando um fluxo de alterações a partir de um token de currículo
-
Retomar um fluxo de alterações de um carimbo de data/hora usando
startAtOperation
(aplicável ao Amazon DocumentDB 4.0+)
Faturamento
O atributo de fluxos de alterações do Amazon DocumentDB é desativado por padrão e não incorre em cobranças adicionais até ser ativado e usado. O uso de fluxos de mudança em um cluster gera custos adicionais de leitura, gravação IOs e armazenamento. É possível usar a operação modifyChangeStreams
de API para habilitar esse atributo para seu cluster. Para obter mais informações sobre preços, consulte Preços do Amazon DocumentDB
Limitações
Os fluxos de alterações têm as seguintes limitações no Amazon DocumentDB:
-
No Amazon DocumentDB 3.6 e no Amazon DocumentDB 4.0, os fluxos de alterações só podem ser abertos de uma conexão com a instância primária de um cluster do Amazon DocumentDB. A leitura de fluxos de alteração em uma instância de réplica não é compatível com o Amazon DocumentDB 3.6. e no Amazon DocumentDB 4.0. Ao chamar a operação de API
watch()
, é necessário especificar uma preferência de leituraprimary
para garantir que todas as leituras sejam direcionadas à instância principal (consulte a seção Exemplo). -
No Amazon DocumentDB 5.0, os fluxos de alterações podem ser abertos tanto da instância primária quanto da instância secundária, incluindo clusters globais. Você pode especificar uma preferência de leitura secundária para redirecionar os fluxos de alteração para instâncias secundárias. Consulte Usar fluxos de alterações em instâncias secundárias para saber sobre outras práticas recomendadas e limitações.
-
Os eventos gravados em um fluxo de alterações para uma coleção estão disponíveis por até 7 dias (o padrão é 3 horas). Os dados de fluxos de alterações são excluídos após a janela de duração de retenção de log, mesmo que nenhuma nova alteração tenha ocorrido.
-
Uma operação de gravação de longa duração em uma coleção como
updateMany
oudeleteMany
pode interromper temporariamente a gravação dos eventos dos fluxos de alterações até que ela seja concluída. -
O Amazon DocumentDB não oferece suporte ao log de operações do MongoDB (
oplog
). -
Com o Amazon DocumentDB, é necessário ativar explicitamente os fluxos de alterações em determinada coleção.
-
Se o tamanho total de um evento de fluxos de alterações (incluindo os dados das alterações e o documento completo, se solicitado) for maior do que
16 MB
, o cliente sofrerá uma falha de leitura nos fluxos de alterações. -
Atualmente, o driver Ruby não é aceito ao usar
db.watch()
eclient.watch()
com o Amazon DocumentDB 3.6. -
A saída do comando
updateDescription
nos fluxos de alterações é diferente no Amazon DocumentDB e no MongoDB quando o valor atualizado do campo é o mesmo do anterior:O Amazon DocumentDB não retornará um campo na saída de
updateDescription
se o campo fornecido for especificado no comando$set
e seu valor de destino já for igual ao valor de origem.O MongoDB retorna o campo na saída, mesmo que o valor especificado seja igual ao valor atual.
Ativar fluxos de alterações
É possível habilitar os fluxos de alterações do Amazon DocumentDB em todas as coleções em um determinado banco de dados ou apenas em coleções específicas. Veja a seguir os exemplos de como habilitar os fluxos de alterações em diferentes casos de uso com o shell do Mongo. As strings vazias são tratadas como curingas na especificação de nomes de banco de dados e coleções.
//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
//Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false});
//Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true});
//Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});
Os fluxos de alterações serão ativados em uma coleção se qualquer uma destas opções for verdadeira:
-
O banco de dados e a coleção estão explicitamente ativados.
-
O banco de dados que contém a coleção está ativado.
-
Todos os bancos de dados estão ativados.
Eliminar uma coleção de um banco de dados não desativará os fluxos de alterações dessa coleção se o banco de dados pai também tiver fluxos de alterações ativados, ou se todos os bancos de dados do cluster estiverem ativados. Se uma coleção for criada com o mesmo nome da coleção excluída, os fluxos de alterações serão ativados para essa coleção.
É possível listar todos os fluxos de alterações ativados para o cluster usando o estágio de agregação do pipeline $listChangeStreams
. Todas as etapas de agregação compatíveis com o Amazon DocumentDB podem ser usadas no pipeline para processamento adicional. Se uma coleção ativada anteriormente tiver sido desativada, ela não aparecerá na saída $listChangeStreams
.
//List all databases and collections with change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}}));
//List of all databases and collections with change streams enabled { "database" : "test", "collection" : "foo" } { "database" : "bar", "collection" : "" } { "database" : "", "collection" : "" }
//Determine if the database “bar” or collection “bar.foo” have change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}, {$match: {$or: [{database: "bar", collection: "foo"}, {database: "bar", collection: ""}, {database: "", collection: ""}]}} ], cursor:{}}));
Exemplo: usar fluxos de alterações com Python
Veja a seguir um exemplo do uso de um fluxo de alterações do Amazon DocumentDB com Python no nível da coleção.
import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #While ‘Primary’ is the default read preference, here we give an example of #how to specify the required read preference when reading the change streams coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY) #Create a stream object stream = coll.watch() #Write a new document to the collection to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ #Generate a new change event by updating a document result = coll.update_one({'x': 1}, {'$set': {'x': 2}}) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf99d400000001010000000100009025'}, 'clusterTime': Timestamp(1571789268, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}} """
Veja a seguir um exemplo do uso de um fluxo de alterações do Amazon DocumentDB com Python no nível do banco de dados.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #Create a stream object stream = db.watch() coll = db.get_collection('foo') #Write a new document to the collection foo to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ coll = db.get_collection('foo1') #Write a new document to another collection to generate a change event coll.insert_one({'x': 1}) print(stream.try_next()) """ Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo1', 'db': 'bar'}, 'operationType': 'insert'} """
Pesquisa completa de documentos
O evento de alteração de atualização não inclui o documento completo, apenas a alteração que foi feita. Se o seu caso de uso exigir o documento completo afetado por uma atualização, é possível ativar a pesquisa completa do documento na abertura do fluxo.
O documento fullDocument
de um evento de fluxos de alterações de atualização representa a versão mais atual do documento atualizado no momento em que ele é pesquisado. Se ocorrerem alterações entre a operação de atualização e a pesquisa do fullDocument
, o documento fullDocument
poderá não representar o estado dele no momento da atualização.
Para criar um objeto de fluxo com a pesquisa de atualização ativada, use este exemplo:
stream = coll.watch(full_document='updateLookup') #Generate a new change event by updating a document result = coll.update_one({'x': 2}, {'$set': {'x': 3}}) stream.try_next()
A saída do objeto de fluxo será semelhante ao seguinte:
{'_id': {'_data': '015daf9b7c00000001010000000100009025'},
'clusterTime': Timestamp(1571789692, 1),
'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')},
'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3},
'ns': {'coll': 'foo', 'db': 'bar'},
'operationType': 'update',
'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}
Retomar um fluxo de alterações
É possível retomar um fluxo de alterações posteriormente usando um token de retomada, que é igual ao campo _id
do último documento de evento de alteração recuperado.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem', retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() token = event['_id'] print(token) """ Output: This is the resume token that we will later us to resume the change stream {'_data': '015daf9c5b00000001010000000100009025'} """ #Python provides a nice shortcut for getting a stream’s resume token print(stream.resume_token) """ Output {'_data': '015daf9c5b00000001010000000100009025'} """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) #Generate another change event by inserting a document result = coll.insert_one({'y': 5}) #Open a stream starting after the selected resume token stream = db.watch(full_document='updateLookup', resume_after=token) #Our first change event is the update with the specified _id print(stream.try_next()) """ #Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5} {'_id': {'_data': '015f7e8f0c000000060100000006000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602129676, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')}, 'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ #Followed by the insert print(stream.try_next()) """ #Output: {'_id': {'_data': '015f7e8f0c000000070100000007000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602129676, 7), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')}, 'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}} """
Retomar um fluxo de alterações com startAtOperationTime
É possível retomar um fluxo de alterações posteriormente a partir de um carimbo de data/hora específico usando startAtOperationTime
.
nota
A capacidade de usar startAtOperationTime
está disponível no Amazon DocumentDB 4.0+. Ao usar startAtOperationTime
, o cursor do fluxo de alterações retornará apenas as alterações que ocorreram no carimbo de data/hora especificado ou após ele. Os comandos startAtOperationTime
e resumeAfter
são mutuamente exclusivos e, portanto, não podem ser usados juntos.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-root-ca-2020.pem',retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() timestamp = event['clusterTime'] print(timestamp) """ Output Timestamp(1602129114, 4) """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) result = coll.insert_one({'y': 5}) #Generate another change event by inserting a document #Open a stream starting after specified time stamp stream = db.watch(start_at_operation_time=timestamp) print(stream.try_next()) """ #Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event {'_id': {'_data': '015f7e941a000000030100000003000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602130970, 3), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')}, 'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: The second event will be the subsequent update operation (x:5) {'_id': {'_data': '015f7e9502000000050100000005000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602131202, 5), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: And finally the last event will be the insert operation (y:5) {'_id': {'_data': '015f7e9502000000060100000006000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602131202, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')}, 'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}} """
Transações em fluxos de alterações
Os eventos de fluxo de alterações não conterão eventos de transações não confirmadas e/ou abortadas. Por exemplo, se você iniciar uma transação com uma INSERT
operação e uma UPDATE
operação e se sua INSERT
operação for bem-sucedida, mas a UPDATE
operação falhar, a transação será revertida. Como esta transação foi revertida, seu fluxo de alterações não conterá nenhum evento para esta transação.
Modificar a duração da retenção do log do fluxo de alterações
Você pode modificar a duração da retenção do log do stream de alterações para ser entre 1 hora e 7 dias usando o AWS Management Console ou AWS CLI o.
nota
A retenção de log de fluxo de alterações não excluirá logs mais antigos que o valor change_stream_log_retention_duration
configurado até que o tamanho do log seja maior que (>) 51.200 MB.
Usar fluxos de alterações em instâncias secundárias
Para começar a usar o fluxo de alterações em instâncias secundárias, abra o cursor do fluxo de alterações com readPreference
como secundário.
Você pode abrir um cursor de fluxo de alterações para observar os eventos de alteração em uma coleção específica ou em todas as coleções em um cluster ou banco de dados. Você pode abrir um cursor do fluxo de alterações em qualquer instância do Amazon DocumentDB e buscar documentos do fluxo de alterações das instâncias do escritor e do leitor. Você pode compartilhar tokens de fluxo de alterações (como resumeToken
ou startOperationTime
) em diferentes cursores de fluxo de alterações abertos em uma instância de gravador e leitor.
Exemplo
import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Your password> clusterendpoint = "DocumentDBClusterEndpoint" client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] # Make sure to use SECONDARY to redirect cursor reads from secondary instances coll = db.get_collection('foo', read_preference=ReadPreference.SECONDARY) # Create a stream object on RO. The token needs to generated from PRIMARY. stream = coll.watch(resumeAfter=token) for event in stream: print(event)
Diretrizes e limitações para fluxos de alterações em instâncias secundárias
Os eventos do fluxo de alterações precisam ser replicados da instância primária para as instâncias secundárias. Você pode monitorar o atraso a partir da
DBInstanceReplicaLag
métrica na Amazon CloudWatch.Os carimbos de data e hora em instâncias secundárias nem sempre estão sincronizados com a instância primária. Nesse caso, espere atrasos para que o carimbo de data e hora da instância secundária possa se atualizar. Como prática recomendada, recomendamos usar
startAtOperationTime
ouresumeToken
iniciar o relógio na instância secundária.Você poderá ter um throughput menor em instâncias secundárias em comparação com a instância primária se o tamanho do documento for grande e você estiver fazendo
fullDocumentLookup
, e se houver um alto workload de gravação simultânea na instância primária. Como prática recomendada, recomendamos que você monitore a taxa de acertos do cache do buffer na secundária e certifique-se de que a taxa de acertos do cache do buffer seja alta.