将变更流与 Amazon DocumentDB 结合使用 - Amazon DocumentDB

将变更流与 Amazon DocumentDB 结合使用

Amazon DocumentDB(与 MongoDB 兼容)中的变更流功能提供按时间顺序排列的更改事件,这些事件在您的集群集合内发生。您可以从变更流中读取事件,以实现许多不同的使用案例,包括以下情况:

  • 更改通知

  • 使用 Amazon OpenSearch Service(OpenSearch 服务)进行全文搜索

  • 使用 Amazon Redshift 分析

应用程序可以使用变更流在各个集合中订阅数据变更。变更流事件在集群上发生时按顺序排列,并在记录事件之后存储 3 个小时(默认情况下)。使用 change_stream_log_retention_duration 参数可以将保留期延长至 7 天。要修改变更流保留期,请参阅修改变更流日志保留期限

支持的操作

Amazon DocumentDB 支持以下变更流操作:

  • MongoDB db.collection.watch()db.watch()client.watch() API 中支持的所有变更事件。

  • 查找完整文档以获取更新。

  • 聚合阶段:$match$project$redact$addFields$replaceRoot

  • 从简历令牌恢复变更流

  • 使用 startAtOperation 从时间戳恢复变更流(适用于 Amazon DocumentDB 4.0+)

Billing

默认情况下,Amazon DocumentDB 变更流功能处于禁用状态,并且在启用该功能之前不会产生任何额外费用。在集群中使用变更流时,会产生额外的读取和写入 IOs 和存储成本。可以使用 modifyChangeStreams API 为集群启用此功能。有关定价的更多信息,请参阅 Amazon DocumentDB 定价

限制

变更流在 Amazon DocumentDB 中存在以下限制:

  • 在 Amazon DocumentDB 3.6 和 Amazon DocumentDB 4.0 上,变更流只能通过与 Amazon DocumentDB 集群主实例的连接打开。Amazon DocumentDB 3.6 和 Amazon DocumentDB 4.0 不支持从副本实例上的变更流中读取信息。在调用 watch() API 操作时,您必须指定 primary 读取首选项,以确保所有读取都定向到主实例(请参阅示例部分)。

  • 在 Amazon DocumentDB 5.0 上,可以从主实例和辅助实例(包括全局集群)打开变更流。可以指定辅助读取首选项,将变更流重定向至辅助实例。有关其他最佳实践和限制,请参阅 在辅助实例上使用变更流

  • 写入集合的变更流的事件最多可在 7 天(默认为 3 小时)内使用。变更流数据将在日志保留时段过后删除,即使没有发生新更改也是如此。

  • updateManydeleteMany 之类的集合执行长时间运行的写入操作时,会暂时延迟变更流事件的写入,直至长时间运行的写入操作完成为止。

  • Amazon DocumentDB 不支持 MongoDB 操作日志 (oplog)。

  • 使用 Amazon DocumentDB,您必须明确在给定集合上启用变更流。

  • 如果变更流事件的总大小(包括变更数据,在请求的情况下还包括完整文档)大于 16 MB,客户端将在变更流上遇到读取失败情况。

  • 当使用 db.watch()client.watch() 搭配 Amazon DocumentDB 3.6 时,目前不支持 Ruby 驱动程序。

  • 当该字段的更新值与之前的值相同时,在 Amazon DocumentDB 中,updateDescription 命令在变更流中的输出与 MongoDB 中不同:

    • 如果 $set 命令指定了所提供的字段,并且其目标值已经等于源值,则 Amazon DocumentDB 不会在 updateDescription 输出中返回字段。

    • 即使指定值等于当前值,MongoDB 也会在输出中返回此字段。

启用变更流

您可以为给定数据库中的所有集合启用 Amazon DocumentDB 变更流,也可以只针对选定集合启用。下面是如何使用 mongo shell 为不同使用案例启用变更流的示例。在指定数据库和集合名称时,将空字符串视为通配符。

//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
//Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false});
//Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true});
//Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});

如果满足以下任意条件,则将为集合启用变更流:

  • 数据库和集合均已明确启用。

  • 包含该集合的数据库已启用。

  • 所有数据库均已启用。

如果父数据库也启用了变更流,或者集群中的所有数据库都已启用,则从数据库中删除集合不会禁用该集合的变更流。如果创建了与已删除收藏同名的新收藏集,则将为该集合启用变更流。

您可以使用 $listChangeStreams 聚合管道阶段列出集群所有已启用的变更流。Amazon DocumentDB 支持的所有聚合阶段都可以在管道中用于额外处理。如果以前启用的某个集合被禁用,则该集合将不会显示在 $listChangeStreams 输出中。

//List all databases and collections with change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}}));
//List of all databases and collections with change streams enabled { "database" : "test", "collection" : "foo" } { "database" : "bar", "collection" : "" } { "database" : "", "collection" : "" }
//Determine if the database “bar” or collection “bar.foo” have change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}, {$match: {$or: [{database: "bar", collection: "foo"}, {database: "bar", collection: ""}, {database: "", collection: ""}]}} ], cursor:{}}));

