本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
搭配 Amazon DocumentDB 使用變更串流
Amazon DocumentDB 中的變更串流功能 (具有 MongoDB 相容性) 提供叢集集合中發生的按時間順序排列的變更事件序列。您可以從變更串流讀取事件,以實作各種不同的使用案例,包括下列各項:
-
變更通知
-
使用 Amazon OpenSearch Service (OpenSearch Service) 進行全文搜尋
-
使用 Amazon Redshift 進行分析
應用程式可以使用變更串流來訂閱個別集合上的資料變更。變更串流事件發生時便會排序這些事件,並在記錄事件後儲存這些事件 3 小時 (預設)。使用 change_stream_log_retention_duration
參數,保留期最多可延長 7 天。若要修改變更串流保留期間,請參閱修改變更串流日誌保留期間 。
主題
受支援的操作
Amazon DocumentDB 支援變更串流的下列操作:
-
MongoDB
db.collection.watch()
、db.watch()
和client.watch()
支援的所有變更事件API。 -
更新的完整文件查閱。
-
彙總階段:
$match
、$redact
、$project
和$addFields
$replaceRoot
。 -
從繼續權杖恢復變更串流
-
使用 從時間戳記繼續變更串流
startAtOperation
(適用於 Amazon DocumentDB 4.0+)
帳單
Amazon DocumentDB 變更串流功能預設為停用,在啟用此功能之前不會產生任何額外費用。在叢集中使用變更串流會產生額外的讀取、寫入IOs和儲存成本。您可以使用 modifyChangeStreams
API操作為您的叢集啟用此功能。如需定價的詳細資訊,請參閱 Amazon DocumentDB 定價
限制
變更串流在 Amazon DocumentDB 中具有下列限制:
-
在 Amazon DocumentDB 3.6. 和 Amazon DocumentDB 4.0 上,變更串流只能從 Amazon DocumentDB 叢集的主要執行個體的連線開啟。Amazon DocumentDB 3.6. 和 Amazon DocumentDB 4.0 不支援從複本執行個體上的變更串流讀取。叫用
watch()
API 操作時,您必須指定primary
讀取偏好設定,以確保所有讀取都導向主要執行個體 (請參閱範例區段)。 -
在 Amazon DocumentDB 5.0 上,可以從主要執行個體和次要執行個體開啟變更串流,包括全域叢集。您可以指定次要讀取偏好設定,將變更串流重新導向至次要執行個體。如需其他最佳實務和限制在次要執行個體上使用變更串流,請參閱 。
-
寫入集合變更串流的事件最長可使用 7 天 (預設值為 3 小時)。在日誌保留期間時段後將會刪除變更串流資料,即便未發生任何新變更。
-
在如
updateMany
或deleteMany
等集合上長時間執行寫入操作時,會暫停寫入變更串流事件,直到長時間執行的寫入操作完成為止。 -
Amazon DocumentDB 不支援 MongoDB 操作日誌 (
oplog
)。 -
使用 Amazon DocumentDB 時,您必須在指定的集合上明確啟用變更串流。
-
如果變更串流事件的總大小 (如有要求,包括變更資料與完整文件) 大於
16 MB
,用戶端將會遇到變更串流讀取失敗的情況。 -
client.watch()
搭配 Amazon DocumentDB 3.6 使用db.watch()
和 時,目前不支援 Ruby 驅動程式。 -
當欄位的更新值與上一個值相同時,Amazon DocumentDB 中的變更串流
updateDescription
命令輸出與 MongoDB 中的輸出不同:如果在
$set
命令中指定了提供的欄位,且其目標值已等於來源值,Amazon DocumentDB 不會傳回updateDescription
輸出中的欄位。MongoDB 會傳回輸出中的欄位,即使指定的值等於目前的值。
啟用變更串流
您可以為指定資料庫中的所有集合啟用 Amazon DocumentDB 變更串流,或僅為選取的集合啟用。以下是針對使用 mongo 殼層的不同使用案例,如何啟用變更串流的範例。指定資料庫和集合名稱時,空的字串會被視為萬用字元。
//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
//Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false});
//Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true});
//Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});
如果下列任一項成立,集合將會啟用變更串流:
-
明確地啟用資料庫和集合。
-
已啟用包含集合的資料庫。
-
已啟用所有資料庫。
如果父資料庫也啟用變更串流或叢集內的所有資料庫都已啟用,捨棄資料庫的集合並不會停用該集合的變更串流。如果新的集合以遭刪除集合的相同名稱建立,該集合將會啟用變更串流。
您可以使用 $listChangeStreams
彙總管線階段列出叢集所有已啟用的變更串流。Amazon DocumentDB 支援的所有彙總階段都可以在管道中使用,以進行其他處理。如果已停用之前啟用的集合,該集合將不會在 $listChangeStreams
輸出中出現。
//List all databases and collections with change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}}));
//List of all databases and collections with change streams enabled { "database" : "test", "collection" : "foo" } { "database" : "bar", "collection" : "" } { "database" : "", "collection" : "" }
//Determine if the database “bar” or collection “bar.foo” have change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}, {$match: {$or: [{database: "bar", collection: "foo"}, {database: "bar", collection: ""}, {database: "", collection: ""}]}} ], cursor:{}}));
範例:搭配 Python 使用變更串流
以下是在集合層級搭配 Python 使用 Amazon DocumentDB 變更串流的範例。
import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #While ‘Primary’ is the default read preference, here we give an example of #how to specify the required read preference when reading the change streams coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY) #Create a stream object stream = coll.watch() #Write a new document to the collection to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ #Generate a new change event by updating a document result = coll.update_one({'x': 1}, {'$set': {'x': 2}}) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf99d400000001010000000100009025'}, 'clusterTime': Timestamp(1571789268, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}} """
以下是在資料庫層級使用 Amazon DocumentDB 變更串流搭配 Python 的範例。
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #Create a stream object stream = db.watch() coll = db.get_collection('foo') #Write a new document to the collection foo to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ coll = db.get_collection('foo1') #Write a new document to another collection to generate a change event coll.insert_one({'x': 1}) print(stream.try_next()) """ Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo1', 'db': 'bar'}, 'operationType': 'insert'} """
完整文件查詢
更新變更事件不包括完整文件;只包含所做的變更。如果您的使用案例需要受更新影響的完整文件,您可以在開啟串流時啟用完整文件查閱。
更新變更串流事件的 fullDocument
文件代表在文件查閱時已更新文件最新的版本。如果更新操作與 fullDocument
查閱之間發生變更,fullDocument
文件可能不會代表更新時的文件狀態。
若要在啟用更新查詢的情況下建立串流物件,請使用此範例:
stream = coll.watch(full_document='updateLookup') #Generate a new change event by updating a document result = coll.update_one({'x': 2}, {'$set': {'x': 3}}) stream.try_next()
串流物件的輸出會如下所示:
{'_id': {'_data': '015daf9b7c00000001010000000100009025'},
'clusterTime': Timestamp(1571789692, 1),
'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')},
'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3},
'ns': {'coll': 'foo', 'db': 'bar'},
'operationType': 'update',
'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}
恢復變更串流
您稍後可以使用繼續字符 (等於上次擷取之變更事件文件的 _id
欄位) 繼續變更串流。
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem', retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() token = event['_id'] print(token) """ Output: This is the resume token that we will later us to resume the change stream {'_data': '015daf9c5b00000001010000000100009025'} """ #Python provides a nice shortcut for getting a stream’s resume token print(stream.resume_token) """ Output {'_data': '015daf9c5b00000001010000000100009025'} """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) #Generate another change event by inserting a document result = coll.insert_one({'y': 5}) #Open a stream starting after the selected resume token stream = db.watch(full_document='updateLookup', resume_after=token) #Our first change event is the update with the specified _id print(stream.try_next()) """ #Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5} {'_id': {'_data': '015f7e8f0c000000060100000006000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602129676, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')}, 'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ #Followed by the insert print(stream.try_next()) """ #Output: {'_id': {'_data': '015f7e8f0c000000070100000007000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602129676, 7), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')}, 'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}} """
使用 繼續變更串流 startAtOperationTime
您可以使用 ,稍後從特定時間戳記繼續變更串流startAtOperationTime
。
注意
Amazon DocumentDB 4.0+ startAtOperationTime
提供使用 的功能。使用 時startAtOperationTime
,變更串流游標只會傳回在指定時間戳記或之後發生的變更。startAtOperationTime
和 resumeAfter
命令是互斥的,因此無法同時使用。
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-root-ca-2020.pem',retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() timestamp = event['clusterTime'] print(timestamp) """ Output Timestamp(1602129114, 4) """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) result = coll.insert_one({'y': 5}) #Generate another change event by inserting a document #Open a stream starting after specified time stamp stream = db.watch(start_at_operation_time=timestamp) print(stream.try_next()) """ #Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event {'_id': {'_data': '015f7e941a000000030100000003000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602130970, 3), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')}, 'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: The second event will be the subsequent update operation (x:5) {'_id': {'_data': '015f7e9502000000050100000005000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602131202, 5), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: And finally the last event will be the insert operation (y:5) {'_id': {'_data': '015f7e9502000000060100000006000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602131202, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')}, 'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}} """
變更串流中的交易
變更串流事件不會包含來自未認可和/或中止交易的事件。例如,如果您使用一個INSERT
操作和一個UPDATE
操作以及 啟動交易。如果您的INSERT
操作成功,但UPDATE
操作失敗,交易將復原。由於此交易已復原,您的變更串流將不會包含此交易的任何事件。
修改變更串流日誌保留期間
您可以使用 AWS Management Console 或 將變更串流日誌保留期間修改為 1 小時至 7 天 AWS CLI。
注意
在日誌大小大於 (>) 51,200MB 之前,變更串流日誌保留不會刪除早於設定change_stream_log_retention_duration
值的日誌。
在次要執行個體上使用變更串流
若要開始使用次要執行個體上的變更串流,請使用 readPreference
作為次要執行個體開啟變更串流游標。
您可以開啟變更串流游標,以監控特定集合或叢集或資料庫中所有集合的變更事件。您可以在任何 Amazon DocumentDB 執行個體上開啟變更串流游標,並從寫入器和讀取器執行個體擷取變更串流文件。您可以在寫入器和讀取器執行個體上開啟的不同變更串流游標之間共用變更串流權杖 (例如 resumeToken
或 startOperationTime
)。
範例
import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Your password> clusterendpoint = "DocumentDBClusterEndpoint" client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] # Make sure to use SECONDARY to redirect cursor reads from secondary instances coll = db.get_collection('foo', read_preference=ReadPreference.SECONDARY) # Create a stream object on RO. The token needs to generated from PRIMARY. stream = coll.watch(resumeAfter=token) for event in stream: print(event)
次要執行個體上變更串流的準則和限制
變更串流事件需要從主要執行個體複寫至次要執行個體。您可以從 Amazon 中的
DBInstanceReplicaLag
指標監控延遲 CloudWatch。次要執行個體上的時間戳記不一定會與主要執行個體同步。在此情況下, 預期次要執行個體時間戳記會延遲,以便趕上進度。最佳實務是,建議您使用
startAtOperationTime
或resumeToken
在次要執行個體上啟動手錶。如果文件大小較大、正在執行
fullDocumentLookup
,且主要執行個體具有高並行寫入工作負載,則次要執行個體的輸送量可能會比主要執行個體低。最佳實務是,建議您監控次要 上的緩衝區快取命中率,並確保緩衝區快取命中率很高。