搭配 Amazon DocumentDB 使用變更串流 - Amazon DocumentDB

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

搭配 Amazon DocumentDB 使用變更串流

Amazon DocumentDB 中的變更串流功能 (具有 MongoDB 相容性) 提供叢集集合中發生的按時間順序排列的變更事件序列。您可以從變更串流讀取事件,以實作各種不同的使用案例,包括下列各項:

  • 變更通知

  • 使用 Amazon OpenSearch Service (OpenSearch Service) 進行全文搜尋

  • 使用 Amazon Redshift 進行分析

應用程式可以使用變更串流來訂閱個別集合上的資料變更。變更串流事件發生時便會排序這些事件,並在記錄事件後儲存這些事件 3 小時 (預設)。使用 change_stream_log_retention_duration 參數,保留期最多可延長 7 天。若要修改變更串流保留期間,請參閱修改變更串流日誌保留期間

受支援的操作

Amazon DocumentDB 支援變更串流的下列操作:

  • MongoDB db.collection.watch()db.watch()client.watch() 支援的所有變更事件API。

  • 更新的完整文件查閱。

  • 彙總階段:$match$redact$project$addFields$replaceRoot

  • 從繼續權杖恢復變更串流

  • 使用 從時間戳記繼續變更串流 startAtOperation(適用於 Amazon DocumentDB 4.0+)

帳單

Amazon DocumentDB 變更串流功能預設為停用,在啟用此功能之前不會產生任何額外費用。在叢集中使用變更串流會產生額外的讀取、寫入IOs和儲存成本。您可以使用 modifyChangeStreamsAPI操作為您的叢集啟用此功能。如需定價的詳細資訊,請參閱 Amazon DocumentDB 定價

限制

變更串流在 Amazon DocumentDB 中具有下列限制:

  • 在 Amazon DocumentDB 3.6. 和 Amazon DocumentDB 4.0 上,變更串流只能從 Amazon DocumentDB 叢集的主要執行個體的連線開啟。Amazon DocumentDB 3.6. 和 Amazon DocumentDB 4.0 不支援從複本執行個體上的變更串流讀取。叫用 watch() API 操作時,您必須指定primary讀取偏好設定,以確保所有讀取都導向主要執行個體 (請參閱範例區段)。

  • 在 Amazon DocumentDB 5.0 上,可以從主要執行個體和次要執行個體開啟變更串流,包括全域叢集。您可以指定次要讀取偏好設定,將變更串流重新導向至次要執行個體。如需其他最佳實務和限制在次要執行個體上使用變更串流,請參閱 。

  • 寫入集合變更串流的事件最長可使用 7 天 (預設值為 3 小時)。在日誌保留期間時段後將會刪除變更串流資料,即便未發生任何新變更。

  • 在如 updateManydeleteMany 等集合上長時間執行寫入操作時,會暫停寫入變更串流事件,直到長時間執行的寫入操作完成為止。

  • Amazon DocumentDB 不支援 MongoDB 操作日誌 (oplog)。

  • 使用 Amazon DocumentDB 時,您必須在指定的集合上明確啟用變更串流。

  • 如果變更串流事件的總大小 (如有要求,包括變更資料與完整文件) 大於 16 MB,用戶端將會遇到變更串流讀取失敗的情況。

  • client.watch() 搭配 Amazon DocumentDB 3.6 使用 db.watch()和 時,目前不支援 Ruby 驅動程式。

  • 當欄位的更新值與上一個值相同時,Amazon DocumentDB 中的變更串流updateDescription命令輸出與 MongoDB 中的輸出不同:

    • 如果在$set命令中指定了提供的欄位,且其目標值已等於來源值,Amazon DocumentDB 不會傳回updateDescription輸出中的欄位。

    • MongoDB 會傳回輸出中的欄位,即使指定的值等於目前的值。

啟用變更串流

您可以為指定資料庫中的所有集合啟用 Amazon DocumentDB 變更串流,或僅為選取的集合啟用。以下是針對使用 mongo 殼層的不同使用案例,如何啟用變更串流的範例。指定資料庫和集合名稱時,空的字串會被視為萬用字元。

//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
//Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false});
//Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true});
//Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});

如果下列任一項成立,集合將會啟用變更串流:

  • 明確地啟用資料庫和集合。

  • 已啟用包含集合的資料庫。

  • 已啟用所有資料庫。

如果父資料庫也啟用變更串流或叢集內的所有資料庫都已啟用,捨棄資料庫的集合並不會停用該集合的變更串流。如果新的集合以遭刪除集合的相同名稱建立,該集合將會啟用變更串流。

您可以使用 $listChangeStreams 彙總管線階段列出叢集所有已啟用的變更串流。Amazon DocumentDB 支援的所有彙總階段都可以在管道中使用,以進行其他處理。如果已停用之前啟用的集合,該集合將不會在 $listChangeStreams 輸出中出現。

