

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon DocumentDB を用いた変更ストリームの使用
<a name="change_streams"></a>

Amazon DocumentDB (MongoDB 互換) の変更ストリーム機能は、クラスターのコレクション内で発生する変更イベントの時系列シーケンスを提供します。変更ストリームからイベントを読み取って、次のようなさまざまなユースケースを実装できます。
+ 変更通知
+ Amazon OpenSearch Service (OpenSearch Service) を用いたフルテキスト検索
+ Amazon Redshift を用いてのイベントの分析

アプリケーションは変更ストリームを使用して、個々のコレクションでデータ変更をサブスクライブすることができます。変更ストリームイベントは、クラスターで発生する際に順序付けされ、イベントが記録されてから 3 時間 (デフォルト) 保存されます。`change_stream_log_retention_duration` のパラメータを使って、保存期間を7日まで延長できます。変更ストリーム保持期間を変更するには、[Modifying the Change Stream Log Retention Duration](https://docs.aws.amazon.com/documentdb/latest/developerguide/change_streams.html#change_streams-modifying_log_retention) を参照してください。

**Topics**
+ [サポートされているオペレーション](#change_streams-supported_ops)
+ [料金](#change_streams-billing)
+ [制限事項](#change_streams-limitations)
+ [変更ストリームの有効化](#change_streams-enabling)
+ [例: Python での変更ストリームの使用](#change_streams-using_example)
+ [ドキュメント全体の参照](#change_streams-lookup)
+ [変更ストリームの再開](#change_streams-resuming)
+ [`startAtOperationTime` を用いた変更ストリームの再開](#change_streams-startAtOperation)
+ [`postBatchResumeToken` を用いた変更ストリームの再開](#change_streams-postBatchResumeToken)
+ [変更ストリーム内のトランザクション](#change_streams-transactions)
+ [変更ストリームログの保持期間の変更](#change_streams-modifying_log_retention)
+ [セカンダリインスタンスでの変更ストリームの使用](#change-streams-secondary-instances)

## サポートされているオペレーション
<a name="change_streams-supported_ops"></a>

Amazon DocumentDB では、変更ストリームに対して次の操作がサポートされています。
+ すべての変更イベントは、MongoDB の `db.collection.watch()`、`db.watch()`、および `client.watch()` のAPI 中でサポートされている全ての変更イベント。
+ 更新のための完全ドキュメント参照。
+ 集約ステージ:`$match`、`$project`、`$redact`、および `$addFields` そして `$replaceRoot`。
+ 再開トークンからの変更ストリームの再開
+ `startAtOperation` (Amazon DocumentDB v4.0\$1 に適用される) を使用して、タイムスタンプから変更ストリームを再開する

## 料金
<a name="change_streams-billing"></a>

Amazon DocumentDB 変更ストリーム機能はデフォルトで無効になっており、この機能が有効化されるまで追加料金は発生しません。クラスターで変更ストリームを使用すると、追加の読み取り、および書き込み IO とストレージのコストが発生します。`modifyChangeStreams` の API オペレーションを使用して、クラスターのためのこの機能を有効にできます。料金のさらなる詳細については、[Amazon DocumentDB の料金](https://aws.amazon.com/documentdb/pricing/) を参照してください。

## 制限事項
<a name="change_streams-limitations"></a>

変更ストリームには、Amazon DocumentDB で以下の制限があります。
+ Amazon DocumentDB 3.6. および Amazon DocumentDB 4.0 では、変更ストリームは Amazon DocumentDB クラスターのプライマリインスタンスへの接続からのみ開くことができます。レプリカインスタンス上の変更ストリームからの読み取りは、Amazon DocumentDB 3.6 および Amazon DocumentDB 4.0 ではサポートされていません。`watch()` API オペレーションを呼び出すときは、すべての読み取りがプライマリインスタンスに送信されるようにする `primary` の読み取り設定を指定する必要があります (「[例](#change_streams-using_example)」セクションを参照)。
+ Amazon DocumentDB 5.0 では、グローバルクラスターを含むプライマリインスタンスとセカンダリインスタンスの両方から変更ストリームを開くことができます。セカンダリ読み取り設定を指定して、変更ストリームをセカンダリインスタンスにリダイレクトできます。その他のベストプラクティスと制限事項については、「[セカンダリインスタンスでの変更ストリームの使用](#change-streams-secondary-instances)」を参照してください。
+ コレクションの変更ストリームに書き込まれるイベントは、最大 7 日間 (デフォルトは 3 時間) 使用できます。変更ストリームのデータは、新しい変更が行われていない場合でも、ログの保持期間が終了すると削除されます。
+ `updateMany` や `deleteMany` のようなコレクションで長時間実行されている書き込みオペレーションは、長時間実行されている書き込みオペレーションが完了するまで、変更ストリームイベントの書き込みを一時的に停止する可能性があります。
+ Amazon DocumentDB は、MongoDB オペレーションログ (`oplog`) をサポートしていません。
+ Amazon DocumentDB を用いて、特定のコレクションで変更ストリームを明示的に有効にする必要があります。
+ 変更ストリームイベントの合計サイズ (要求された場合、変更データとドキュメント全体を含む) が `16 MB` より大きい場合、変更ストリームでの読み取りエラーがクライアントで発生します。
+ Ruby ドライバは、現在、Amazon DocumentDB v3.6 での `db.watch()` と `client.watch()` の使用時にはサポートされていません。
+ フィールドの更新値が以前の値と同じ場合、Amazon DocumentDB と MongoDB では変更ストリームの `updateDescription` コマンドからの出力が異なります。
  + 提供されるフィールドが `$set` コマンド内で指定されており、かつターゲット値が既にソース値と等しい場合、Amazon DocumentDB は `updateDescription` 出力内でフィールドを返しません。
  + MongoDB は、指定された値が現在の値と等しい場合でも、出力内でフィールドを返します。

## 変更ストリームの有効化
<a name="change_streams-enabling"></a>

Amazon DocumentDB 変更ストリームは、特定のデータベース内のすべてのコレクションに対して有効にすることも、選択したコレクションに対してのみ有効にすることもできます。以下は、mongo シェルを使用してさまざまなユースケースの変更ストリームを有効にする方法の例です。データベース名とコレクション名を指定する場合、空の文字列はワイルドカードとして扱われます。

```
//Enable change streams for the collection "foo" in database "bar"
db.adminCommand({modifyChangeStreams: 1,
    database: "bar",
    collection: "foo", 
    enable: true});
```

```
//Disable change streams on collection "foo" in database "bar"
db.adminCommand({modifyChangeStreams: 1,
    database: "bar",
    collection: "foo", 
    enable: false});
```

```
//Enable change streams for all collections in database "bar"
db.adminCommand({modifyChangeStreams: 1,
    database: "bar",
    collection: "", 
    enable: true});
```

```
//Enable change streams for all collections in all databases in a cluster
db.adminCommand({modifyChangeStreams: 1,
    database: "",
    collection: "", 
    enable: true});
```

次のいずれかに該当する場合、変更ストリームはコレクションに対して有効になります。
+ データベースとコレクションの両方が明示的に有効になっている。
+ コレクションを含むデータベースが有効になっている。
+ すべてのデータベースが有効になっている。

親データベースでも変更ストリームが有効になっている場合、またはクラスター内のすべてのデータベースが有効になっている場合は、データベースからコレクションを削除しても、そのコレクションの変更ストリームは無効になりません。削除されたコレクションと同じ名前で新しいコレクションが作成された場合、そのコレクションに対して変更ストリームが有効になります。

`$listChangeStreams` 集約パイプラインステージを使用すると、クラスターで有効なすべての変更ストリームを一覧表示できます。Amazon DocumentDB でサポートされているすべての集約ステージは、パイプラインで追加の処理に使用できます。以前に有効にしたコレクションが無効になっている場合、そのコレクションは `$listChangeStreams` 出力に表示されません。

```
//List all databases and collections with change streams enabled
cursor = new DBCommandCursor(db,
    db.runCommand(
        {aggregate: 1,
        pipeline: [{$listChangeStreams: 1}], 
        cursor:{}}));
```

```
//List of all databases and collections with change streams enabled 
{ "database" : "test", "collection" : "foo" } 
{ "database" : "bar", "collection" : "" }
{ "database" : "", "collection" : "" }
```

```
//Determine if the database “bar” or collection “bar.foo” have change streams enabled
cursor = new DBCommandCursor(db,
  db.runCommand(
      {aggregate: 1,
       pipeline: [{$listChangeStreams: 1},
                  {$match: {$or: [{database: "bar", collection: "foo"},
                                  {database: "bar", collection: ""},
                                  {database: "", collection: ""}]}}
                 ],
      cursor:{}}));
```

## 例: Python での変更ストリームの使用
<a name="change_streams-using_example"></a>

コレクションレベルで、 Python を用いて Amazon DocumentDB 変更ストリームを使用する例を次に示します。

```
import os
import sys
from pymongo import MongoClient, ReadPreference
      
username = "DocumentDBusername"
password = <Insert your password> 

clusterendpoint = "DocumentDBClusterEndpoint”
client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem')

db = client['bar']
 
#While ‘Primary’ is the default read preference, here we give an example of
#how to specify the required read preference when reading the change streams
coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY)
#Create a stream object
stream = coll.watch()
#Write a new document to the collection to generate a change event
coll.insert_one({'x': 1})
#Read the next change event from the stream (if any)
print(stream.try_next())

"""
Expected Output:
{'_id': {'_data': '015daf94f600000002010000000200009025'},
'clusterTime': Timestamp(1571788022, 2),
'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')},
'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1},
'ns': {'coll': 'foo', 'db': 'bar'},
'operationType': 'insert'}
"""

#A subsequent attempt to read the next change event returns nothing, as there are no new changes
print(stream.try_next())

"""
Expected Output:
None
""" 
 
#Generate a new change event by updating a document
result = coll.update_one({'x': 1}, {'$set': {'x': 2}})
print(stream.try_next())

"""
Expected Output:
{'_id': {'_data': '015daf99d400000001010000000100009025'},
'clusterTime': Timestamp(1571789268, 1),
'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')},
'ns': {'coll': 'foo', 'db': 'bar'},
'operationType': 'update',
'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}}
"""
```

データベースレベルで、 Python を用いて Amazon DocumentDB 変更ストリームを使用する例を次に示します。

```
import os
import sys
from pymongo import MongoClient

username = "DocumentDBusername"
password = <Insert your password>
clusterendpoint = "DocumentDBClusterEndpoint”
client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem')

db = client['bar']
#Create a stream object
stream = db.watch()
coll = db.get_collection('foo')
#Write a new document to the collection foo to generate a change event
coll.insert_one({'x': 1})
 
#Read the next change event from the stream (if any)
print(stream.try_next())

"""
Expected Output:
{'_id': {'_data': '015daf94f600000002010000000200009025'},
'clusterTime': Timestamp(1571788022, 2),
'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')},
'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1},
'ns': {'coll': 'foo', 'db': 'bar'},
'operationType': 'insert'}
"""
#A subsequent attempt to read the next change event returns nothing, as there are no new changes
print(stream.try_next())

"""
Expected Output:
None
""" 
 
coll = db.get_collection('foo1')

#Write a new document to another collection to generate a change event
coll.insert_one({'x': 1})
print(stream.try_next())

"""
Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database
{'_id': {'_data': '015daf94f600000002010000000200009025'},
'clusterTime': Timestamp(1571788022, 2),
'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')},
'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1},
'ns': {'coll': 'foo1', 'db': 'bar'},
'operationType': 'insert'}
"""
```

## ドキュメント全体の参照
<a name="change_streams-lookup"></a>

変更の更新イベントには、ドキュメント全体は含まれません。加えられた変更のみが含まれます。ユースケースで、更新の影響を受ける完全なドキュメントが必要な場合は、ストリームを開くときに完全なドキュメント参照を有効にできます。

変更の更新ストリームイベントの `fullDocument` ドキュメントは、ドキュメント参照時に更新されたドキュメントの最新バージョンを表します。更新オペレーションと `fullDocument` ルックアップの間で変更が発生した場合、`fullDocument` ドキュメントが更新時にドキュメントの状態を表していない可能性があります。

更新ルックアップを有効にしてストリームオブジェクトを作成するには、次の例を使用します。

```
stream = coll.watch(full_document='updateLookup')
 
#Generate a new change event by updating a document
result = coll.update_one({'x': 2}, {'$set': {'x': 3}})

stream.try_next()
```

ストリームオブジェクトの出力は次のようになります。

```
{'_id': {'_data': '015daf9b7c00000001010000000100009025'},
'clusterTime': Timestamp(1571789692, 1),
'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')},
'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3},
'ns': {'coll': 'foo', 'db': 'bar'},
'operationType': 'update',
'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}
```

## 変更ストリームの再開
<a name="change_streams-resuming"></a>

再開トークンを使用すると、変更ストリームを後で再開できます。このトークンは、最後に取得した変更イベントドキュメントの `_id` フィールドと同じです。

```
import os
import sys
from pymongo import MongoClient

username = "DocumentDBusername"
password = <Insert your password> 
clusterendpoint = "DocumentDBClusterEndpoint”
client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem', retryWrites='false')

db = client['bar']
coll = db.get_collection('foo')
#Create a stream object
stream = db.watch()
coll.update_one({'x': 1}, {'$set': {'x': 4}})
event = stream.try_next()
token = event['_id']
print(token)

"""
Output: This is the resume token that we will later us to resume the change stream
{'_data': '015daf9c5b00000001010000000100009025'}
"""
#Python provides a nice shortcut for getting a stream’s resume token
print(stream.resume_token)

"""
Output
{'_data': '015daf9c5b00000001010000000100009025'}
"""
#Generate a new change event by updating a document
result = coll.update_one({'x': 4}, {'$set': {'x': 5}})
#Generate another change event by inserting a document
result = coll.insert_one({'y': 5})
#Open a stream starting after the selected resume token
stream = db.watch(full_document='updateLookup', resume_after=token)
#Our first change event is the update with the specified _id
print(stream.try_next())

"""
#Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5}

{'_id': {'_data': '015f7e8f0c000000060100000006000fe038'}, 
'operationType': 'update', 
'clusterTime': Timestamp(1602129676, 6), 
'ns': {'db': 'bar', 'coll': 'foo'}, 
'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')}, 
'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5}, 
'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}}
"""
#Followed by the insert
print(stream.try_next())

"""
#Output:
{'_id': {'_data': '015f7e8f0c000000070100000007000fe038'}, 
'operationType': 'insert', 
'clusterTime': Timestamp(1602129676, 7), 
'ns': {'db': 'bar', 'coll': 'foo'}, 
'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')}, 
'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}}
"""
```

## `startAtOperationTime` を用いた変更ストリームの再開
<a name="change_streams-startAtOperation"></a>

`startAtOperationTime` を使って、特定のタイムスタンプから後で変更ストリームを再開できます。

**注記**  
`startAtOperationTime` を使用する能力は、Amazon DocumentDB 4.0\$1 で入手可能です。`startAtOperationTime` を使用する場合、変更ストリームカーソルは、指定された Timestamp 以降で発生した変更のみを返します。`startAtOperationTime` と `resumeAfter` のコマンドは相互に排他的であるため、一緒に使用することはできません。

```
import os
import sys
from pymongo import MongoClient

username = "DocumentDBusername"
password = <Insert your password> 
clusterendpoint = "DocumentDBClusterEndpoint”
client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-root-ca-2020.pem',retryWrites='false')
db = client['bar']
coll = db.get_collection('foo')
#Create a stream object
stream = db.watch()
coll.update_one({'x': 1}, {'$set': {'x': 4}})
event = stream.try_next()
timestamp = event['clusterTime']
print(timestamp)
"""
Output
Timestamp(1602129114, 4)
"""
#Generate a new change event by updating a document
result = coll.update_one({'x': 4}, {'$set': {'x': 5}})
result = coll.insert_one({'y': 5})
#Generate another change event by inserting a document
#Open a stream starting after specified time stamp

stream = db.watch(start_at_operation_time=timestamp)
print(stream.try_next())

"""
#Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event
{'_id': {'_data': '015f7e941a000000030100000003000fe038'}, 
'operationType': 'update', 
'clusterTime': Timestamp(1602130970, 3), 
'ns': {'db': 'bar', 'coll': 'foo'}, 
'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')}, 
'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}}
"""

print(stream.try_next())
"""
#Output: The second event will be the subsequent update operation (x:5)
{'_id': {'_data': '015f7e9502000000050100000005000fe038'}, 
'operationType': 'update', 
'clusterTime': Timestamp(1602131202, 5),
'ns': {'db': 'bar', 'coll': 'foo'}, 
'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')}, 
'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}}
"""

print(stream.try_next())

"""
#Output: And finally the last event will be the insert operation (y:5)
{'_id': {'_data': '015f7e9502000000060100000006000fe038'}, 
'operationType': 'insert', 
'clusterTime': Timestamp(1602131202, 6), 
'ns': {'db': 'bar', 'coll': 'foo'}, 
'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')}, 
'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}}
"""
```

## `postBatchResumeToken` を用いた変更ストリームの再開
<a name="change_streams-postBatchResumeToken"></a>

Amazon DocumentDB 変更ストリームは、`postBatchResumeToken` という追加のフィールドを返すようになりました。このフィールドは、 `$changestream` コマンドと `getMore` コマンドから返されます。

Python の `$changestream` コマンドの例:

```
db.command({"aggregate": "sales", "pipeline": [{ "$changeStream": {}}], "cursor": {"batchSize": 1}})
```

正常な出力:

```
cursor" : {
   "firstBatch" : [ ],
   "postBatchResumeToken" : {"_data" : "0167c8cbe60000000004"},
   "id" : NumberLong("9660788144470"),
   "ns" : "test.sales"
}
```

Python の `getMore` コマンドの例:

```
db.command({"getMore": NumberLong(<cursor id>), "collection": "sales", "batchSize": 1 })
```

正常な出力

```
cursor" : {
   "nextBatch" : [ ],
   "postBatchResumeToken" : {"_data" : "0167c8cbe60000000004"},
   "id" : NumberLong("9660788144470"),
   "ns" : "test.sales"
}
```

`postBatchResumeToken` フィールドは、再開トークンの使用方法と同様に、`resumeAfter` フィールドで新しい変更ストリームカーソルを開くために使用できます。

選択した `postBatchResumeToken` の後に開始するストリームを開きます。

```
post_batch_resume_token = output['cursor']['postBatchResumeToken']
stream = db.watch(full_document='updateLookup', resume_after=post_batch_resume_token)
```

実際のイベントを反映するオペレーションログ (oplog) エントリに常に対応する通常の再開トークンとは異なり、`postBatchResumeToken` は変更ストリームがサーバーでスキャンした oplog エントリに対応しますが、これは必ずしも一致する変更ではありません。

古い通常の再開トークンで再開しようとすると、強制的にデータベースが指定されたタイムスタンプと現在の時刻の間のすべての oplog エントリをスキャンするようになります。これにより、各サブクエリスキャンで短時間、多数のクエリが内部で生成される可能性があります。これにより、CPU 使用率が急増し、データベースのパフォーマンスが低下します。最後の `postBatchResumeToken` で再開すると、一致しない oplog エントリのスキャンがスキップされます。

## 変更ストリーム内のトランザクション
<a name="change_streams-transactions"></a>

変更ストリームイベントには、コミットされていないトランザクションや中止されたトランザクションのイベントは含まれません。例えば、1 つの `INSERT` オペレーションと 1 つの `UPDATE` オペレーションでトランザクションを開始し、`INSERT` オペレーションが成功しても `UPDATE` オペレーションが失敗した場合、トランザクションはロールバックされます。このトランザクションはロールバックされたため、変更ストリームにはこのトランザクションのイベントは含まれません。

## 変更ストリームログの保持期間の変更
<a name="change_streams-modifying_log_retention"></a>

 AWS マネジメントコンソール または を使用して、変更ストリームログの保持期間を 1 時間から 7 日の間で変更できます AWS CLI。

------
#### [ Using the AWS マネジメントコンソール ]

**変更ストリームログの保持期間を変更するには**

1. にサインインし AWS マネジメントコンソール、[https://console.aws.amazon.com/docdb](https://console.aws.amazon.com/docdb) で Amazon DocumentDB コンソールを開きます。

1. ナビゲーションペインで、**[パラメータグループ]** を選択します。
**ヒント**  
画面の左側にナビゲーションペインが表示されない場合は、ページの左上隅にあるメニューアイコン (![\[Hamburger menu icon with three horizontal lines.\]](http://docs.aws.amazon.com/ja_jp/documentdb/latest/developerguide/images/docdb-menu-icon.png)) を選択します。

1. [**パラメータグループ**] ペインで、クラスターに関連付けられたクラスターパラメータグループを選択します。クラスターに関連付けられているクラスターパラメータグループを調べるには、「[Amazon DocumentDB クラスターのパラメータグループの確認をする](cluster_parameter_groups-describe.md#cluster_parameter_groups-determine)」を参照してください。

1. 結果のページには、クラスターパラメータグループのパラメータと対応する詳細が表示されます。`change_stream_log_retention_duration` パラメータを選択します。

1. ページの右上にある [**編集**] を選択して、パラメータの値を変更します。`change_stream_log_retention_duration` のパラメータは、1 時間から 7 日の間で変更できます。

1. 変更を行い、[**クラスターのパラメータを変更**] を選択して変更を保存します。変更を破棄するには、[**キャンセル**] を選択します。

------
#### [ Using the AWS CLI ]

クラスターパラメータグループの `change_stream_log_retention_duration` パラメータを変更するには、以下のパラメータを指定して `modify-db-cluster-parameter-group` オペレーションを使用します。
+ **--db-cluster-parameter-group-name** — 必須。変更するクラスターパラメータグループの名前。クラスターに関連付けられているクラスターパラメータグループを調べるには、「[Amazon DocumentDB クラスターのパラメータグループの確認をする](cluster_parameter_groups-describe.md#cluster_parameter_groups-determine)」を参照してください。
+ **--parameters** — 必須。変更するパラメータ。各パラメータの入力には以下を含める必要があります。
  + **ParameterName** － 変更しているパラメータの名前。この場合は `change_stream_log_retention_duration` 
  + **ParameterValue** － このパラメータの新しい値。
  + **ApplyMethod** - このパラメータの変更を適用する方法です。有効な値は、`immediate` および `pending-reboot` です。
**注記**  
`ApplyType` が `static` であるパラメータでは、`ApplyMethod` が `pending-reboot` である必要があります。

1. パラメータ `change_stream_log_retention_duration` の値を変更するには、次のコマンドを実行し、`parameter-value` をパラメータの変更後の値に置き換えます。

   Linux、macOS、Unix の場合:

   ```
   aws docdb modify-db-cluster-parameter-group \
       --db-cluster-parameter-group-name sample-parameter-group \
       --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"
   ```

   Windows の場合:

   ```
   aws docdb modify-db-cluster-parameter-group ^
       --db-cluster-parameter-group-name sample-parameter-group ^
       --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"
   ```

   このオペレーションによる出力は、次のようになります (JSON 形式)。

   ```
   {
       "DBClusterParameterGroupName": "sample-parameter-group"
   }
   ```

1. 少なくとも 5 分お待ち下さい。

1. `sample-parameter-group` のパラメータ値を一覧表示し、変更が行われたことを確認します。

   Linux、macOS、Unix の場合:

   ```
   aws docdb describe-db-cluster-parameters \
       --db-cluster-parameter-group-name sample-parameter-group
   ```

   Windows の場合:

   ```
   aws docdb describe-db-cluster-parameters ^
       --db-cluster-parameter-group-name sample-parameter-group
   ```

   このオペレーションによる出力は、次のようになります (JSON 形式)。

   ```
   {
       "Parameters": [
           {
               "ParameterName": "audit_logs",
               "ParameterValue": "disabled",
               "Description": "Enables auditing on cluster.",
               "Source": "system",
               "ApplyType": "dynamic",
               "DataType": "string",
               "AllowedValues": "enabled,disabled",
               "IsModifiable": true,
               "ApplyMethod": "pending-reboot"
           },
           {
               "ParameterName": "change_stream_log_retention_duration",
               "ParameterValue": "12345",
               "Description": "Duration of time in seconds that the change stream log is retained and can be consumed.",
               "Source": "user",
               "ApplyType": "dynamic",
               "DataType": "integer",
               "AllowedValues": "3600-86400",
               "IsModifiable": true,
               "ApplyMethod": "immediate"
           }
       ]
   }
   ```

------

**注記**  
ストリームログの保存期間を変更しても、ログサイズが 51,200 MB より大きくなるまで (>)、設定した `change_stream_log_retention_duration` の値より古いログは削除されません。

## セカンダリインスタンスでの変更ストリームの使用
<a name="change-streams-secondary-instances"></a>

セカンダリインスタンスで変更ストリームの使用を開始するには、変更ストリームカーソルを `readPreference` でセカンダリとして開きます。

変更ストリームカーソルを開いて、特定のコレクション、または特定のクラスターもしくはデータベース内のすべてのコレクションの変更イベントを監視できます。任意の Amazon DocumentDB インスタンス上で変更ストリームカーソルを開き、ライターインスタンスとリーダーインスタンスの両方から変更ストリームドキュメントを取得できます。変更ストリームトークン (`resumeToken` や `startOperationTime` など) は、ライターインスタンスとリーダーインスタンスで開かれたさまざまな変更ストリームカーソル間で共有できます。

**例**

```
import os
import sys
from pymongo import MongoClient, ReadPreference
      
username = "DocumentDBusername"
password = <Your password> 

clusterendpoint = "DocumentDBClusterEndpoint"

client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem')

db = client['bar']
 
# Make sure to use SECONDARY to redirect cursor reads from secondary instances
coll = db.get_collection('foo', read_preference=ReadPreference.SECONDARY)

# Create a stream object on RO. The token needs to generated from PRIMARY.
stream = coll.watch(resumeAfter=token)

for event in stream:
   print(event)
```

**セカンダリインスタンス上の変更ストリームに関するガイドラインと制限事項**
+ 変更ストリームイベントは、プライマリインスタンスからセカンダリインスタンスにレプリケートする必要があります。また、Amazon CloudWatch で `DBInstanceReplicaLag` メトリクスからのラグをモニタリングすることもできます。
+ セカンダリインスタンスのタイムスタンプは、プライマリインスタンスと常に同期しているとは限りません。この場合、ずれを解消するためセカンダリインスタンスのタイムスタンプに遅延が発生する可能性があります。ベストプラクティスとして、`startAtOperationTime` または `resumeToken` を使用してセカンダリインスタンスでウォッチを起動することをお勧めします。
+ ドキュメントサイズが大きく、かつ`fullDocumentLookup` を実行していて、プライマリインスタンスへの同時書き込みワークロードが多い場合、プライマリインスタンスと比較してセカンダリインスタンスのスループットが低下する可能性があります。ベストプラクティスとして、セカンダリのバッファキャッシュヒット率をモニタリングし、バッファキャッシュヒット率が高いかどうか確認することをお勧めします。