跳转到主要内容
建议 ClickHouse Cloud 用户使用 ClickPipes 将 PostgreSQL 复制到 ClickHouse。该方案原生支持 PostgreSQL 的高性能 CDC (变更数据捕获) 。
创建一个 ClickHouse 数据库,其中包含来自 PostgreSQL 数据库的表。首先,使用 MaterializedPostgreSQL 引擎的数据库会为 PostgreSQL 数据库创建快照,并加载所需的表。所需表可以是指定数据库中任意 schema 子集下的任意表子集。创建快照的同时,数据库引擎还会获取 LSN;在完成表的初始转储后,便开始从 WAL 拉取更新。数据库创建完成后,之后新增到 PostgreSQL 数据库中的表不会自动加入复制,必须使用 ATTACH TABLE db.table 查询手动添加。 复制基于 PostgreSQL Logical Replication Protocol 实现。该协议不支持复制 DDL,但能够识别是否发生了会破坏复制的变更 (如列类型变更、添加/删除列) 。检测到此类变更后,对应表将停止接收更新。此时,应使用 ATTACH/ DETACH PERMANENTLY 查询重新完整加载该表。如果 DDL 不会破坏复制 (例如重命名列) ,表仍会继续接收更新 (insert 按位置执行) 。
该数据库引擎属于 Experimental。要使用它,请在配置文件中将 allow_experimental_database_materialized_postgresql 设为 1,或使用 SET 命令:
SET allow_experimental_database_materialized_postgresql=1

创建数据库

CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MaterializedPostgreSQL('host:port', 'database', 'user', 'password') [SETTINGS ...]
引擎参数
  • host:port — PostgreSQL 服务器的端点。
  • database — PostgreSQL 数据库名。
  • user — PostgreSQL 用户。
  • password — 用户密码。

使用示例

CREATE DATABASE postgres_db
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password');

SHOW TABLES FROM postgres_db;

┌─name───┐
│ table1 │
└────────┘

SELECT * FROM postgresql_db.postgres_table;

动态将新表添加到复制范围中

创建 MaterializedPostgreSQL 数据库后,它不会自动检测对应 PostgreSQL 数据库中的新表。此类表可以手动添加:
ATTACH TABLE postgres_database.new_table;
在 22.1 之前的版本中,将表加入复制时会遗留一个未删除的临时 replication slot (名为 {db_name}_ch_replication_slot_tmp) 。如果在 22.1 之前的 ClickHouse 版本中 Attach 表,请务必手动删除该 slot (SELECT pg_drop_replication_slot('{db_name}_ch_replication_slot_tmp')) 。否则,磁盘占用会持续增长。此问题已在 22.1 中修复。

动态将表从复制中移除

可以将特定表从复制中移除:
DETACH TABLE postgres_database.table_to_remove PERMANENTLY;

PostgreSQL schema

PostgreSQL schema 可通过 3 种方式进行配置 (自 21.12 版本起) 。
  1. 一个 MaterializedPostgreSQL 数据库引擎对应一个 schema。需要使用设置 materialized_postgresql_schema。 表 只通过表名访问:
CREATE DATABASE postgres_database
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_schema = 'postgres_schema';

SELECT * FROM postgres_database.table1;
  1. 对于一个 MaterializedPostgreSQL 数据库引擎,可以指定任意数量的 schema 及其对应的一组表。需要使用设置 materialized_postgresql_tables_list。每个表都必须连同其所属 schema 一起写出。 访问表时需要同时使用 schema 名称和表名:
CREATE DATABASE database1
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_tables_list = 'schema1.table1,schema2.table2,schema1.table3',
         materialized_postgresql_tables_list_with_schema = 1;

SELECT * FROM database1.`schema1.table1`;
SELECT * FROM database1.`schema2.table2`;
但在这种情况下,materialized_postgresql_tables_list 中的所有表都必须写明其 schema 名称。 需要设置 materialized_postgresql_tables_list_with_schema = 1 警告:在这种情况下,表名中不允许包含点号。
  1. 对于单个 MaterializedPostgreSQL 数据库引擎,可以指定任意数量的 schema 及其完整的表集。需要使用设置 materialized_postgresql_schema_list
CREATE DATABASE database1
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_schema_list = 'schema1,schema2,schema3';

SELECT * FROM database1.`schema1.table1`;
SELECT * FROM database1.`schema1.table2`;
SELECT * FROM database1.`schema2.table2`;
警告:在这种情况下,表名中不能包含点号。

要求

  1. 在 PostgreSQL 配置文件中,wal_level 设置的值必须为 logical,并且 max_replication_slots 参数的值必须至少为 2
  2. 每个被复制的表都必须具有以下 副本标识 之一:
  • 主键 (默认)
  • 索引
