跳转到主要内容
你可以在这里注册加入私有预览候补名单。
Pub/Sub ClickPipes 可以通过 ClickPipes UI 手动部署和管理,也可以使用 OpenAPITerraform 以编程方式部署和管理。

前置条件

你应已熟悉 ClickPipes 简介,并可访问包含要摄取的 topic 的 GCP 项目,同时已创建具备相应 Pub/Sub 权限的服务账号。有关 ClickPipes 所需的具体权限,请参阅 Pub/Sub IAM 权限指南

创建你的第一个 ClickPipe

  1. 进入你的 ClickHouse Cloud 服务的 SQL 控制台。
  1. 在左侧菜单中选择 Data Sources 按钮,然后点击“Set up a ClickPipe”
  1. 选择 GCP Pub/Sub 作为数据源。
  1. 填写表单,为 ClickPipe 提供名称、GCP Project ID,以及已被授予 Pub/Sub 访问权限的服务账号对应的服务账号 JSON 文件。Project ID 必须为 6–30 个字符,只能包含小写字母、数字和连字符,且必须以字母开头,不能以连字符结尾。
  1. 选择要从中摄取数据的 Pub/Sub topic。凭据验证通过后,下拉菜单会自动填充你在 GCP 项目中的 topics (按字母顺序排序) 。
    • 数据格式。 选择某个 topic 后,ClickPipes 会查询 Pub/Sub 的 Schema Registry。如果该 topic 附带原生 Avro 或 Protobuf schema,则会自动检测数据格式和 schema,并将选择器锁定到该 topic 上的最新 schema。没有原生 schema 的 topic 默认使用 JSONEachRow。
    • 起始偏移量。 选择从哪里开始消费。可用选项包括 Latest (仅新消息) 、Earliest (最早保留的消息) 和 Seek to Timestamp (带 UTC 日期时间选择器) 。
    • 过滤表达式 (可选) 。 对消息属性应用 Pub/Sub 订阅过滤器,例如 attributes.type = "telemetry"。过滤器仅适用于消息属性,不适用于载荷,并且在管道创建后无法更改 (如需更改过滤器,必须重新创建管道) 。
    • UI 会显示所选 topic 中的一条样本消息,并提供一个 展平对象 开关,让你预览嵌套 JSON 在目标端会如何被展平。
  1. 在下一步中,你可以选择将数据摄取到新的 ClickHouse 表中,或复用现有表。按照界面上的说明修改表名、schema 和设置。你可以在顶部的样本表中实时预览这些更改。
你还可以使用提供的控件自定义高级设置
  1. 或者,你也可以选择将数据摄取到现有的 ClickHouse 表中。在这种情况下,UI 会允许你将源字段映射到所选目标表中的 ClickHouse 字段。
  1. 最后,你可以为内部 ClickPipes 用户配置权限。
权限: ClickPipes 会创建一个专用用户,用于向目标表写入数据。你可以为该内部用户选择角色,既可以使用自定义角色,也可以使用预定义角色之一:
  • Full access:拥有对集群的完全访问权限。如果你在目标表中使用了 materialized view 或 字典,这可能会很有用。
    • Only destination table:仅拥有对目标表的 INSERT 权限。
  1. 点击“Complete Setup”后,系统会注册你的 ClickPipe,你将能够在汇总表中看到它。
摘要表中提供了相关控件,用于显示源表或 ClickHouse 中目标表的样本数据 以及用于删除 ClickPipe 和显示摄取作业摘要的控件。
  1. 恭喜! 您已成功设置第一个 Pub/Sub ClickPipe。它将持续运行,实时将来自您的 Pub/Sub topic 的数据摄取到您的 ClickHouse Cloud 服务中。

托管订阅