示例:在 Python 中使用变更流

以下是在集合级别使用带有 Python 的 Amazon DocumentDB 变更流的示例。

import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #While ‘Primary’ is the default read preference, here we give an example of #how to specify the required read preference when reading the change streams coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY) #Create a stream object stream = coll.watch() #Write a new document to the collection to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ #Generate a new change event by updating a document result = coll.update_one({'x': 1}, {'$set': {'x': 2}}) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf99d400000001010000000100009025'}, 'clusterTime': Timestamp(1571789268, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}} """

以下是在数据库级别使用带有 Python 的 Amazon DocumentDB 变更流的示例。

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #Create a stream object stream = db.watch() coll = db.get_collection('foo') #Write a new document to the collection foo to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ coll = db.get_collection('foo1') #Write a new document to another collection to generate a change event coll.insert_one({'x': 1}) print(stream.try_next()) """ Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo1', 'db': 'bar'}, 'operationType': 'insert'} """

完整文档查找

更新变更事件不包括完整文档;只包括已执行的变更。如果您的使用案例需要用到受更新影响的完整文档,则可以在打开流时启用完整文档查找。

更新变更流事件的 fullDocument 文档会指明文档查找时已更新文档的最新版本。如果在更新操作与 fullDocument 查找之间发生了变更,则 fullDocument 文档可能无法指明更新时的文档状态。

要创建启用更新查找的流对象,请使用以下示例:

stream = coll.watch(full_document='updateLookup') #Generate a new change event by updating a document result = coll.update_one({'x': 2}, {'$set': {'x': 3}}) stream.try_next()

流对象输出类似如下:

