Usar fluxos de alterações com o Amazon DocumentDB - Amazon DocumentDB

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Usar fluxos de alterações com o Amazon DocumentDB

O atributo de fluxos de alterações do Amazon DocumentDB (compativel com MongoDB) fornece uma sequência ordenada por tempo das alterações que ocorrem nas coleções do cluster. É possível ler eventos de um fluxo de alterações para implementar muitos casos de uso diferentes, incluindo o seguinte:

  • Notificação de alterações

  • Pesquisa de texto completo com o Amazon OpenSearch Service (OpenSearch Service)

  • Analytics com Amazon Redshift

As aplicações podem usar os fluxos de alterações para assinar as alterações de dados em coleções individuais. Os eventos dos fluxos de alterações são ordenados à medida que ocorrem no cluster e são armazenados por 3 horas (por padrão) após a gravação do evento. O período de retenção pode ser estendido até 7 dias usando o parâmetro change_stream_log_retention_duration. Para modificar o período de retenção do fluxo de alterações, consulte Modificação da duração da retenção do log do fluxo de alterações.

Operações compatíveis

O Amazon DocumentDB oferece suporte às seguintes operações para fluxos de alterações:

  • Todos os eventos de alteração compatíveis na API db.collection.watch(), db.watch() e client.watch() do MongoDB.

  • Pesquisa completa de documentos para atualizações.

  • Estágios de agregação: $match, $project, $redact, $addFields e $replaceRoot.

  • Retomando um fluxo de alterações a partir de um token de currículo

  • Retomar um fluxo de alterações de um carimbo de data/hora usando startAtOperation (aplicável ao Amazon DocumentDB 4.0+)

Faturamento

O atributo de fluxos de alterações do Amazon DocumentDB é desativado por padrão e não incorre em cobranças adicionais até ser ativado e usado. O uso de fluxos de mudança em um cluster gera custos adicionais de leitura, gravação IOs e armazenamento. É possível usar a operação modifyChangeStreams de API para habilitar esse atributo para seu cluster. Para obter mais informações sobre preços, consulte Preços do Amazon DocumentDB.

Limitações

Os fluxos de alterações têm as seguintes limitações no Amazon DocumentDB:

  • No Amazon DocumentDB 3.6 e no Amazon DocumentDB 4.0, os fluxos de alterações só podem ser abertos de uma conexão com a instância primária de um cluster do Amazon DocumentDB. A leitura de fluxos de alteração em uma instância de réplica não é compatível com o Amazon DocumentDB 3.6. e no Amazon DocumentDB 4.0. Ao chamar a operação de API watch(), é necessário especificar uma preferência de leitura primary para garantir que todas as leituras sejam direcionadas à instância principal (consulte a seção Exemplo).

  • No Amazon DocumentDB 5.0, os fluxos de alterações podem ser abertos tanto da instância primária quanto da instância secundária, incluindo clusters globais. Você pode especificar uma preferência de leitura secundária para redirecionar os fluxos de alteração para instâncias secundárias. Consulte Usar fluxos de alterações em instâncias secundárias para saber sobre outras práticas recomendadas e limitações.

  • Os eventos gravados em um fluxo de alterações para uma coleção estão disponíveis por até 7 dias (o padrão é 3 horas). Os dados de fluxos de alterações são excluídos após a janela de duração de retenção de log, mesmo que nenhuma nova alteração tenha ocorrido.

  • Uma operação de gravação de longa duração em uma coleção como updateMany ou deleteMany pode interromper temporariamente a gravação dos eventos dos fluxos de alterações até que ela seja concluída.

  • O Amazon DocumentDB não oferece suporte ao log de operações do MongoDB (oplog).

  • Com o Amazon DocumentDB, é necessário ativar explicitamente os fluxos de alterações em determinada coleção.

  • Se o tamanho total de um evento de fluxos de alterações (incluindo os dados das alterações e o documento completo, se solicitado) for maior do que 16 MB, o cliente sofrerá uma falha de leitura nos fluxos de alterações.

  • Atualmente, o driver Ruby não é aceito ao usar db.watch() e client.watch() com o Amazon DocumentDB 3.6.

  • A saída do comando updateDescription nos fluxos de alterações é diferente no Amazon DocumentDB e no MongoDB quando o valor atualizado do campo é o mesmo do anterior:

    • O Amazon DocumentDB não retornará um campo na saída de updateDescription se o campo fornecido for especificado no comando $set e seu valor de destino já for igual ao valor de origem.

    • O MongoDB retorna o campo na saída, mesmo que o valor especificado seja igual ao valor atual.

