跳转到主要内容
默认情况下,GCS ClickPipe 假定文件会按词典序添加到存储桶中。通过设置一个连接到该存储桶的 Google Cloud Pub/Sub 订阅,也可以将 GCS ClickPipe 配置为摄取不具有隐含顺序的文件。这样一来,ClickPipes 就可以监听 OBJECT_FINALIZE 通知,并摄取任何新文件,而不受文件命名约定的影响。
无序模式支持公共存储桶。它要求使用服务账号身份验证,并配置一个连接到该存储桶的 Google Cloud Pub/Sub 订阅。

工作原理

在此模式下,GCS ClickPipe 会先对所选路径中的所有文件执行初始加载,然后通过 Pub/Sub 订阅监听与指定路径匹配的对象通知。对于先前已处理过的文件、不匹配该路径的文件,或其他类型事件的任何消息,都会被忽略无法从某个特定文件或时间点开始摄取——ClickPipes 始终会加载所选路径中的所有文件。 摄取数据时可能会发生多种故障,从而导致部分插入或数据重复。对象存储 ClickPipes 能够处理插入失败,并通过临时暂存表提供精确一次语义。数据会先插入暂存表;如果出现问题,暂存表会被清空,并在干净状态下重新尝试插入。只有当插入成功完成后,分区才会被移动到目标表。
1

创建 Google Cloud Pub/Sub 主题

1. 在 Google Cloud Console 中,导航到 Pub/Sub > Topics > Create topic。创建一个带默认订阅的新主题,并记下其 Topic Name2. 配置一个 GCS 存储桶通知,将 OBJECT_FINALIZE 事件发布到上面创建的 Pub/Sub 主题。2.1. 此步骤无法在 Google Cloud Console 中完成,因此你必须使用 gcloud 客户端或你偏好的 Google Cloud 编程接口。例如,使用 gcloud
# 为存储桶中的新对象创建一个 Pub/Sub 通知
gcloud storage buckets notifications create "gs://${YOUR_BUCKET_NAME}" \
  --topic="projects/${YOUR_PROJECT_ID}/topics/${YOUR_TOPIC_NAME}" \
  --event-types="OBJECT_FINALIZE" \
  --payload-format="json"

# 列出存储桶中的 Pub/Sub 通知
gcloud storage buckets notifications describe
2

配置服务账号

1. 配置一个具有所需权限服务账号,以允许 ClickPipes 列出并拉取指定存储桶中的对象,以及消费和监控来自 Pub/Sub 订阅的通知。1.1. 此步骤可以在 Google Cloud Console 中完成,也可以使用 gcloud 客户端或你偏好的 Google Cloud 编程接口完成。例如,使用 gcloud
# 1. 授予对 GCS 存储桶的读取权限
gcloud storage buckets add-iam-policy-binding "gs://${YOUR_BUCKET_NAME}" \
  --member="serviceAccount:${YOUR_SERVICE_ACCOUNT}@${YOUR_PROJECT_ID}.iam.gserviceaccount.com" \
  --role="roles/storage.objectViewer"

# 2. 授予对 Pub/Sub 订阅的读取权限
gcloud pubsub subscriptions add-iam-policy-binding "${YOUR_SUBSCRIPTION_NAME}" \
  --member="serviceAccount:${YOUR_SERVICE_ACCOUNT}@${YOUR_PROJECT_ID}.iam.gserviceaccount.com" \
  --role="roles/pubsub.subscriber"

# 3. 授予获取 Pub/Sub 订阅元数据的权限
gcloud pubsub subscriptions add-iam-policy-binding "${YOUR_SUBSCRIPTION_NAME}" \
  --member="serviceAccount:${YOUR_SERVICE_ACCOUNT}@${YOUR_PROJECT_ID}.iam.gserviceaccount.com" \
  --role="roles/pubsub.viewer"
3

以无序模式创建 ClickPipe

1. 在 ClickHouse Cloud 控制台中,导航到 Data Sources > Create ClickPipe,然后选择 Google Cloud Storage。输入连接到你的 GCS 存储桶所需的详细信息。在 Authentication method 下,选择 服务账号 并提供 .json 服务账号密钥。2. 打开 Continuous ingestion,然后选择 Any order 作为摄取模式,并提供与你的存储桶关联的 Pub/Sub subscription 名称。订阅名称必须符合以下格式:
projects/${YOUR_PROJECT_ID}/subscriptions/${YOUR_SUBSCRIPTION_NAME}
3. 点击 Incoming data。为目标表定义一个 Sorting key。对映射后的 schema 进行必要调整,然后为 ClickPipes 数据库用户配置角色。4. 检查配置并点击 Create ClickPipe。ClickPipes 将先扫描你的存储桶,加载所有与指定路径匹配的现有文件,然后在主题中有新的 OBJECT_FINALIZE 事件到达时开始处理文件。
最后修改于 2026年6月10日