Pub/Sub 消息通过订阅来消费,而不是直接从 topic 消费。ClickPipes 会为每个管道创建并管理一个专用订阅——你只需选择 topic。
  • 托管订阅名为 clickpipes-{pipeID},会在管道启动时于该 topic 上创建。
  • 其配置包括 60 秒 ack deadline、7 天消息保留期,以及启用消息排序。
  • 订阅创建是幂等的——如果已有订阅指向所配置的 topic,则管道重启和副本重新调度时都会复用该订阅。
  • 在 topic 发现和消息采样期间,ClickPipes 还会创建短暂的临时订阅 (clickpipes-discovery-{uuid}) ,并在采样完成后立即删除。
  • 删除管道时,ClickPipes 也会在拆除过程中删除该托管订阅。
因此,你提供的服务账号除了需要具备消费订阅的权限外,还必须拥有在该项目中创建和删除订阅的权限。完整列表请参见 Pub/Sub IAM 权限指南

支持的数据格式

支持的格式如下:
  • JSON
  • Avro — 通过 Pub/Sub 原生 schema (BINARY 编码)
  • Protobuf — 通过 Pub/Sub 原生 schema (BINARY 编码)
对于 Avro 和 Protobuf,schema 会从 topic 对应的 Pub/Sub Schema Registry 中解析出来。管道始终使用该 topic schema 的最新修订版;UI 中的 schema 选择器在设计上为只读。

压缩

用于 Pub/Sub 的 ClickPipes 会自动检测并解压压缩消息。Pub/Sub 客户端传递的是原始字节数据——ClickPipes 会为你处理解压,无需任何配置。 支持以下压缩编解码器:
  • gzip
  • zstd
  • lz4
  • snappy (帧格式)
系统会根据每条消息中的 magic bytes 自动检测压缩类型。如果未发现已知的压缩特征,消息将被视为未压缩。检测到的压缩类型也会在 schema 推断期间显示,因此 UI 中的样本数据预览会正确显示解压后的载荷。
对于 JSON 这类基于文本的格式,自动检测是安全的,因为可打印的 ASCII 字符不会与压缩 magic bytes 冲突。解压后的载荷上限为 64MB。

受支持的数据类型

标准类型支持

ClickPipes 当前支持以下 ClickHouse 数据类型:
  • 基础数值类型 - [U]Int8/16/32/64、Float32/64 和 BFloat16
  • 大整数类型 - [U]Int128/256
  • Decimal 类型
  • 布尔类型
  • String
  • FixedString
  • Date、Date32
  • DateTime、DateTime64 (仅支持 UTC 时区)
  • Enum8/Enum16
  • UUID
  • IPv4
  • IPv6
  • 所有 ClickHouse LowCardinality 类型
  • Map,其键和值可使用上述任意类型 (包括 Nullable)
  • Tuple 和 Array,其元素可使用上述任意类型 (包括 Nullable,仅支持一层深度)
  • SimpleAggregateFunction 类型 (适用于 AggregatingMergeTree 或 SummingMergeTree 目标端)

Variant 类型支持

你可以为源数据 stream 中的任何 JSON 字段手动指定 Variant 类型 (例如 Variant(String, Int64, DateTime)) 。 由于 ClickPipes 判定应使用哪种 Variant 子类型的方式,在 Variant 定义中只能使用一种整数类型或 datetime 类型——例如,不支持 Variant(Int64, UInt32)

JSON 类型支持

始终为 JSON 对象的 JSON 字段可以分配到 JSON 目标列。您需要手动将目标 列更改为所需的 JSON 类型,包括任何固定路径或跳过路径。

Pub/Sub 虚拟列

Pub/Sub 主题支持以下虚拟列。创建新的目标表时,可通过 Add Column 按钮添加虚拟列。
NameDescriptionRecommended Data Type
_message_id由代理分配的 Pub/Sub 消息 IDString
_publish_timePub/Sub 发布时间戳 (毫秒精度,UTC)DateTime64(3)
_ordering_keyPub/Sub 排序键 (消息未设置 key 时为空字符串)String
_attributes用户自定义的 Pub/Sub 消息属性Map(String, String)
_raw_message完整的 Pub/Sub 消息载荷 (默认禁用)String
在只需要完整 Pub/Sub 消息载荷的场景中,可以使用 _raw_message 字段 (例如使用 ClickHouse JsonExtract* 函数来填充下游 materialized view) 。对于这类管道,删除所有“非虚拟”列可能有助于提升 ClickPipes 性能。