{'_id': {'_data': '015daf9b7c00000001010000000100009025'}, 'clusterTime': Timestamp(1571789692, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}

恢复变更流

您可以在以后通过使用恢复令牌来恢复变更流,该令牌相当于上次检索的变更事件文档的 _id 字段。

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem', retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() token = event['_id'] print(token) """ Output: This is the resume token that we will later us to resume the change stream {'_data': '015daf9c5b00000001010000000100009025'} """ #Python provides a nice shortcut for getting a stream’s resume token print(stream.resume_token) """ Output {'_data': '015daf9c5b00000001010000000100009025'} """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) #Generate another change event by inserting a document result = coll.insert_one({'y': 5}) #Open a stream starting after the selected resume token stream = db.watch(full_document='updateLookup', resume_after=token) #Our first change event is the update with the specified _id print(stream.try_next()) """ #Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5} {'_id': {'_data': '015f7e8f0c000000060100000006000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602129676, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')}, 'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ #Followed by the insert print(stream.try_next()) """ #Output: {'_id': {'_data': '015f7e8f0c000000070100000007000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602129676, 7), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')}, 'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}} """

使用 startAtOperationTime 恢复变更流

您可以稍后使用 startAtOperationTime 从特定时间戳恢复变更流。

注意

Amazon DocumentDB 4.0+ 提供了使用 startAtOperationTime 的功能。使用 startAtOperationTime 时,变更流光标将仅返回在指定时间戳或之后发生的更改。startAtOperationTimeresumeAfter 命令是互斥的,因此不能一起使用。

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-root-ca-2020.pem',retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() timestamp = event['clusterTime'] print(timestamp) """ Output Timestamp(1602129114, 4) """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) result = coll.insert_one({'y': 5}) #Generate another change event by inserting a document #Open a stream starting after specified time stamp stream = db.watch(start_at_operation_time=timestamp) print(stream.try_next()) """ #Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event {'_id': {'_data': '015f7e941a000000030100000003000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602130970, 3), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')}, 'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: The second event will be the subsequent update operation (x:5) {'_id': {'_data': '015f7e9502000000050100000005000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602131202, 5), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: And finally the last event will be the insert operation (y:5) {'_id': {'_data': '015f7e9502000000060100000006000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602131202, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')}, 'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}} """

变更流中的事务

变更流事件将不包含来自未提交和/或已中止事务事件。例如,如果您以一个 INSERT 操作和一个 UPDATE 操作启动事务,然后。如果您的 INSERT 操作成功但 UPDATE 操作失败,则事务将回滚。由于此事务已回滚,您的变更流将不包含此事务的任何事件。

修改变更流日志保留期限

可以使用 AWS Management Console 或 AWS CLI 将变更流日志的保留时间修改为 1 小时到 7 天之间的任何时间。

Using the AWS Management Console
修改变更流日志保留期限的步骤
  1. 登录到 AWS Management Console 并打开 Amazon DocumentDB 控制台,网址:https://console.aws.amazon.com/docdb

  2. 在导航窗格中,选择参数组

    提示

    如果您在屏幕左侧没有看到导航窗格,请在页面左上角选择菜单图标 (Hamburger menu icon with three horizontal lines.)。

  3. 参数组内,选择与集群相关联的集群参数组。要识别与集群关联的集群参数组,请参阅 确定 Amazon DocumentDB 集群的参数组

  4. 所得页面显示您的集群参数组的参数及它们的相应详情。选择 change_stream_log_retention_duration 参数。

  5. 在页面右上角,选择编辑以更改参数的值。可以将 change_stream_log_retention_duration 参数修改为 1 小时到 7 天之间。

  6. 进行更改,然后选择修改集群参数以保存更改。要放弃更改,请选择取消

Using the AWS CLI

要修改集群参数组的 change_stream_log_retention_duration 参数,请使用带以下参数的 modify-db-cluster-parameter-group 操作:

  • --db-cluster-parameter-group-name – 必需。您正在修改的集群参数组的名称。要识别与集群关联的集群参数组,请参阅 确定 Amazon DocumentDB 集群的参数组

  • --parameters – 必需。您正在修改的参数的名称。每个参数条目必须包含以下内容:

    • ParameterName — 您正在修改的参数的名称。在本例中,为 change_stream_log_retention_duration

    • ParameterValue — 此参数的新值。

    • ApplyMethod — 您希望如何应用对此参数的更改。允许的值为 immediatepending-reboot

      注意

      staticApplyType 参数必须具有 pending-rebootApplyMethod

  1. 要更改参数 change_stream_log_retention_duration 的值,请运行以下命令并替换 parameter-value 为要修改参数的值。

    对于 Linux、macOS 或 Unix:

    aws docdb modify-db-cluster-parameter-group \ --db-cluster-parameter-group-name sample-parameter-group \ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    对于 Windows:

    aws docdb modify-db-cluster-parameter-group ^ --db-cluster-parameter-group-name sample-parameter-group ^ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    此操作的输出将类似于下文(JSON 格式)。

    { "DBClusterParameterGroupName": "sample-parameter-group" }
  2. 至少等待 5 分钟。

  3. 列出 sample-parameter-group 参数值以确保已进行更改。

    对于 Linux、macOS 或 Unix:

    aws docdb describe-db-cluster-parameters \ --db-cluster-parameter-group-name sample-parameter-group

    对于 Windows:

    aws docdb describe-db-cluster-parameters ^ --db-cluster-parameter-group-name sample-parameter-group

    此操作的输出将类似于下文(JSON 格式)。

    { "Parameters": [ { "ParameterName": "audit_logs", "ParameterValue": "disabled", "Description": "Enables auditing on cluster.", "Source": "system", "ApplyType": "dynamic", "DataType": "string", "AllowedValues": "enabled,disabled", "IsModifiable": true, "ApplyMethod": "pending-reboot" }, { "ParameterName": "change_stream_log_retention_duration", "ParameterValue": "12345", "Description": "Duration of time in seconds that the change stream log is retained and can be consumed.", "Source": "user", "ApplyType": "dynamic", "DataType": "integer", "AllowedValues": "3600-86400", "IsModifiable": true, "ApplyMethod": "immediate" } ] }
注意

在日志大小大于 (>) 51,200MB 之前,变更流日志保留期不会删除早于配置 change_stream_log_retention_duration 值的日志。

在辅助实例上使用变更流

要开始在辅助实例上使用变更流,请以 readPreference 作为辅助实例打开变更流指针。

可以打开变更流指针来监视集群或数据库中特定集合或所有集合上的变更事件。您可以在任何 Amazon DocumentDB 实例上打开变更流指针,并从写入器和读取器实例获取变更流文档。您可以在写入器和读取器实例上打开的不同变更流指针之间共享变更流令牌(例如 resumeTokenstartOperationTime)。

示例

import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Your password> clusterendpoint = "DocumentDBClusterEndpoint" client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] # Make sure to use SECONDARY to redirect cursor reads from secondary instances coll = db.get_collection('foo', read_preference=ReadPreference.SECONDARY) # Create a stream object on RO. The token needs to generated from PRIMARY. stream = coll.watch(resumeAfter=token) for event in stream: print(event)

辅助实例上的变更流准则和限制

  • 变更流事件需要从主实例复制到辅助实例。您可以在 Amazon CloudWatch 中监控 DBInstanceReplicaLag 指标的滞后性。

  • 辅助实例上的时间戳可能并非一直与主实例同步。在这种情况下,预计辅助实例的时间戳会产生延迟,这样它才能赶得上。作为最佳实践,我们建议使用 startAtOperationTimeresumeToken 在辅助实例上启动监视。

  • 如果文档太大,您在进行 fullDocumentLookup,而主实例上的并发写入工作负载很高,那么辅助实例上的吞吐量可能低于主实例。作为最佳实践,我们建议在辅助实例上监控缓冲区的缓存命中率,并确保缓冲区的缓存命中率很高。