Вы можете записаться в лист ожидания закрытой предварительной версии здесь.
Предварительные требования
Создание первого ClickPipe
- Откройте SQL-консоль для вашего сервиса ClickHouse Cloud.
- Нажмите кнопку
Data Sourcesв левом меню, затем — “Set up a ClickPipe”
- Выберите GCP Pub/Sub в качестве источника данных.
- Заполните форму: укажите имя ClickPipe, ваш GCP Project ID и JSON‑файл сервисного аккаунта для сервисного аккаунта, которому выдан доступ к Pub/Sub. Project ID должен содержать 6–30 символов, может включать строчные буквы, цифры и дефисы, должен начинаться с буквы и не может заканчиваться дефисом.
-
Выберите Pub/Sub topic, из которого нужно получать данные. После успешной проверки учётных данных раскрывающийся список автоматически заполняется топиками из вашего проекта GCP (в алфавитном порядке).
- Формат данных. При выборе топика ClickPipes обращается к реестру схем Pub/Sub. Если к топику привязана нативная схема Avro или Protobuf, Data format и Schema определяются автоматически, а селекторы фиксируются на последней схеме топика. Для топиков без нативной схемы по умолчанию используется JSONEachRow.
- Начальное смещение. Выберите, с какого места начинать чтение. Доступны варианты Latest (только новые сообщения), Earliest (самые старые сохранённые сообщения) и Seek to Timestamp (с выбором даты и времени в UTC).
- Filter expression (необязательно). Фильтр subscription filter Pub/Sub по атрибутам сообщений — например,
attributes.type = "telemetry". Фильтры применяются только к атрибутам сообщений, а не к полезной нагрузке, и не могут быть изменены после создания пайпа (для изменения фильтра пайп нужно пересоздать). - В интерфейсе будет показан пример сообщения из выбранного топика, а также переключатель Flatten object, с помощью которого можно предварительно посмотреть, как вложенный JSON будет развёрнут на стороне назначения.
- На следующем шаге вы можете выбрать, хотите ли вы загружать данные в новую таблицу ClickHouse или использовать существующую. Следуйте инструкциям на экране, чтобы изменить имя таблицы, схему и настройки. Вверху страницы отображается предварительный просмотр изменений в образце таблицы в реальном времени.
- Кроме того, вы можете загрузить данные в существующую таблицу ClickHouse. В этом случае интерфейс позволит сопоставить поля источника с полями ClickHouse в выбранной целевой таблице.
- Наконец, вы можете настроить разрешения для внутреннего пользователя ClickPipes.
Full access: полный доступ к кластеру. Это может быть полезно, если вы используете materialized view или словарь с целевой таблицей.Only destination table: только разрешенияINSERTдля целевой таблицы.
- Нажмите “Complete Setup”, чтобы система зарегистрировала ваш ClickPipe, после чего он появится в сводной таблице.
- Поздравляем! Вы успешно настроили свой первый Pub/Sub ClickPipe. Он будет непрерывно работать, выполняя ингестию данных в реальном времени из вашего топика Pub/Sub в ваш сервис ClickHouse Cloud.
Управляемые подписки
- Управляемая подписка называется
clickpipes-{pipeID}и создается в топике при запуске пайпа. - Для нее задаются срок подтверждения 60 секунд, хранение сообщений в течение 7 дней и включается упорядочение сообщений.
- Создание подписки идемпотентно — при перезапуске пайпа и перепланировании реплики используется существующая подписка, если она уже привязана к настроенному топику.
- Во время обнаружения топиков и сэмплирования сообщений ClickPipes также создает кратковременные эфемерные подписки (
clickpipes-discovery-{uuid}), которые удаляются сразу после завершения сэмплирования. - При удалении пайпа ClickPipes удаляет управляемую подписку в рамках очистки ресурсов.
Поддерживаемые форматы данных
- JSON
- Avro — через встроенные схемы Pub/Sub (кодирование BINARY)
- Protobuf — через встроенные схемы Pub/Sub (кодирование BINARY)
Сжатие
- gzip
- zstd
- lz4
- snappy (framed format)
Автоопределение безопасно для текстовых форматов, таких как JSON, поскольку печатаемые символы ASCII не могут совпасть с сигнатурами magic bytes сжатия. Размер распакованной полезной нагрузки ограничен 64 МБ.
Поддерживаемые типы данных
Поддержка стандартных типов
- Базовые числовые типы — [U]Int8/16/32/64, Float32/64 и BFloat16
- Типы больших целых чисел — [U]Int128/256
- Десятичные типы
- Boolean
- String
- FixedString
- Date, Date32
- DateTime, DateTime64 (только часовые пояса UTC)
- Enum8/Enum16
- UUID
- IPv4
- IPv6
- все типы ClickHouse LowCardinality
- Map с ключами и значениями любого из указанных выше типов (включая Nullable)
- Tuple и Array с элементами любого из указанных выше типов (включая Nullable, только один уровень вложенности)
- типы SimpleAggregateFunction (для пунктов назначения AggregatingMergeTree или SummingMergeTree)
Поддержка типа Variant
Variant(String, Int64, DateTime)) для любого поля JSON
в потоке исходных данных. Поскольку ClickPipes определяет подходящий подтип варианта особым образом, в определении Variant можно использовать только один целочисленный тип или тип DateTime —
например, Variant(Int64, UInt32) не поддерживается.
Поддержка типа JSON
Виртуальные столбцы Pub/Sub
Add Column.
| Имя | Описание | Рекомендуемый тип данных |
|---|---|---|
| _message_id | Идентификатор сообщения Pub/Sub, назначенный брокером | String |
| _publish_time | Временная метка публикации Pub/Sub (с точностью до миллисекунд, UTC) | DateTime64(3) |
| _ordering_key | Ключ упорядочивания Pub/Sub (пустая строка, если для сообщения ключ не задан) | String |
| _attributes | Пользовательские атрибуты сообщения Pub/Sub | Map(String, String) |
| _raw_message | Полная полезная нагрузка сообщения Pub/Sub (по умолчанию отключена) | String |
_raw_message можно использовать в случаях, когда нужна только полная полезная нагрузка сообщения Pub/Sub (например, при использовании функций ClickHouse JsonExtract* для заполнения последующего materialized view). Для таких пайпов производительность ClickPipes может повыситься, если удалить все «не виртуальные» столбцы.
Ограничения
- DEFAULT не поддерживается.
- По умолчанию размер отдельных сообщений ограничен 8 МБ (в несжатом виде) при использовании реплики минимального размера (XS) и 16 МБ (в несжатом виде) для более крупных реплик. Сообщения, превышающие этот лимит, будут отклоняться с ошибкой. Если вам нужны сообщения большего размера, обратитесь в службу поддержки.
- Фильтры подписки Pub/Sub неизменяемы — для изменения filter expression необходимо пересоздать пайп.
- Фильтры применяются только к атрибутам сообщения, а не к полезной нагрузке.
Производительность
Формирование батчей
- Размер батча достиг максимального значения (100,000 строк или 32MB на 1GB памяти реплики)
- Батч оставался открытым максимально допустимое время (5 секунд)
Задержка
Ключи упорядочивания
Масштабирование
Семантика доставки
_message_id (каждый идентификатор сообщения Pub/Sub уникален в пределах topic).