

# 为 Amazon RDS for PostgreSQL 执行逻辑复制
<a name="PostgreSQL.Concepts.General.FeatureSupport.LogicalReplication"></a>

从版本 10.4 开始，RDS for PostgreSQL 支持 PostgreSQL 10 中引入的发布和订阅 SQL 语法。要了解更多信息，请参阅 PostgreSQL 文档中的[逻辑复制](https://www.postgresql.org/docs/current/logical-replication.html)。

**注意**  
除了 PostgreSQL 10 中引入的原生 PostgreSQL 逻辑复制功能外，RDS for PostgreSQL 还支持 `pglogical` 扩展。有关更多信息，请参阅 [使用 pglogical 跨实例同步数据](Appendix.PostgreSQL.CommonDBATasks.pglogical.md)。

在下文中，您可以了解有关为 RDS for PostgreSQL 数据库实例设置逻辑复制的信息。

**Topics**
+ [了解逻辑复制和逻辑解码](#PostgreSQL.Concepts.General.FeatureSupport.LogicalDecoding)
+ [使用逻辑复制槽](#PostgreSQL.Concepts.General.FeatureSupport.LogicalReplicationSlots)
+ [使用逻辑复制来复制表级数据](#PostgreSQL.Concepts.LogicalReplication.Tables)

## 了解逻辑复制和逻辑解码
<a name="PostgreSQL.Concepts.General.FeatureSupport.LogicalDecoding"></a>

RDS for PostgreSQL 支持使用 PostgreSQL 的逻辑复制槽流式传输预写日志 (WAL) 更改。其还支持使用逻辑解码。您可以在实例上设置逻辑复制槽并通过这些槽将数据库更改流式传输到某个客户端（如 `pg_recvlogical`）。您可在数据库级别创建逻辑复制槽，它们支持与单个数据库的复制连接。

PostgreSQL 逻辑复制的最常见客户端是 AWS Database Migration Service 或 Amazon EC2 实例上的自定义托管主机。逻辑复制槽没有关于流接收器的信息。此外，不要求目标是副本数据库。如果设置逻辑复制槽并且不从该槽进行读取，则数据可写入并快速填满数据库实例上的存储。

您可启用 Amazon RDS 的 PostgreSQL 逻辑复制和逻辑解码，带有参数、复制连接类型和安全角色。逻辑解码的客户端可以是能够与 PostgreSQL 数据库实例上的数据库建立复制连接的任何客户端。

**为 RDS for PostgreSQL 数据库实例启用逻辑解码**

1. 确保您使用的用户账户具有以下角色：
   + `rds_superuser` 角色，以使您可以启用逻辑复制 
   + `rds_replication` 角色，以授予管理逻辑槽并使用逻辑槽流式处理数据的权限

1. 将 `rds.logical_replication` 静态参数设置为 1。在应用该参数时，还将设置参数 `wal_level`、`max_wal_senders`、`max_replication_slots` 和 `max_connections`。这些参数更改可能会增加 WAL 生成，因此，仅在使用逻辑槽时设置 `rds.logical_replication` 参数。

1. 重启数据库实例，静态 `rds.logical_replication` 参数才会生效。

1. 请按下一部分中的说明创建逻辑复制槽。该过程需要您指定解码插件。目前，RDS for PostgreSQL 支持 PostgreSQL 随附的 test\$1decoding 和 wal2json 输出插件。

有关 PostgreSQL 逻辑解码的更多信息，请参阅 [PostgreSQL 文档](https://www.postgresql.org/docs/current/static/logicaldecoding-explanation.html)。

## 使用逻辑复制槽
<a name="PostgreSQL.Concepts.General.FeatureSupport.LogicalReplicationSlots"></a>

您可以通过 SQL 命令来使用逻辑槽。例如，以下命令使用默认的 PostgreSQL 输出插件 `test_slot` 创建一个名为 `test_decoding` 的逻辑槽。

```
SELECT * FROM pg_create_logical_replication_slot('test_slot', 'test_decoding');
slot_name    | xlog_position
-----------------+---------------
regression_slot | 0/16B1970
(1 row)
```

要列出逻辑槽，请使用以下命令。

```
SELECT * FROM pg_replication_slots;
```

要删除逻辑槽，请使用以下命令。

```
SELECT pg_drop_replication_slot('test_slot');
pg_drop_replication_slot
-----------------------
(1 row)
```

有关使用逻辑复制槽的更多示例，请参阅 PostgreSQL 文档中的[逻辑解码示例](https://www.postgresql.org/docs/9.5/static/logicaldecoding-example.html)。

创建逻辑复制槽后，就可以开始流式处理。以下示例显示了如何通过流式复制协议控制逻辑解码。此示例使用 PostgreSQL 发行版中包含的程序 pg\$1recvlogical。此操作要求设置客户端身份验证以允许复制连接。

```
pg_recvlogical -d postgres --slot test_slot -U postgres
    --host -instance-name.111122223333.aws-region.rds.amazonaws.com 
    -f -  --start
```

要查看 `pg_replication_origin_status` 视图的内容，请查询 `pg_show_replication_origin_status` 函数。

```
SELECT * FROM pg_show_replication_origin_status();
local_id | external_id | remote_lsn | local_lsn
----------+-------------+------------+-----------
(0 rows)
```

## 使用逻辑复制来复制表级数据
<a name="PostgreSQL.Concepts.LogicalReplication.Tables"></a>

在 RDS for PostgreSQL 中，可以使用逻辑复制将数据从源表复制到目标表。逻辑复制首先对源表中的现有数据执行初始加载，然后继续复制持续发生的更改。

1. 

**创建源表**

   连接到 RDS for PostgreSQL 数据库实例中的源数据库。

   ```
   source=> CREATE TABLE testtab (slno int primary key);
   CREATE TABLE
   ```

1. 

**将数据插入到源表中**

   ```
   source=> INSERT INTO testtab VALUES (generate_series(1,1000));
   INSERT 0 1000
   ```

1. 

**为源表创建发布**
   + 为源表创建发布：

     ```
     source=> CREATE PUBLICATION testpub FOR TABLE testtab;
     CREATE PUBLICATION
     ```
   + 使用 SELECT 查询来验证已创建的发布的详细信息：

     ```
     source=> SELECT * FROM pg_publication;
       oid   | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
     --------+---------+----------+--------------+-----------+-----------+-----------+-------------+------------
      115069 | testpub |    16395 | f            | t         | t         | t         | t           | f
     (1 row)
     ```
   + 验证源表已添加到发布中：

     ```
     source=> SELECT * FROM pg_publication_tables; 
     pubname | schemaname | tablename
     ---------+------------+-----------
      testpub | public     | testtab
     (1 rows)
     ```
   + 要复制数据库中的所有表，请使用：

     ```
     CREATE PUBLICATION testpub FOR ALL TABLES;
     ```
   + 如果已经为单个表创建了发布，并且您需要添加新表，则可以运行下面的查询来将任何新表添加到现有发布中：

     ```
     ALTER PUBLICATION <publication_name> add table <new_table_name>;
     ```

1. 

**连接到目标数据库并创建目标表**
   + 连接到目标数据库实例中的目标数据库。创建与源表同名的目标表：

     ```
     target=> CREATE TABLE testtab (slno int primary key);
     CREATE TABLE
     ```
   + 通过对目标表运行 SELECT 查询，确保目标表中不存在数据：

     ```
         
     target=> SELECT count(*) FROM testtab;
      count
     -------
          0
     (1 row)
     ```

1. 

**在目标数据库中创建并验证订阅**
   + 在目标数据库中创建订阅：

     ```
     target=> CREATE SUBSCRIPTION testsub 
     CONNECTION 'host=<source RDS/host endpoint> port=5432 dbname=<source_db_name> user=<user> password=<password>' 
     PUBLICATION testpub;
     NOTICE:  Created replication slot "testsub" on publisher
     CREATE SUBSCRIPTION
     ```
   + 使用 SELECT 查询来验证是否启用了订阅：

     ```
     target=> SELECT oid, subname, subenabled, subslotname, subpublications FROM pg_subscription;
       oid  | subname | subenabled | subslotname | subpublications
     -------+---------+------------+-------------+-----------------
      16434 | testsub | t          | testsub     | {testpub}
     (1 row)
     ```
   + 创建订阅后，它会将源表中的所有数据加载到目标表。对目标表运行 SELECT 查询以验证初始数据是否已加载：

     ```
     target=> SELECT count(*) FROM testtab;
      count
     -------
       1000
     (1 row)
     ```

1. 

**验证源数据库中的复制槽**

   在目标数据库中创建订阅会在源数据库中创建复制槽。通过对源数据库运行以下 SELECT 查询来验证复制槽详细信息：

   ```
   source=> SELECT * FROM pg_replication_slots;
    
   slot_name |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
   ----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
   testsub   | pgoutput | logical   | 115048 | source   | f         | t      |        846 |      |         6945 | 58/B4000568 | 58/B40005A0         | reserved   |
   (1 row)
   ```

1. 

**测试复制**
   + 通过在源表中插入行，测试源表中的数据更改是否复制到目标表：

     ```
     source=> INSERT INTO testtab VALUES(generate_series(1001,2000));
     INSERT 0 1000
     
     source=> SELECT count(*) FROM testtab; 
      count
     -------
       2000
     (1 row)
     ```
   + 验证目标表中的行数，以确认正在复制新的插入内容：

     ```
     target=> SELECT count(*) FROM testtab;
      count
     -------
       2000
     (1 row)
     ```

1. 

**添加表后刷新订阅**
   + 在向现有发布添加新表时，务必刷新订阅才能使更改生效：

     ```
     ALTER SUBSCRIPTION <subscription_name> REFRESH PUBLICATION;
     ```
   + 此命令从发布者那里提取缺失的表信息，并开始复制自创建或上次刷新订阅以来添加到已订阅发布中的表。