PostgreSQL 引擎允许对存储在远程 PostgreSQL 服务器上的数据执行 SELECT 和 INSERT 查询。
目前,该表引擎仅支持 PostgreSQL 12 及以上版本。
了解我们的 Managed Postgres 服务。它采用与计算资源物理同址的 NVMe 存储,相比使用 EBS 等网络附加存储的替代方案,对于磁盘受限型工作负载,性能最高可提升 10 倍,并且可让你通过 ClickPipes 中的 Postgres CDC 连接器将 Postgres 数据复制到 ClickHouse。
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 type1 [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 type2 [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
) ENGINE = PostgreSQL({host:port, database, table, user, password[, schema, [, on_conflict]] | named_collection[, option=value [,..]]})
请参阅 CREATE TABLE 查询的详细说明。
该表的结构可以与原始 PostgreSQL 表结构不同:
- 列名应与原始 PostgreSQL 表中的列名一致,但你也可以只使用其中部分列,且顺序不限。
- 列类型可以与原始 PostgreSQL 表中的不同。ClickHouse 会尝试将值转换为 ClickHouse 数据类型。
- external_table_functions_use_nulls 设置定义了如何处理 Nullable 列。默认值:1。如果为 0,表函数不会创建 Nullable 列,而会插入默认值来代替 null。此规则也适用于数组中的 NULL 值。
引擎参数
host:port — PostgreSQL 服务器地址。
database — 远程数据库名称。
table — 远程表名。
user — PostgreSQL 用户。
password — 用户密码。
schema — 非默认表 schema。可选。
on_conflict — 冲突解决策略。示例:ON CONFLICT DO NOTHING。可选。注意:添加此选项会降低插入效率。
建议在生产环境中使用命名集合 (自 21.11 版本起可用) 。以下是一个示例:
<named_collections>
<postgres_creds>
<host>localhost</host>
<port>5432</port>
<user>postgres</user>
<password>****</password>
<schema>schema1</schema>
</postgres_creds>
</named_collections>
某些参数可以通过键值参数覆盖:
SELECT * FROM postgresql(postgres_creds, table='table1');
PostgreSQL 端的 SELECT 查询会在只读的 PostgreSQL 事务中以 COPY (SELECT ...) TO STDOUT 的形式运行,并在每次 SELECT 查询后提交。
简单的 WHERE 子句,如 =, !=, >, >=, <, <= 和 IN,会在 PostgreSQL 服务器上执行。
所有 JOIN、聚合、排序、IN [ array ] 条件以及 LIMIT 采样约束,都只会在对 PostgreSQL 的查询结束后于 ClickHouse 中执行。
PostgreSQL 端的 INSERT 查询会在 PostgreSQL 事务中以 COPY "table_name" (field1, field2, ... fieldN) FROM STDIN 的形式运行,并在每条 INSERT 语句后自动提交。
PostgreSQL 的 Array 类型会转换为 ClickHouse 数组。
请注意:在 PostgreSQL 中,以 type_name[] 形式创建的数组数据,可能会在同一列的不同行中包含维度数量不同的多维数组。但在 ClickHouse 中,同一列的所有表行只允许使用维度数量相同的多维数组。
支持多个副本,必须使用 | 列出。例如:
CREATE TABLE test_replicas (id UInt32, name String) ENGINE = PostgreSQL(`postgres{2|3|4}:5432`, 'clickhouse', 'test_replicas', 'postgres', 'mysecretpassword');
支持为 PostgreSQL 字典源配置副本优先级。映射中的数值越大,优先级越低。最高优先级为 0。
在下面的示例中,副本 example01-1 的优先级最高:
<postgresql>
<port>5432</port>
<user>clickhouse</user>
<password>qwerty</password>
<replica>
<host>example01-1</host>
<priority>1</priority>
</replica>
<replica>
<host>example01-2</host>
<priority>2</priority>
</replica>
<db>db_name</db>
<table>table_name</table>
<where>id=10</where>
<invalidate_query>SQL_QUERY</invalidate_query>
</postgresql>
</source>
postgres=# CREATE TABLE "public"."test" (
"int_id" SERIAL,
"int_nullable" INT NULL DEFAULT NULL,
"float" FLOAT NOT NULL,
"str" VARCHAR(100) NOT NULL DEFAULT '',
"float_nullable" FLOAT NULL DEFAULT NULL,
PRIMARY KEY (int_id));
CREATE TABLE
postgres=# INSERT INTO test (int_id, str, "float") VALUES (1,'test',2);
INSERT 0 1
postgresql> SELECT * FROM test;
int_id | int_nullable | float | str | float_nullable
--------+--------------+-------+------+----------------
1 | | 2 | test |
(1 row)
在 ClickHouse 中创建表,并连接到上面创建的 PostgreSQL 表
本示例使用 PostgreSQL 表引擎 将 ClickHouse 表连接到 PostgreSQL 表,并通过 PostgreSQL 数据库执行 SELECT 和 INSERT 语句:
CREATE TABLE default.postgresql_table
(
`float_nullable` Nullable(Float32),
`str` String,
`int_id` Int32
)
ENGINE = PostgreSQL('localhost:5432', 'public', 'test', 'postgres_user', 'postgres_password');
使用 SELECT 查询将 PostgreSQL 表中的初始数据插入 ClickHouse 表
postgresql 表函数可将数据从 PostgreSQL 复制到 ClickHouse,通常用于把查询或分析工作放在 ClickHouse 中执行而不是在 PostgreSQL 中执行,从而提升数据的查询性能;也可用于将数据从 PostgreSQL 迁移到 ClickHouse。由于我们要将数据从 PostgreSQL 复制到 ClickHouse,因此会在 ClickHouse 中使用一个 MergeTree 表引擎,并将其命名为 postgresql_copy:
CREATE TABLE default.postgresql_copy
(
`float_nullable` Nullable(Float32),
`str` String,
`int_id` Int32
)
ENGINE = MergeTree
ORDER BY (int_id);
INSERT INTO default.postgresql_copy
SELECT * FROM postgresql('localhost:5432', 'public', 'test', 'postgres_user', 'postgres_password');
将 PostgreSQL 表中的增量数据插入到 ClickHouse 表
如果要在初次插入后,继续在 PostgreSQL 表与 ClickHouse 表之间进行持续同步,可以在 ClickHouse 中使用 WHERE 子句,只插入基于时间戳或唯一序列 ID 在 PostgreSQL 中新增的数据。
这需要跟踪此前已插入的最大 ID 或时间戳,例如:
SELECT max(`int_id`) AS maxIntID FROM default.postgresql_copy;
然后插入 PostgreSQL 表中大于该最大值的数据
INSERT INTO default.postgresql_copy
SELECT * FROM postgresql('localhost:5432', 'public', 'test', 'postges_user', 'postgres_password');
WHERE int_id > maxIntID;
SELECT * FROM postgresql_copy WHERE str IN ('test');
┌─float_nullable─┬─str──┬─int_id─┐
│ ᴺᵁᴸᴸ │ test │ 1 │
└────────────────┴──────┴────────┘
postgres=# CREATE SCHEMA "nice.schema";
postgres=# CREATE TABLE "nice.schema"."nice.table" (a integer);
postgres=# INSERT INTO "nice.schema"."nice.table" SELECT i FROM generate_series(0, 99) as t(i)
CREATE TABLE pg_table_schema_with_dots (a UInt32)
ENGINE PostgreSQL('localhost:5432', 'clickhouse', 'nice.table', 'postgrsql_user', 'password', 'nice.schema');
另请参阅