postgres# CREATE TABLE postgres_table (a Integer NOT NULL, b Integer, c Integer NOT NULL, d Integer, e Integer NOT NULL);
postgres# CREATE unique INDEX postgres_table_index on postgres_table(a, c, e);
postgres# ALTER TABLE postgres_table REPLICA IDENTITY USING INDEX postgres_table_index;
始终会先检查主键。如果主键不存在,则会检查被定义为副本标识索引的索引。 如果某个索引被用作副本标识,则一张表中只能有一个这样的索引。 你可以使用以下命令查看特定表使用的是哪种类型:
postgres# SELECT CASE relreplident
          WHEN 'd' THEN 'default'
          WHEN 'n' THEN 'nothing'
          WHEN 'f' THEN 'full'
          WHEN 'i' THEN 'index'
       END AS replica_identity
FROM pg_class
WHERE oid = 'postgres_table'::regclass;
不支持 TOAST 值的复制。将使用该数据类型的默认值。

设置

materialized_postgresql_tables_list

设置一个以逗号分隔的 PostgreSQL 数据库表列表,这些表将通过 MaterializedPostgreSQL 数据库引擎进行复制。 每个表都可以在括号中指定要复制的列子集。如果省略该列子集,则复制该表的所有列。
    materialized_postgresql_tables_list = 'table1(co1, col2),table2,table3(co3, col5, col7)
默认值:空列表——表示将复制整个 PostgreSQL 数据库。

materialized_postgresql_schema

默认值:空字符串。 (使用默认 schema)

materialized_postgresql_schema_list

默认值:空列表。 (将使用默认 schema)

materialized_postgresql_max_block_size

设置在将数据刷新到 PostgreSQL 数据库表之前,可在内存中收集的行数。 可能的值:
  • 正整数。
默认值:65536

materialized_postgresql_replication_slot

用户创建的 replication slot。必须与 materialized_postgresql_snapshot 搭配使用。

materialized_postgresql_snapshot

用于标识快照的文本字符串,PostgreSQL 表的初始转储 将基于该快照执行。必须与 materialized_postgresql_replication_slot 一起使用。
    CREATE DATABASE database1
    ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
    SETTINGS materialized_postgresql_tables_list = 'table1,table2,table3';

    SELECT * FROM database1.table1;
如有需要,可以使用 DDL 查询来更改这些设置。但设置 materialized_postgresql_tables_list 无法更改。要更新该设置中的表列表,请使用 ATTACH TABLE 查询。
    ALTER DATABASE postgres_database MODIFY SETTING materialized_postgresql_max_block_size = <new_size>;

materialized_postgresql_use_unique_replication_consumer_identifier

在复制中使用唯一的复制消费者标识符。默认值:0。 如果设置为 1,则允许将多个 MaterializedPostgreSQL 表配置为指向同一个 PostgreSQL 表。

注意事项

logical replication slot 的故障转移

存在于主节点上的 logical replication slot 在 standby 副本上不可用。 因此,如果发生故障转移,新的主节点 (原来的物理 standby) 将不知道旧主节点上存在哪些 slot。这会导致 PostgreSQL 复制中断。 解决办法之一是自行管理 replication slot,并定义一个永久 replication slot (可在这里找到一些相关信息) 。你需要通过 materialized_postgresql_replication_slot 设置传入 slot 名称,并且该 slot 必须使用 EXPORT SNAPSHOT 选项导出。快照 标识符则需要通过 materialized_postgresql_snapshot 设置传入。 请注意,只有在确实需要时才应使用这种方式。如果并无实际需求,或者不完全清楚原因,最好让表引擎自行创建和管理 replication slot。 示例 (来自 @bchrobot)
  1. 在 PostgreSQL 中配置 replication slot。
    apiVersion: "acid.zalan.do/v1"
    kind: postgresql
    metadata:
      name: acid-demo-cluster
    spec:
      numberOfInstances: 2
      postgresql:
        parameters:
          wal_level: logical
      patroni:
        slots:
          clickhouse_sync:
            type: logical
            database: demodb
            plugin: pgoutput
    
  2. 等待 replication slot 就绪,然后开始一个事务并导出该事务的 快照 标识符:
    BEGIN;
    SELECT pg_export_snapshot();
    
  3. 在 ClickHouse 中创建数据库:
    CREATE DATABASE demodb
    ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
    SETTINGS
      materialized_postgresql_replication_slot = 'clickhouse_sync',
      materialized_postgresql_snapshot = '0000000A-0000023F-3',
      materialized_postgresql_tables_list = 'table1,table2,table3';
    
  4. 确认已复制到 ClickHouse DB 后,结束 PostgreSQL 事务。验证在故障转移后复制是否仍会继续:
    kubectl exec acid-demo-cluster-0 -c postgres -- su postgres -c 'patronictl failover --candidate acid-demo-cluster-1 --force'
    

所需权限

  1. CREATE PUBLICATION — CREATE 查询权限。
  2. CREATE_REPLICATION_SLOT — 复制权限。
  3. pg_drop_replication_slot — 复制权限或超级用户权限。
  4. DROP PUBLICATION — publication 的所有者 (即 MaterializedPostgreSQL engine 自身中的 username) 。
也可以不执行 23 命令,从而无需具备这些权限。使用设置 materialized_postgresql_replication_slotmaterialized_postgresql_snapshot 即可。但务必格外谨慎。 对以下表的访问权限:
  1. pg_publication
  2. pg_replication_slots
  3. pg_publication_tables
最后修改于 2026年6月10日