Ativar fluxos de alterações

É possível habilitar os fluxos de alterações do Amazon DocumentDB em todas as coleções em um determinado banco de dados ou apenas em coleções específicas. Veja a seguir os exemplos de como habilitar os fluxos de alterações em diferentes casos de uso com o shell do Mongo. As strings vazias são tratadas como curingas na especificação de nomes de banco de dados e coleções.

//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
//Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false});
//Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true});
//Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});

Os fluxos de alterações serão ativados em uma coleção se qualquer uma destas opções for verdadeira:

  • O banco de dados e a coleção estão explicitamente ativados.

  • O banco de dados que contém a coleção está ativado.

  • Todos os bancos de dados estão ativados.

Eliminar uma coleção de um banco de dados não desativará os fluxos de alterações dessa coleção se o banco de dados pai também tiver fluxos de alterações ativados, ou se todos os bancos de dados do cluster estiverem ativados. Se uma coleção for criada com o mesmo nome da coleção excluída, os fluxos de alterações serão ativados para essa coleção.

É possível listar todos os fluxos de alterações ativados para o cluster usando o estágio de agregação do pipeline $listChangeStreams. Todas as etapas de agregação compatíveis com o Amazon DocumentDB podem ser usadas no pipeline para processamento adicional. Se uma coleção ativada anteriormente tiver sido desativada, ela não aparecerá na saída $listChangeStreams.

//List all databases and collections with change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}}));
//List of all databases and collections with change streams enabled { "database" : "test", "collection" : "foo" } { "database" : "bar", "collection" : "" } { "database" : "", "collection" : "" }
//Determine if the database “bar” or collection “bar.foo” have change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}, {$match: {$or: [{database: "bar", collection: "foo"}, {database: "bar", collection: ""}, {database: "", collection: ""}]}} ], cursor:{}}));

Exemplo: usar fluxos de alterações com Python

Veja a seguir um exemplo do uso de um fluxo de alterações do Amazon DocumentDB com Python no nível da coleção.

import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #While ‘Primary’ is the default read preference, here we give an example of #how to specify the required read preference when reading the change streams coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY) #Create a stream object stream = coll.watch() #Write a new document to the collection to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ #Generate a new change event by updating a document result = coll.update_one({'x': 1}, {'$set': {'x': 2}}) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf99d400000001010000000100009025'}, 'clusterTime': Timestamp(1571789268, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}} """

Veja a seguir um exemplo do uso de um fluxo de alterações do Amazon DocumentDB com Python no nível do banco de dados.

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #Create a stream object stream = db.watch() coll = db.get_collection('foo') #Write a new document to the collection foo to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ coll = db.get_collection('foo1') #Write a new document to another collection to generate a change event coll.insert_one({'x': 1}) print(stream.try_next()) """ Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo1', 'db': 'bar'}, 'operationType': 'insert'} """

Pesquisa completa de documentos

O evento de alteração de atualização não inclui o documento completo, apenas a alteração que foi feita. Se o seu caso de uso exigir o documento completo afetado por uma atualização, é possível ativar a pesquisa completa do documento na abertura do fluxo.

O documento fullDocument de um evento de fluxos de alterações de atualização representa a versão mais atual do documento atualizado no momento em que ele é pesquisado. Se ocorrerem alterações entre a operação de atualização e a pesquisa do fullDocument, o documento fullDocument poderá não representar o estado dele no momento da atualização.

Para criar um objeto de fluxo com a pesquisa de atualização ativada, use este exemplo:

stream = coll.watch(full_document='updateLookup') #Generate a new change event by updating a document result = coll.update_one({'x': 2}, {'$set': {'x': 3}}) stream.try_next()

A saída do objeto de fluxo será semelhante ao seguinte:

{'_id': {'_data': '015daf9b7c00000001010000000100009025'}, 'clusterTime': Timestamp(1571789692, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}

Retomar um fluxo de alterações

É possível retomar um fluxo de alterações posteriormente usando um token de retomada, que é igual ao campo _id do último documento de evento de alteração recuperado.

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem', retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() token = event['_id'] print(token) """ Output: This is the resume token that we will later us to resume the change stream {'_data': '015daf9c5b00000001010000000100009025'} """ #Python provides a nice shortcut for getting a stream’s resume token print(stream.resume_token) """ Output {'_data': '015daf9c5b00000001010000000100009025'} """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) #Generate another change event by inserting a document result = coll.insert_one({'y': 5}) #Open a stream starting after the selected resume token stream = db.watch(full_document='updateLookup', resume_after=token) #Our first change event is the update with the specified _id print(stream.try_next()) """ #Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5} {'_id': {'_data': '015f7e8f0c000000060100000006000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602129676, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')}, 'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ #Followed by the insert print(stream.try_next()) """ #Output: {'_id': {'_data': '015f7e8f0c000000070100000007000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602129676, 7), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')}, 'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}} """

Retomar um fluxo de alterações com startAtOperationTime

É possível retomar um fluxo de alterações posteriormente a partir de um carimbo de data/hora específico usando startAtOperationTime.

nota

A capacidade de usar startAtOperationTime está disponível no Amazon DocumentDB 4.0+. Ao usar startAtOperationTime, o cursor do fluxo de alterações retornará apenas as alterações que ocorreram no carimbo de data/hora especificado ou após ele. Os comandos startAtOperationTime e resumeAfter são mutuamente exclusivos e, portanto, não podem ser usados juntos.

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-root-ca-2020.pem',retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() timestamp = event['clusterTime'] print(timestamp) """ Output Timestamp(1602129114, 4) """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) result = coll.insert_one({'y': 5}) #Generate another change event by inserting a document #Open a stream starting after specified time stamp stream = db.watch(start_at_operation_time=timestamp) print(stream.try_next()) """ #Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event {'_id': {'_data': '015f7e941a000000030100000003000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602130970, 3), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')}, 'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: The second event will be the subsequent update operation (x:5) {'_id': {'_data': '015f7e9502000000050100000005000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602131202, 5), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: And finally the last event will be the insert operation (y:5) {'_id': {'_data': '015f7e9502000000060100000006000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602131202, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')}, 'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}} """

Transações em fluxos de alterações

Os eventos de fluxo de alterações não conterão eventos de transações não confirmadas e/ou abortadas. Por exemplo, se você iniciar uma transação com uma INSERT operação e uma UPDATE operação e se sua INSERT operação for bem-sucedida, mas a UPDATE operação falhar, a transação será revertida. Como esta transação foi revertida, seu fluxo de alterações não conterá nenhum evento para esta transação.

Modificar a duração da retenção do log do fluxo de alterações

Você pode modificar a duração da retenção do log do stream de alterações para ser entre 1 hora e 7 dias usando o AWS Management Console ou AWS CLI o.

Using the AWS Management Console
Como modificar a duração da retenção do log do fluxo de alterações
  1. Faça login no e abra AWS Management Console o console do Amazon DocumentDB em https://console.aws.amazon.com /docdb.

  2. No painel de navegação, escolha Grupos de parâmetros.

    dica

    Caso não visualize o painel de navegação à esquerda da tela, selecione o ícone do menu (Hamburger menu icon with three horizontal lines.) no canto superior esquerdo da página.

  3. No painel Grupos de parâmetros, escolha o grupo de parâmetros de cluster associado ao cluster. Para identificar o grupo de parâmetros de cluster associado ao cluster, consulte Determinando o grupo de parâmetros de um cluster do Amazon DocumentDB.

  4. A página resultante mostra os parâmetros e os detalhes correspondentes para seu grupo de parâmetros do cluster. Selecione o parâmetro change_stream_log_retention_duration.

  5. No canto superior direito da página, selecione Editar para alterar o valor do parâmetro. O parâmetro change_stream_log_retention_duration pode ser modificado para ficar entre 1 hora e 7 dias.

  6. Faça a alteração e escolha Modificar parâmetro de cluster para salvar as alterações. Para descartar as alterações, escolha Cancelar.

Using the AWS CLI

Para modificar o parâmetro change_stream_log_retention_duration de um grupo de parâmetros de cluster, use a operação modify-db-cluster-parameter-group com os parâmetros a seguir:

  • --db-cluster-parameter-group-name — Obrigatório. O nome do grupos de parâmetros de cluster que você está modificando. Para identificar o grupo de parâmetros de cluster associado ao cluster, consulte Determinando o grupo de parâmetros de um cluster do Amazon DocumentDB.

  • --parameters — Obrigatório. O parâmetro que você está modificando. Cada entrada de parâmetro deve incluir o seguinte:

    • ParameterName — O nome do parâmetro que você está modificando. Neste caso, é change_stream_log_retention_duration

    • ParameterValue — O novo valor para esse parâmetro.

    • ApplyMethod — como você deseja aplicar as alterações nesse parâmetro. Os valores permitidos são immediate e pending-reboot.

      nota

      Os parâmetros com ApplyType de static devem ter um ApplyMethod de pending-reboot.

  1. Para alterar os valores do parâmetro change_stream_log_retention_duration, execute o seguinte comando e substitua parameter-value pelo valor para o qual deseja modificar o parâmetro.

    Para Linux, macOS ou Unix:

    aws docdb modify-db-cluster-parameter-group \ --db-cluster-parameter-group-name sample-parameter-group \ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    Para Windows:

    aws docdb modify-db-cluster-parameter-group ^ --db-cluster-parameter-group-name sample-parameter-group ^ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    A saída dessa operação é semelhante ao seguinte (formato JSON).

    { "DBClusterParameterGroupName": "sample-parameter-group" }
  2. Aguarde pelo menos 5 minutos.

  3. Liste os valores de parâmetro de sample-parameter-group para garantir que suas alterações foram feitas.

    Para Linux, macOS ou Unix:

    aws docdb describe-db-cluster-parameters \ --db-cluster-parameter-group-name sample-parameter-group

    Para Windows:

    aws docdb describe-db-cluster-parameters ^ --db-cluster-parameter-group-name sample-parameter-group

    A saída dessa operação é semelhante ao seguinte (formato JSON).

    { "Parameters": [ { "ParameterName": "audit_logs", "ParameterValue": "disabled", "Description": "Enables auditing on cluster.", "Source": "system", "ApplyType": "dynamic", "DataType": "string", "AllowedValues": "enabled,disabled", "IsModifiable": true, "ApplyMethod": "pending-reboot" }, { "ParameterName": "change_stream_log_retention_duration", "ParameterValue": "12345", "Description": "Duration of time in seconds that the change stream log is retained and can be consumed.", "Source": "user", "ApplyType": "dynamic", "DataType": "integer", "AllowedValues": "3600-86400", "IsModifiable": true, "ApplyMethod": "immediate" } ] }
nota

A retenção de log de fluxo de alterações não excluirá logs mais antigos que o valor change_stream_log_retention_duration configurado até que o tamanho do log seja maior que (>) 51.200 MB.

Usar fluxos de alterações em instâncias secundárias

Para começar a usar o fluxo de alterações em instâncias secundárias, abra o cursor do fluxo de alterações com readPreference como secundário.

Você pode abrir um cursor de fluxo de alterações para observar os eventos de alteração em uma coleção específica ou em todas as coleções em um cluster ou banco de dados. Você pode abrir um cursor do fluxo de alterações em qualquer instância do Amazon DocumentDB e buscar documentos do fluxo de alterações das instâncias do escritor e do leitor. Você pode compartilhar tokens de fluxo de alterações (como resumeToken ou startOperationTime) em diferentes cursores de fluxo de alterações abertos em uma instância de gravador e leitor.

Exemplo

import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Your password> clusterendpoint = "DocumentDBClusterEndpoint" client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] # Make sure to use SECONDARY to redirect cursor reads from secondary instances coll = db.get_collection('foo', read_preference=ReadPreference.SECONDARY) # Create a stream object on RO. The token needs to generated from PRIMARY. stream = coll.watch(resumeAfter=token) for event in stream: print(event)

Diretrizes e limitações para fluxos de alterações em instâncias secundárias

  • Os eventos do fluxo de alterações precisam ser replicados da instância primária para as instâncias secundárias. Você pode monitorar o atraso a partir da DBInstanceReplicaLag métrica na Amazon CloudWatch.

  • Os carimbos de data e hora em instâncias secundárias nem sempre estão sincronizados com a instância primária. Nesse caso, espere atrasos para que o carimbo de data e hora da instância secundária possa se atualizar. Como prática recomendada, recomendamos usar startAtOperationTime ou resumeToken iniciar o relógio na instância secundária.

  • Você poderá ter um throughput menor em instâncias secundárias em comparação com a instância primária se o tamanho do documento for grande e você estiver fazendo fullDocumentLookup, e se houver um alto workload de gravação simultânea na instância primária. Como prática recomendada, recomendamos que você monitore a taxa de acertos do cache do buffer na secundária e certifique-se de que a taxa de acertos do cache do buffer seja alta.