你可以在这里注册加入私有预览候补名单。
前置条件
创建你的第一个 ClickPipe
- 进入你的 ClickHouse Cloud 服务的 SQL 控制台。
- 在左侧菜单中选择
Data Sources按钮,然后点击“Set up a ClickPipe”
- 选择 GCP Pub/Sub 作为数据源。
- 填写表单,为 ClickPipe 提供名称、GCP Project ID,以及已被授予 Pub/Sub 访问权限的服务账号对应的服务账号 JSON 文件。Project ID 必须为 6–30 个字符,只能包含小写字母、数字和连字符,且必须以字母开头,不能以连字符结尾。
-
选择要从中摄取数据的 Pub/Sub topic。凭据验证通过后,下拉菜单会自动填充你在 GCP 项目中的 topics (按字母顺序排序) 。
- 数据格式。 选择某个 topic 后,ClickPipes 会查询 Pub/Sub 的 Schema Registry。如果该 topic 附带原生 Avro 或 Protobuf schema,则会自动检测数据格式和 schema,并将选择器锁定到该 topic 上的最新 schema。没有原生 schema 的 topic 默认使用 JSONEachRow。
- 起始偏移量。 选择从哪里开始消费。可用选项包括 Latest (仅新消息) 、Earliest (最早保留的消息) 和 Seek to Timestamp (带 UTC 日期时间选择器) 。
- 过滤表达式 (可选) 。 对消息属性应用 Pub/Sub 订阅过滤器,例如
attributes.type = "telemetry"。过滤器仅适用于消息属性,不适用于载荷,并且在管道创建后无法更改 (如需更改过滤器,必须重新创建管道) 。 - UI 会显示所选 topic 中的一条样本消息,并提供一个 展平对象 开关,让你预览嵌套 JSON 在目标端会如何被展平。
- 在下一步中,你可以选择将数据摄取到新的 ClickHouse 表中,或复用现有表。按照界面上的说明修改表名、schema 和设置。你可以在顶部的样本表中实时预览这些更改。
- 或者,你也可以选择将数据摄取到现有的 ClickHouse 表中。在这种情况下,UI 会允许你将源字段映射到所选目标表中的 ClickHouse 字段。
- 最后,你可以为内部 ClickPipes 用户配置权限。
Full access:拥有对集群的完全访问权限。如果你在目标表中使用了 materialized view 或 字典,这可能会很有用。Only destination table:仅拥有对目标表的INSERT权限。
- 点击“Complete Setup”后,系统会注册你的 ClickPipe,你将能够在汇总表中看到它。
- 恭喜! 您已成功设置第一个 Pub/Sub ClickPipe。它将持续运行,实时将来自您的 Pub/Sub topic 的数据摄取到您的 ClickHouse Cloud 服务中。
托管订阅
- 托管订阅名为
clickpipes-{pipeID},会在管道启动时于该 topic 上创建。 - 其配置包括 60 秒 ack deadline、7 天消息保留期,以及启用消息排序。
- 订阅创建是幂等的——如果已有订阅指向所配置的 topic,则管道重启和副本重新调度时都会复用该订阅。
- 在 topic 发现和消息采样期间,ClickPipes 还会创建短暂的临时订阅 (
clickpipes-discovery-{uuid}) ,并在采样完成后立即删除。 - 删除管道时,ClickPipes 也会在拆除过程中删除该托管订阅。
支持的数据格式
压缩
- gzip
- zstd
- lz4
- snappy (帧格式)
对于 JSON 这类基于文本的格式,自动检测是安全的,因为可打印的 ASCII 字符不会与压缩 magic bytes 冲突。解压后的载荷上限为 64MB。
受支持的数据类型
标准类型支持
- 基础数值类型 - [U]Int8/16/32/64、Float32/64 和 BFloat16
- 大整数类型 - [U]Int128/256
- Decimal 类型
- 布尔类型
- String
- FixedString
- Date、Date32
- DateTime、DateTime64 (仅支持 UTC 时区)
- Enum8/Enum16
- UUID
- IPv4
- IPv6
- 所有 ClickHouse LowCardinality 类型
- Map,其键和值可使用上述任意类型 (包括 Nullable)
- Tuple 和 Array,其元素可使用上述任意类型 (包括 Nullable,仅支持一层深度)
- SimpleAggregateFunction 类型 (适用于 AggregatingMergeTree 或 SummingMergeTree 目标端)
Variant 类型支持
Variant(String, Int64, DateTime)) 。
由于 ClickPipes 判定应使用哪种 Variant 子类型的方式,在 Variant 定义中只能使用一种整数类型或 datetime
类型——例如,不支持 Variant(Int64, UInt32)。
JSON 类型支持
Pub/Sub 虚拟列
Add Column 按钮添加虚拟列。
| Name | Description | Recommended Data Type |
|---|---|---|
| _message_id | 由代理分配的 Pub/Sub 消息 ID | String |
| _publish_time | Pub/Sub 发布时间戳 (毫秒精度,UTC) | DateTime64(3) |
| _ordering_key | Pub/Sub 排序键 (消息未设置 key 时为空字符串) | String |
| _attributes | 用户自定义的 Pub/Sub 消息属性 | Map(String, String) |
| _raw_message | 完整的 Pub/Sub 消息载荷 (默认禁用) | String |
_raw_message 字段 (例如使用 ClickHouse JsonExtract* 函数来填充下游 materialized view) 。对于这类管道,删除所有“非虚拟”列可能有助于提升 ClickPipes 性能。
限制
- 不支持 DEFAULT。
- 使用最小 (XS) 副本规格运行时,单条消息默认最大为 8MB (未压缩) ;使用更大规格的副本时,默认最大为 16MB (未压缩) 。超出此限制的消息将因报错而被拒绝。如果您需要支持更大的消息,请联系支持团队。
- Pub/Sub 订阅过滤器不可变——更改过滤表达式需要重新创建管道。
- 过滤器仅适用于消息属性,不适用于消息载荷。
性能
批处理
- 批次大小达到上限 (每 1GB 副本内存为 100,000 行或 32MB)
- 批次保持打开状态达到最长时限 (5 秒)
延迟
排序键
扩缩容
投递语义
_message_id 虚拟列进行去重 (每个 Pub/Sub 消息 ID 在一个 topic 内都是唯一的) 。