

# Amazon RDS for PostgreSQL の論理レプリケーションの実行
<a name="PostgreSQL.Concepts.General.FeatureSupport.LogicalReplication"></a>

バージョン 10.4 以降、RDS for PostgreSQL は、PostgreSQL 10 で導入されたパブリケーションおよびサブスクリプション SQL 構文をサポートしています。詳細については、「PostgreSQL のドキュメント」の「[論理レプリケーション](https://www.postgresql.org/docs/current/logical-replication.html)」を参照してください。

**注記**  
PostgreSQL 10 で導入されたネイティブの PostgreSQL 論理レプリケーション機能に加えて、PostgreSQL 用 RDS は `pglogical` 拡張機能もサポートしています。詳細については、「[pglogical を使用してインスタンス間でデータを同期する](Appendix.PostgreSQL.CommonDBATasks.pglogical.md)」を参照してください。

RDS for PostgreSQL DB インスタンスに対する論理レプリケーションの設定については、次で説明します。

**Topics**
+ [論理レプリケーションと論理デコードについて](#PostgreSQL.Concepts.General.FeatureSupport.LogicalDecoding)
+ [論理的なレプリケーションスロットの使用](#PostgreSQL.Concepts.General.FeatureSupport.LogicalReplicationSlots)
+ [論理レプリケーションを使用したテーブルレベルのデータのレプリケーション](#PostgreSQL.Concepts.LogicalReplication.Tables)

## 論理レプリケーションと論理デコードについて
<a name="PostgreSQL.Concepts.General.FeatureSupport.LogicalDecoding"></a>

RDS for PostgreSQL は、PostgreSQL の論理レプリケーションスロットを使用した、ログ先行書き込み (WAL) 変更のストリーミングをサポートしています。また、ロジカルデコーディングの使用もサポートしています。インスタンスで論理的なレプリケーションスロットをセットアップし、それらのスロットを通じてデータベースの変更を `pg_recvlogical` などのクライアントにストリーミングできます。データベースレベルで論理レプリケーションスロットを作成すると、1 つのデータベースへのレプリケーション接続がサポートされます。

PostgreSQL 論理レプリケーション用の最も一般的なクライアントは、AWS Database Migration Service、または Amazon EC2 インスタンスのカスタム管理ホストです。論理レプリケーションスロットには、ストリームの受信者に関する情報が含まれていません。また、ターゲットをレプリカデータベースとする必要はありません。論理的なレプリケーションスロットをセットアップし、スロットから読み取りを行わない場合、データが書き込まれて、DB インスタンスのストレージがすぐにいっぱいになる可能性があります。

パラメータ、レプリケーション接続タイプ、およびセキュリティロールを使用して、Amazon RDS の PostgreSQL 論理レプリケーションおよび論理デコードをオンにします。論理デコード用のクライアントは、PostgreSQL DB インスタンスのデータベースにレプリケーション接続を確立できる任意のクライアントとすることができます。

**RDS for PostgreSQL DB インスタンスに対して論理デコードをオンにするには**

1. 使用しているユーザーアカウントに次のロールがあることを確認します。
   + 論理レプリケーションをオンにできるようにする `rds_superuser` ロール 
   + 論理スロットを管理し、論理スロットを使用してデータをストリーミングするためのアクセス許可を付与する `rds_replication` ロール

1. `rds.logical_replication` 静的パラメータを 1 に設定します。このパラメータを適用する一環として、`wal_level`、`max_wal_senders`、`max_replication_slots`、`max_connections` の各パラメータも設定します。これらのパラメータの変更により、生成される WAL が増えることがあるため、論理スロットを使用する場合にのみ、`rds.logical_replication` パラメータを設定してください。

1. 静的 `rds.logical_replication` パラメータの DB インスタンスを再起動して有効にします。

1. 次のセクションの説明に従って論理的なレプリケーションスロットを作成します。このプロセスでは、デコードプラグインを指定する必要があります。現在、RDS for PostgreSQL は、PostgreSQL に付属する出力プラグイン test\$1decoding および wal2json をサポートしています。

PostgreSQL 論理的なデコードの使用の詳細については、[PostgreSQL のドキュメント](https://www.postgresql.org/docs/current/static/logicaldecoding-explanation.html)を参照してください。

## 論理的なレプリケーションスロットの使用
<a name="PostgreSQL.Concepts.General.FeatureSupport.LogicalReplicationSlots"></a>

SQL コマンドを使用して、論理的なスロットを操作できます。例えば、次のコマンドは、デフォルトの PostgreSQL 出力プラグイン `test_slot` を使用して、`test_decoding` という論理的なスロットを作成します。

```
SELECT * FROM pg_create_logical_replication_slot('test_slot', 'test_decoding');
slot_name    | xlog_position
-----------------+---------------
regression_slot | 0/16B1970
(1 row)
```

論理的なスロットを一覧表示するには、次のコマンドを使用します。

```
SELECT * FROM pg_replication_slots;
```

論理的なスロットを削除するには、次のコマンドを使用します。

```
SELECT pg_drop_replication_slot('test_slot');
pg_drop_replication_slot
-----------------------
(1 row)
```

論理的なレプリケーションスロットのその他の使用例については、PostgreSQL ドキュメントの「[Logical Decoding Examples](https://www.postgresql.org/docs/9.5/static/logicaldecoding-example.html)」を参照してください。

論理的なレプリケーションスロットを作成すると、ストリーミングをスタートできます。次の例は、ストリーミングレプリケーションプロトコルで論理的なデコードがどのように制御されるかを示しています。この例では、PostgreSQL ディストリビューションに含まれているプログラム pg\$1recvlogical を使用しています。これを行うには、レプリケーション接続を許可するようにクライアント認証が設定されている必要があります。

```
pg_recvlogical -d postgres --slot test_slot -U postgres
    --host -instance-name.111122223333.aws-region.rds.amazonaws.com 
    -f -  --start
```

`pg_replication_origin_status`ビューの内容を表示するには、`pg_show_replication_origin_status` 関数を照会します。

```
SELECT * FROM pg_show_replication_origin_status();
local_id | external_id | remote_lsn | local_lsn
----------+-------------+------------+-----------
(0 rows)
```

## 論理レプリケーションを使用したテーブルレベルのデータのレプリケーション
<a name="PostgreSQL.Concepts.LogicalReplication.Tables"></a>

論理レプリケーションを使用して、RDS for PostgreSQL のソーステーブルからターゲットテーブルにデータをレプリケートできます。論理レプリケーションは、まずソーステーブルから既存のデータの初期ロードを実行し、引き続いて以降の変更をレプリケートします。

1. 

**ソーステーブルを作成する**

   RDS for PostgreSQL DB インスタンスのソースデータベースに接続します。

   ```
   source=> CREATE TABLE testtab (slno int primary key);
   CREATE TABLE
   ```

1. 

**ソーステーブルにデータを挿入する**

   ```
   source=> INSERT INTO testtab VALUES (generate_series(1,1000));
   INSERT 0 1000
   ```

1. 

**ソーステーブルのパブリケーションを作成する**
   + ソーステーブルのパブリケーションを作成します。

     ```
     source=> CREATE PUBLICATION testpub FOR TABLE testtab;
     CREATE PUBLICATION
     ```
   + SELECT クエリを使用して、作成したパブリケーションの詳細を確認します。

     ```
     source=> SELECT * FROM pg_publication;
       oid   | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
     --------+---------+----------+--------------+-----------+-----------+-----------+-------------+------------
      115069 | testpub |    16395 | f            | t         | t         | t         | t           | f
     (1 row)
     ```
   + ソーステーブルがパブリケーションに追加されていることを確認します。

     ```
     source=> SELECT * FROM pg_publication_tables; 
     pubname | schemaname | tablename
     ---------+------------+-----------
      testpub | public     | testtab
     (1 rows)
     ```
   + データベース内のすべてのテーブルをレプリケートするには、以下を使用します。

     ```
     CREATE PUBLICATION testpub FOR ALL TABLES;
     ```
   + パブリケーションが個々のテーブル用に既に作成されており、新しいテーブルを追加する必要がある場合は、以下のクエリを実行して、既存のパブリケーションに新しいテーブルを追加できます。

     ```
     ALTER PUBLICATION <publication_name> add table <new_table_name>;
     ```

1. 

**ターゲットデータベースに接続し、ターゲットテーブルを作成する**
   + ターゲット DB インスタンスのターゲットデータベースに接続します。ソーステーブルと同じ名前でターゲットテーブルを作成します。

     ```
     target=> CREATE TABLE testtab (slno int primary key);
     CREATE TABLE
     ```
   + ターゲットテーブルに対して SELECT クエリを実行し、ターゲットテーブルにデータが存在しないことを確認します。

     ```
         
     target=> SELECT count(*) FROM testtab;
      count
     -------
          0
     (1 row)
     ```

1. 

**ターゲットデータベースでサブスクリプションを作成および検証する**
   + ターゲットデータベースにサブスクリプションを作成します。

     ```
     target=> CREATE SUBSCRIPTION testsub 
     CONNECTION 'host=<source RDS/host endpoint> port=5432 dbname=<source_db_name> user=<user> password=<password>' 
     PUBLICATION testpub;
     NOTICE:  Created replication slot "testsub" on publisher
     CREATE SUBSCRIPTION
     ```
   + SELECT クエリを使用して、サブスクリプションが有効になっていることを確認します。

     ```
     target=> SELECT oid, subname, subenabled, subslotname, subpublications FROM pg_subscription;
       oid  | subname | subenabled | subslotname | subpublications
     -------+---------+------------+-------------+-----------------
      16434 | testsub | t          | testsub     | {testpub}
     (1 row)
     ```
   + サブスクリプションが作成されると、ソーステーブルからターゲットテーブルにすべてのデータがロードされます。ターゲットテーブルに対して SELECT クエリを実行し、初期データがロードされていることを確認します。

     ```
     target=> SELECT count(*) FROM testtab;
      count
     -------
       1000
     (1 row)
     ```

1. 

**ソースデータベースのレプリケーションスロットを確認する**

   ターゲットデータベースにサブスクリプションを作成すると、ソースデータベースにレプリケーションスロットが作成されます。ソースデータベースに対して次の SELECT クエリを実行して、レプリケーションスロットの詳細を確認します。

   ```
   source=> SELECT * FROM pg_replication_slots;
    
   slot_name |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
   ----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
   testsub   | pgoutput | logical   | 115048 | source   | f         | t      |        846 |      |         6945 | 58/B4000568 | 58/B40005A0         | reserved   |
   (1 row)
   ```

1. 

**レプリケーションのテスト**
   + ソーステーブルに行を挿入して、ソーステーブルのデータ変更がターゲットテーブルにレプリケートされているかどうかをテストします。

     ```
     source=> INSERT INTO testtab VALUES(generate_series(1001,2000));
     INSERT 0 1000
     
     source=> SELECT count(*) FROM testtab; 
      count
     -------
       2000
     (1 row)
     ```
   + ターゲットテーブル内の行数を確認して、新しい挿入がレプリケートされていることを確認します。

     ```
     target=> SELECT count(*) FROM testtab;
      count
     -------
       2000
     (1 row)
     ```

1. 

**テーブルを追加した後のサブスクリプションの更新**
   + 既存のパブリケーションに新しいテーブルを追加する場合、変更を有効にするにはサブスクリプションを更新する必要があります。

     ```
     ALTER SUBSCRIPTION <subscription_name> REFRESH PUBLICATION;
     ```
   + このコマンドは、パブリッシャーから欠落しているテーブル情報を取得し、サブスクリプションの作成または最終更新以降にサブスクライブ先のパブリケーションに追加されたテーブルのレプリケーションを開始します。