//List all databases and collections with change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}}));
//List of all databases and collections with change streams enabled { "database" : "test", "collection" : "foo" } { "database" : "bar", "collection" : "" } { "database" : "", "collection" : "" }
//Determine if the database “bar” or collection “bar.foo” have change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}, {$match: {$or: [{database: "bar", collection: "foo"}, {database: "bar", collection: ""}, {database: "", collection: ""}]}} ], cursor:{}}));

範例:搭配 Python 使用變更串流

以下是在集合層級搭配 Python 使用 Amazon DocumentDB 變更串流的範例。

import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #While ‘Primary’ is the default read preference, here we give an example of #how to specify the required read preference when reading the change streams coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY) #Create a stream object stream = coll.watch() #Write a new document to the collection to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ #Generate a new change event by updating a document result = coll.update_one({'x': 1}, {'$set': {'x': 2}}) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf99d400000001010000000100009025'}, 'clusterTime': Timestamp(1571789268, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}} """

以下是在資料庫層級使用 Amazon DocumentDB 變更串流搭配 Python 的範例。

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #Create a stream object stream = db.watch() coll = db.get_collection('foo') #Write a new document to the collection foo to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ coll = db.get_collection('foo1') #Write a new document to another collection to generate a change event coll.insert_one({'x': 1}) print(stream.try_next()) """ Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo1', 'db': 'bar'}, 'operationType': 'insert'} """

完整文件查詢

更新變更事件不包括完整文件;只包含所做的變更。如果您的使用案例需要受更新影響的完整文件,您可以在開啟串流時啟用完整文件查閱。

更新變更串流事件的 fullDocument 文件代表在文件查閱時已更新文件最新的版本。如果更新操作與 fullDocument 查閱之間發生變更,fullDocument 文件可能不會代表更新時的文件狀態。

若要在啟用更新查詢的情況下建立串流物件,請使用此範例:

stream = coll.watch(full_document='updateLookup') #Generate a new change event by updating a document result = coll.update_one({'x': 2}, {'$set': {'x': 3}}) stream.try_next()

串流物件的輸出會如下所示:

{'_id': {'_data': '015daf9b7c00000001010000000100009025'}, 'clusterTime': Timestamp(1571789692, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}

恢復變更串流

您稍後可以使用繼續字符 (等於上次擷取之變更事件文件的 _id 欄位) 繼續變更串流。

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem', retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() token = event['_id'] print(token) """ Output: This is the resume token that we will later us to resume the change stream {'_data': '015daf9c5b00000001010000000100009025'} """ #Python provides a nice shortcut for getting a stream’s resume token print(stream.resume_token) """ Output {'_data': '015daf9c5b00000001010000000100009025'} """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) #Generate another change event by inserting a document result = coll.insert_one({'y': 5}) #Open a stream starting after the selected resume token stream = db.watch(full_document='updateLookup', resume_after=token) #Our first change event is the update with the specified _id print(stream.try_next()) """ #Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5} {'_id': {'_data': '015f7e8f0c000000060100000006000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602129676, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')}, 'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ #Followed by the insert print(stream.try_next()) """ #Output: {'_id': {'_data': '015f7e8f0c000000070100000007000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602129676, 7), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')}, 'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}} """

使用 繼續變更串流 startAtOperationTime

您可以使用 ,稍後從特定時間戳記繼續變更串流startAtOperationTime

注意

Amazon DocumentDB 4.0+ startAtOperationTime提供使用 的功能。使用 時startAtOperationTime,變更串流游標只會傳回在指定時間戳記或之後發生的變更。startAtOperationTimeresumeAfter命令是互斥的,因此無法同時使用。

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-root-ca-2020.pem',retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() timestamp = event['clusterTime'] print(timestamp) """ Output Timestamp(1602129114, 4) """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) result = coll.insert_one({'y': 5}) #Generate another change event by inserting a document #Open a stream starting after specified time stamp stream = db.watch(start_at_operation_time=timestamp) print(stream.try_next()) """ #Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event {'_id': {'_data': '015f7e941a000000030100000003000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602130970, 3), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')}, 'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: The second event will be the subsequent update operation (x:5) {'_id': {'_data': '015f7e9502000000050100000005000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602131202, 5), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: And finally the last event will be the insert operation (y:5) {'_id': {'_data': '015f7e9502000000060100000006000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602131202, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')}, 'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}} """

變更串流中的交易

變更串流事件不會包含來自未認可和/或中止交易的事件。例如,如果您使用一個INSERT操作和一個UPDATE操作以及 啟動交易。如果您的INSERT操作成功,但UPDATE操作失敗,交易將復原。由於此交易已復原,您的變更串流將不會包含此交易的任何事件。

修改變更串流日誌保留期間

您可以使用 AWS Management Console 或 將變更串流日誌保留期間修改為 1 小時至 7 天 AWS CLI。

Using the AWS Management Console
修改變更串流日誌保留期限
  1. 登入 AWS Management Console,然後在 https://console.aws.amazon.com/docdb 開啟 Amazon DocumentDB 主控台。

  2. 在導覽窗格中,選擇 Parameter groups (參數群組)。

    提示

    如果畫面左側沒有出現導覽窗格,請選擇頁面左上角的功能表圖示 (Hamburger menu icon with three horizontal lines.)。

  3. Parameter groups (參數群組) 窗格中,選擇與叢集相關聯的叢集參數群組。若要識別與叢集相關聯的叢集參數群組,請參閱確定 Amazon DocumentDB 集群的參數組

  4. 結果頁面會顯示這個叢集參數群組的參數,及其對應詳細資訊。選取 change_stream_log_retention_duration 參數。

  5. 在頁面右上角,選擇 Edit (編輯) 以變更此參數值。change_stream_log_retention_duration 參數可以修改為 1 小時至 7 天。

  6. 進行變更,然後選擇 Modify cluster parameter (修改叢集參數) 以儲存變更。若要捨棄變更,請選擇 Cancel (取消)

Using the AWS CLI

若要修改叢集參數群組的 change_stream_log_retention_duration 參數,請使用 modify-db-cluster-parameter-group 操作搭配下列參數:

  • --db-cluster-parameter-group-name – 必要。您想要修改的叢集參數群組名稱。若要識別與叢集相關聯的叢集參數群組,請參閱確定 Amazon DocumentDB 集群的參數組

  • --parameters – 必要。您要修改的參數。每個參數項目必須包括下列:

    • ParameterName — 您正在修改的參數名稱。在這種情況下,它是 change_stream_log_retention_duration

    • ParameterValue — 此參數的新值。

    • ApplyMethod — 您希望如何套用此參數的變更。允許值為 immediatepending-reboot

      注意

      static 必須擁有使用 ApplyType 的參數 ApplyMethod pending-reboot

  1. 若要變更參數 change_stream_log_retention_duration 的值,請執行下列命令,並以您想要修改參數的值取代 parameter-value

    若為 Linux、macOS 或 Unix:

    aws docdb modify-db-cluster-parameter-group \ --db-cluster-parameter-group-name sample-parameter-group \ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    針對 Windows:

    aws docdb modify-db-cluster-parameter-group ^ --db-cluster-parameter-group-name sample-parameter-group ^ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    此操作的輸出看起來類似以下 (JSON 格式)。

    { "DBClusterParameterGroupName": "sample-parameter-group" }
  2. 等待至少 5 分鐘。

  3. 列出 sample-parameter-group 的參數值確保已進行變更。

    若為 Linux、macOS 或 Unix:

    aws docdb describe-db-cluster-parameters \ --db-cluster-parameter-group-name sample-parameter-group

    針對 Windows:

    aws docdb describe-db-cluster-parameters ^ --db-cluster-parameter-group-name sample-parameter-group

    此操作的輸出看起來類似以下 (JSON 格式)。

    { "Parameters": [ { "ParameterName": "audit_logs", "ParameterValue": "disabled", "Description": "Enables auditing on cluster.", "Source": "system", "ApplyType": "dynamic", "DataType": "string", "AllowedValues": "enabled,disabled", "IsModifiable": true, "ApplyMethod": "pending-reboot" }, { "ParameterName": "change_stream_log_retention_duration", "ParameterValue": "12345", "Description": "Duration of time in seconds that the change stream log is retained and can be consumed.", "Source": "user", "ApplyType": "dynamic", "DataType": "integer", "AllowedValues": "3600-86400", "IsModifiable": true, "ApplyMethod": "immediate" } ] }
注意

在日誌大小大於 (>) 51,200MB 之前,變更串流日誌保留不會刪除早於設定change_stream_log_retention_duration值的日誌。

在次要執行個體上使用變更串流

若要開始使用次要執行個體上的變更串流,請使用 readPreference作為次要執行個體開啟變更串流游標。

您可以開啟變更串流游標,以監控特定集合或叢集或資料庫中所有集合的變更事件。您可以在任何 Amazon DocumentDB 執行個體上開啟變更串流游標,並從寫入器和讀取器執行個體擷取變更串流文件。您可以在寫入器和讀取器執行個體上開啟的不同變更串流游標之間共用變更串流權杖 (例如 resumeTokenstartOperationTime)。

範例

import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Your password> clusterendpoint = "DocumentDBClusterEndpoint" client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] # Make sure to use SECONDARY to redirect cursor reads from secondary instances coll = db.get_collection('foo', read_preference=ReadPreference.SECONDARY) # Create a stream object on RO. The token needs to generated from PRIMARY. stream = coll.watch(resumeAfter=token) for event in stream: print(event)

次要執行個體上變更串流的準則和限制

  • 變更串流事件需要從主要執行個體複寫至次要執行個體。您可以從 Amazon 中的DBInstanceReplicaLag指標監控延遲 CloudWatch。

  • 次要執行個體上的時間戳記不一定會與主要執行個體同步。在此情況下, 預期次要執行個體時間戳記會延遲,以便趕上進度。最佳實務是,建議您使用 startAtOperationTimeresumeToken 在次要執行個體上啟動手錶。

  • 如果文件大小較大、正在執行 fullDocumentLookup,且主要執行個體具有高並行寫入工作負載,則次要執行個體的輸送量可能會比主要執行個體低。最佳實務是,建議您監控次要 上的緩衝區快取命中率,並確保緩衝區快取命中率很高。