Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Menggunakan aliran perubahan dengan Amazon DocumentDB
Fitur aliran perubahan di Amazon DocumentDB (dengan kompatibilitas MongoDB) menyediakan urutan waktu peristiwa perubahan yang terjadi dalam koleksi klaster Anda. Anda dapat membaca peristiwa dari aliran perubahan untuk menerapkan banyak kasus penggunaan yang berbeda, termasuk yang berikut:
-
Notifikasi perubahan
-
Pencarian teks lengkap dengan Amazon OpenSearch Service (OpenSearch Layanan)
-
Analitik dengan Amazon Redshift
Aplikasi dapat menggunakan aliran perubahan untuk berlangganan perubahan data pada koleksi individu. Peristiwa aliran perubahan diurutkan saat terjadi di klaster dan disimpan selama 3 jam (secara default) setelah peristiwa dicatat. Periode retensi dapat diperpanjang hingga 7 hari menggunakan parameter change_stream_log_retention_duration
. Untuk mengubah periode retensi aliran perubahan, lihat Memodifikasi Durasi Retensi Log Aliran Perubahan.
Topik
- Operasi yang didukung
- Penagihan
- Batasan
- Mengaktifkan aliran perubahan
- Contoh: menggunakan aliran perubahan dengan Python
- Pencarian dokumen lengkap
- Melanjutkan aliran perubahan
- Melanjutkan aliran perubahan dengan startAtOperationTime
- Transaksi di aliran perubahan
- Memodifikasi durasi retensi log aliran perubahan
- Menggunakan aliran perubahan pada instance sekunder
Operasi yang didukung
Amazon DocumentDB mendukung operasi berikut untuk aliran perubahan:
-
Semua peristiwa perubahan didukung di
db.collection.watch()
db.watch()
MongoDB, dan.client.watch()
API -
Pencarian dokumen lengkap untuk pembaruan.
-
Tahap agregasi:
$match
,$project
,$redact
, serta$addFields
dan$replaceRoot
. -
Melanjutkan aliran perubahan dari token resume
-
Melanjutkan aliran perubahan dari stempel waktu menggunakan
startAtOperation
(berlaku untuk Amazon DocumentDB 4.0+)
Penagihan
Fitur aliran perubahan Amazon DocumentDB dinonaktifkan secara default dan tidak dikenakan biaya tambahan apa pun hingga fitur tersebut diaktifkan. Menggunakan aliran perubahan dalam klaster menimbulkan biaya baca dan tulis IOs dan penyimpanan tambahan. Anda dapat menggunakan modifyChangeStreams
API operasi untuk mengaktifkan fitur ini untuk cluster Anda. Untuk informasi selengkapnya tentang penentuan harga, lihat Penentuan harga Amazon DocumentDB
Batasan
Aliran perubahan memiliki batasan berikut di Amazon DocumentDB:
-
Di Amazon DocumentDB 3.6. dan Amazon DocumentDB 4.0, aliran perubahan hanya dapat dibuka dari koneksi ke instance utama cluster Amazon DocumentDB. Membaca dari aliran perubahan pada instance replika tidak didukung di Amazon DocumentDB 3.6. dan Amazon DocumentDB 4.0. Saat menjalankan
watch()
API operasi, Anda harus menentukan preferensiprimary
baca untuk memastikan bahwa semua pembacaan diarahkan ke instance utama (lihat bagian Contoh). -
Di Amazon DocumentDB 5.0, aliran perubahan dapat dibuka dari instans primer dan instance sekunder, termasuk klaster global. Anda dapat menentukan preferensi baca sekunder untuk mengarahkan aliran perubahan ke instance sekunder. Lihat Menggunakan aliran perubahan pada instance sekunder untuk praktik dan batasan terbaik tambahan.
-
Peristiwa yang ditulis ke aliran perubahan untuk koleksi tersedia hingga 7 hari (defaultnya adalah 3 jam). Data aliran perubahan dihapus setelah jendela durasi retensi log, meskipun tidak ada perubahan baru yang terjadi.
-
Operasi tulis yang berjalan lama pada koleksi seperti
updateMany
ataudeleteMany
dapat menghentikan sementara penulisan peristiwa aliran perubahan hingga operasi tulis yang berjalan lama selesai. -
Amazon DocumentDB tidak mendukung log operasi MongoDB (
oplog
). -
Dengan Amazon DocumentDB, Anda harus secara eksplisit mengaktifkan aliran perubahan pada koleksi tertentu.
-
Jika ukuran total peristiwa aliran perubahan (termasuk data perubahan dan dokumen lengkap, jika diminta) lebih besar dari
16 MB
, klien akan mengalami kegagalan baca pada aliran perubahan. -
Driver Ruby saat ini tidak didukung saat menggunakan
db.watch()
danclient.watch()
dengan Amazon DocumentDB 3.6. -
Output dari
updateDescription
perintah dalam aliran perubahan berbeda di Amazon DocumentDB daripada di MongoDB ketika nilai bidang yang diperbarui sama dengan yang sebelumnya:Amazon DocumentDB tidak mengembalikan bidang dalam
updateDescription
output jika bidang yang disediakan ditentukan dalam$set
perintah dan nilai targetnya sudah sama dengan nilai sumber.MongoDB mengembalikan bidang dalam output, bahkan jika nilai yang ditentukan sama dengan nilai saat ini.
Mengaktifkan aliran perubahan
Anda dapat mengaktifkan aliran perubahan Amazon DocumentDB untuk semua koleksi dalam basis data tertentu, atau hanya untuk koleksi yang dipilih. Berikut ini adalah contoh cara mengaktifkan aliran perubahan untuk kasus penggunaan yang berbeda menggunakan shell mongo. String kosong diperlakukan sebagai wildcard ketika menentukan nama basis data dan koleksi.
//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
//Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false});
//Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true});
//Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});
Aliran perubahan akan diaktifkan untuk koleksi jika apa pun dari berikut ini benar:
-
Baik basis data dan koleksi diaktifkan secara eksplisit.
-
Basis data yang berisi koleksi diaktifkan.
-
Semua basis data diaktifkan.
Menjatuhkan koleksi dari basis data tidak menonaktifkan aliran perubahan untuk koleksi tersebut jika basis data induk juga mengaktifkan aliran perubahan, atau jika semua basis data di klaster diaktifkan. Jika koleksi baru dibuat dengan nama yang sama dengan koleksi yang dihapus, aliran perubahan akan diaktifkan untuk koleksi tersebut.
Anda dapat membuat daftar semua aliran perubahan yang diaktifkan klaster Anda dengan menggunakan tahap alur agregasi $listChangeStreams
. Semua tahapan agregasi yang didukung oleh Amazon DocumentDB dapat digunakan dalam alur untuk pemrosesan tambahan. Jika koleksi yang sebelumnya diaktifkan telah dinonaktifkan, koleksi tersebut tidak akan muncul di output $listChangeStreams
.
//List all databases and collections with change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}}));
//List of all databases and collections with change streams enabled { "database" : "test", "collection" : "foo" } { "database" : "bar", "collection" : "" } { "database" : "", "collection" : "" }
//Determine if the database “bar” or collection “bar.foo” have change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}, {$match: {$or: [{database: "bar", collection: "foo"}, {database: "bar", collection: ""}, {database: "", collection: ""}]}} ], cursor:{}}));
Contoh: menggunakan aliran perubahan dengan Python
Berikut ini adalah contoh penggunaan aliran perubahan Amazon DocumentDB dengan Python di tingkat koleksi.
import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #While ‘Primary’ is the default read preference, here we give an example of #how to specify the required read preference when reading the change streams coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY) #Create a stream object stream = coll.watch() #Write a new document to the collection to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ #Generate a new change event by updating a document result = coll.update_one({'x': 1}, {'$set': {'x': 2}}) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf99d400000001010000000100009025'}, 'clusterTime': Timestamp(1571789268, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}} """
Berikut ini adalah contoh penggunaan aliran perubahan Amazon DocumentDB dengan Python di tingkat basis data.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #Create a stream object stream = db.watch() coll = db.get_collection('foo') #Write a new document to the collection foo to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ coll = db.get_collection('foo1') #Write a new document to another collection to generate a change event coll.insert_one({'x': 1}) print(stream.try_next()) """ Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo1', 'db': 'bar'}, 'operationType': 'insert'} """
Pencarian dokumen lengkap
Peristiwa perubahan pembaruan tidak menyertakan dokumen lengkap; itu mencakup hanya perubahan yang telah dibuat. Jika kasus penggunaan Anda memerlukan dokumen lengkap yang terpengaruh oleh pembaruan, Anda dapat mengaktifkan pencarian dokumen lengkap saat membuka aliran.
Dokumen fullDocument
untuk peristiwa aliran perubahan pembaruan mewakili versi terbaru dari dokumen yang diperbarui pada saat pencarian dokumen. Jika terjadi perubahan antara operasi pembaruan dan pencarian fullDocument
, dokumen fullDocument
mungkin tidak mewakili status dokumen pada waktu pembaruan.
Untuk membuat objek streaming dengan pencarian pembaruan diaktifkan, gunakan contoh ini:
stream = coll.watch(full_document='updateLookup') #Generate a new change event by updating a document result = coll.update_one({'x': 2}, {'$set': {'x': 3}}) stream.try_next()
Output dari objek stream akan terlihat seperti ini:
{'_id': {'_data': '015daf9b7c00000001010000000100009025'},
'clusterTime': Timestamp(1571789692, 1),
'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')},
'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3},
'ns': {'coll': 'foo', 'db': 'bar'},
'operationType': 'update',
'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}
Melanjutkan aliran perubahan
Anda dapat melanjutkan aliran perubahan nanti dengan menggunakan token resume, yang sama dengan bidang _id
dari dokumen peristiwa perubahan yang terakhir diambil.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem', retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() token = event['_id'] print(token) """ Output: This is the resume token that we will later us to resume the change stream {'_data': '015daf9c5b00000001010000000100009025'} """ #Python provides a nice shortcut for getting a stream’s resume token print(stream.resume_token) """ Output {'_data': '015daf9c5b00000001010000000100009025'} """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) #Generate another change event by inserting a document result = coll.insert_one({'y': 5}) #Open a stream starting after the selected resume token stream = db.watch(full_document='updateLookup', resume_after=token) #Our first change event is the update with the specified _id print(stream.try_next()) """ #Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5} {'_id': {'_data': '015f7e8f0c000000060100000006000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602129676, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')}, 'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ #Followed by the insert print(stream.try_next()) """ #Output: {'_id': {'_data': '015f7e8f0c000000070100000007000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602129676, 7), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')}, 'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}} """
Melanjutkan aliran perubahan dengan startAtOperationTime
Anda dapat melanjutkan aliran perubahan nanti dari stempel waktu tertentu dengan menggunakan startAtOperationTime
.
catatan
Kemampuan untuk menggunakan startAtOperationTime
tersedia di Amazon DocumentDB 4.0+. Saat menggunakan startAtOperationTime
, kursor aliran perubahan hanya akan mengembalikan perubahan yang terjadi pada atau setelah Stempel Waktu yang ditentukan. Perintah startAtOperationTime
dan resumeAfter
saling eksklusif dan karenanya tidak dapat digunakan bersama.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-root-ca-2020.pem',retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() timestamp = event['clusterTime'] print(timestamp) """ Output Timestamp(1602129114, 4) """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) result = coll.insert_one({'y': 5}) #Generate another change event by inserting a document #Open a stream starting after specified time stamp stream = db.watch(start_at_operation_time=timestamp) print(stream.try_next()) """ #Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event {'_id': {'_data': '015f7e941a000000030100000003000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602130970, 3), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')}, 'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: The second event will be the subsequent update operation (x:5) {'_id': {'_data': '015f7e9502000000050100000005000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602131202, 5), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: And finally the last event will be the insert operation (y:5) {'_id': {'_data': '015f7e9502000000060100000006000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602131202, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')}, 'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}} """
Transaksi di aliran perubahan
Peristiwa aliran perubahan tidak akan berisi peristiwa dari transaksi yang tidak dikomit dan/atau dibatalkan. Misalnya, jika Anda memulai transaksi dengan satu operasi INSERT
dan satu operasi UPDATE
dan. Jika operasi INSERT
Anda berhasil, tetapi operasi UPDATE
gagal, transaksi akan digulung kembali. Karena transaksi ini digulung kembali, aliran perubahan Anda tidak akan berisi peristiwa apa pun untuk transaksi ini.
Memodifikasi durasi retensi log aliran perubahan
Anda dapat mengubah durasi retensi log aliran perubahan menjadi antara 1 jam dan 7 hari menggunakan AWS Management Console atau AWS CLI.
catatan
Mengubah retensi log aliran tidak akan menghapus log yang lebih lama dari change_stream_log_retention_duration
nilai yang dikonfigurasi hingga ukuran log lebih besar dari (>) 51.200MB.
Menggunakan aliran perubahan pada instance sekunder
Untuk memulai menggunakan aliran perubahan pada instance sekunder, buka kursor aliran ubah dengan readPreference
sebagai sekunder.
Anda dapat membuka kursor aliran perubahan untuk melihat peristiwa perubahan pada koleksi tertentu atau semua koleksi dalam cluster atau database. Anda dapat membuka kursor aliran perubahan pada instans Amazon DocumentDB apa pun dan mengambil dokumen aliran perubahan dari instance penulis dan pembaca. Anda dapat membagikan token aliran perubahan (seperti resumeToken
ataustartOperationTime
) di berbagai kursor aliran perubahan yang dibuka pada instance penulis dan pembaca.
Contoh
import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Your password> clusterendpoint = "DocumentDBClusterEndpoint" client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] # Make sure to use SECONDARY to redirect cursor reads from secondary instances coll = db.get_collection('foo', read_preference=ReadPreference.SECONDARY) # Create a stream object on RO. The token needs to generated from PRIMARY. stream = coll.watch(resumeAfter=token) for event in stream: print(event)
Pedoman dan batasan untuk aliran perubahan pada instance sekunder
Peristiwa aliran perubahan perlu direplikasi dari instance primer ke instance sekunder. Anda dapat memantau lag dari
DBInstanceReplicaLag
metrik di Amazon CloudWatch.Stempel waktu pada instance sekunder mungkin tidak selalu sinkron dengan instance utama. Dalam hal ini, harapkan penundaan pada stempel waktu instance sekunder sehingga dapat mengejar ketinggalan. Sebagai praktik terbaik, kami sarankan
resumeToken
untuk menggunakanstartAtOperationTime
atau memulai jam tangan pada instance sekunder.Anda mungkin mengalami throughput yang lebih rendah pada instance sekunder dibandingkan dengan instance utama jika ukuran dokumen Anda besar, Anda melakukannya
fullDocumentLookup
, dan ada beban kerja penulisan bersamaan yang tinggi pada instance utama. Sebagai praktik terbaik, kami sarankan Anda memantau rasio hit cache buffer Anda di sekunder dan memastikan bahwa rasio hit cache buffer tinggi.