限制

  • 不支持 DEFAULT
  • 使用最小 (XS) 副本规格运行时,单条消息默认最大为 8MB (未压缩) ;使用更大规格的副本时,默认最大为 16MB (未压缩) 。超出此限制的消息将因报错而被拒绝。如果您需要支持更大的消息,请联系支持团队。
  • Pub/Sub 订阅过滤器不可变——更改过滤表达式需要重新创建管道。
  • 过滤器仅适用于消息属性,不适用于消息载荷。

性能

批处理

ClickPipes 会按批次将数据插入 ClickHouse。这样可以避免在数据库中创建过多的 parts,否则可能会导致集群性能问题。 满足以下任一条件时,批次就会被插入:
  • 批次大小达到上限 (每 1GB 副本内存为 100,000 行或 32MB)
  • 批次保持打开状态达到最长时限 (5 秒)

延迟

延迟 (定义为 Pub/Sub 消息发布到该消息在 ClickHouse 中可用之间的时间) 取决于多种因素 (发布端延迟、网络延迟、消息大小/格式) 。上一节中介绍的批处理也会影响延迟。我们始终建议针对具体使用场景进行测试,以了解你可以预期的延迟。 如果你有特定的低延迟要求,请联系我们

排序键

Pub/Sub 保证,具有相同排序键的消息会按发布顺序投递给单个订阅者。ClickPipes 默认在其托管订阅上启用消息排序——当消息带有排序键时,订阅者会按顺序接收;当消息不带排序键时,行为不变。 如果生产者将所有消息都发布到少量排序键 (甚至单个键) 下,Pub/Sub 会将这些消息集中分发给少量订阅者,这可能会限制水平吞吐量。我们建议在不需要保证顺序时不要使用排序键,或者使用高基数的排序键。

扩缩容

面向 Pub/Sub 的 ClickPipes 在设计上支持水平和垂直扩缩容。每个管道都使用一个托管的 Pub/Sub 订阅,此项不可配置。默认情况下,会由一个消费者从该订阅中拉取数据;你可以在创建 ClickPipe 时增加消费者数量,也可以之后随时在 设置 -> 高级设置 -> 扩缩容 中进行调整。ClickPipes 会自动将订阅中的消息分配给正在运行的消费者,无需额外协调。 ClickPipes 采用跨可用区分布的架构来提供高可用性;为此,至少需要扩缩到两个消费者。 无论运行中的消费者数量是多少,系统在设计上都具备容错能力。如果某个消费者或其底层基础设施发生故障,ClickPipes 会自动重启该消费者并继续处理消息。

投递语义

ClickPipes for Pub/Sub 提供至少一次投递保障。只有在对应的行已插入 ClickHouse (或格式错误的记录已写入错误表) 之后,Pub/Sub 消息才会被确认;所有消息在处理后都会被确认——包括被路由到错误表的错误记录——以防止无限次重复投递。如果某个副本在插入完成后、但在 ack 到达 Pub/Sub 之前崩溃,该消息会在 ack deadline 到期后再次投递并被再次插入,因此下游消费者必须能够容忍重复数据。如果你需要精确一次语义,请在下游使用 _message_id 虚拟列进行去重 (每个 Pub/Sub 消息 ID 在一个 topic 内都是唯一的) 。

身份验证

ClickPipes for Pub/Sub 使用服务账号 JSON 密钥向 GCP 进行身份验证。创建管道时,您需要上传该密钥文件;ClickPipes 会对其进行静态加密,并在运行时使用它来消费消息以及管理托管订阅的生命周期。 有关所需 IAM 权限的完整列表以及推荐的自定义角色定义,请参阅 Pub/Sub IAM 权限指南
最后修改于 2026年6月10日