Перейти к основному содержанию
Этот движок позволяет интегрировать ClickHouse с RabbitMQ. RabbitMQ позволяет:
  • Публиковать потоки данных и подписываться на них.
  • Обрабатывать потоки по мере их поступления.

Создание таблицы

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1],
    name2 [type2],
    ...
) ENGINE = RabbitMQ SETTINGS
    rabbitmq_host_port = 'host:port' [or rabbitmq_address = 'amqp(s)://guest:guest@localhost/vhost'],
    rabbitmq_exchange_name = 'exchange_name',
    rabbitmq_format = 'data_format'[,]
    [rabbitmq_exchange_type = 'exchange_type',]
    [rabbitmq_routing_key_list = 'key1,key2,...',]
    [rabbitmq_secure = 0,]
    [rabbitmq_schema = '',]
    [rabbitmq_num_consumers = N,]
    [rabbitmq_num_queues = N,]
    [rabbitmq_queue_base = 'queue',]
    [rabbitmq_deadletter_exchange = 'dl-exchange',]
    [rabbitmq_persistent = 0,]
    [rabbitmq_skip_broken_messages = N,]
    [rabbitmq_max_block_size = N,]
    [rabbitmq_flush_interval_ms = N,]
    [rabbitmq_queue_settings_list = 'x-dead-letter-exchange=my-dlx,x-max-length=10,x-overflow=reject-publish',]
    [rabbitmq_queue_consume = false,]
    [rabbitmq_address = '',]
    [rabbitmq_vhost = '/',]
    [rabbitmq_username = '',]
    [rabbitmq_password = '',]
    [rabbitmq_commit_on_select = false,]
    [rabbitmq_max_rows_per_message = 1,]
    [rabbitmq_handle_error_mode = 'default']
Обязательные параметры:
  • rabbitmq_host_port – host:port (например, localhost:5672).
  • rabbitmq_exchange_name – имя exchange в RabbitMQ.
  • rabbitmq_format – формат сообщения. Используется та же нотация, что и для SQL-функции FORMAT, например JSONEachRow. Дополнительные сведения см. в разделе Форматы.
Необязательные параметры:
  • rabbitmq_exchange_type – Тип RabbitMQ exchange: direct, fanout, topic, headers, consistent_hash. По умолчанию: fanout.
  • rabbitmq_routing_key_list – Список ключей маршрутизации, разделенных запятыми.
  • rabbitmq_schema – Параметр, который необходимо использовать, если формат требует определения схемы. Например, Cap’n Proto требует указать путь к файлу схемы и имя корневого объекта schema.capnp:Message.
  • rabbitmq_num_consumers – Количество consumers на таблицу. Укажите больше consumers, если пропускной способности одного consumer недостаточно. По умолчанию: 1
  • rabbitmq_num_queues – Общее количество очередей. Увеличение этого числа может значительно повысить производительность. По умолчанию: 1.
  • rabbitmq_queue_base - Укажите префикс для имен очередей. Варианты использования этой настройки описаны ниже.
  • rabbitmq_persistent - Если установлено значение 1 (true), для запроса вставки режим доставки будет установлен в 2 (сообщения помечаются как persistent). По умолчанию: 0.
  • rabbitmq_skip_broken_messages – Допустимое количество несовместимых со схемой RabbitMQ-сообщений на блок, которые может пропустить парсер. Если rabbitmq_skip_broken_messages = N, то движок пропускает N RabbitMQ-сообщений, которые не удается разобрать (одно сообщение соответствует одной строке данных). По умолчанию: 0.
  • rabbitmq_max_block_size - Количество строк, собираемых перед сбросом данных из RabbitMQ. По умолчанию: max_insert_block_size.
  • rabbitmq_flush_interval_ms - Тайм-аут для сброса данных из RabbitMQ. По умолчанию: stream_flush_interval_ms.
  • rabbitmq_queue_settings_list - позволяет задавать настройки RabbitMQ при создании очереди. Доступные настройки: x-max-length, x-max-length-bytes, x-message-ttl, x-expires, x-priority, x-max-priority, x-overflow, x-dead-letter-exchange, x-queue-type. Настройка durable автоматически включается для очереди.
  • rabbitmq_address - Адрес для подключения. Используйте либо эту настройку, либо rabbitmq_host_port.
  • rabbitmq_vhost - RabbitMQ vhost. По умолчанию: '\'.
  • rabbitmq_queue_consume - Использовать пользовательские очереди и не выполнять никакую настройку RabbitMQ: объявление exchanges, очередей и привязок. По умолчанию: false.
  • rabbitmq_username - Имя пользователя RabbitMQ.
  • rabbitmq_password - Пароль RabbitMQ.
  • reject_unhandled_messages - Отклонять сообщения (отправлять RabbitMQ отрицательное подтверждение) в случае ошибок. Эта настройка автоматически включается, если в rabbitmq_queue_settings_list определен x-dead-letter-exchange.
  • rabbitmq_commit_on_select - Подтверждать сообщения при выполнении запроса select. По умолчанию: false.
  • rabbitmq_max_rows_per_message — Максимальное количество строк, записываемых в одно RabbitMQ-сообщение для построчных форматов. По умолчанию: 1.
  • rabbitmq_empty_queue_backoff_start_ms — Начальная задержка перед повторной попыткой чтения, если очередь RabbitMQ пуста.
  • rabbitmq_empty_queue_backoff_end_ms — Максимальная задержка перед повторной попыткой чтения, если очередь RabbitMQ пуста.
  • rabbitmq_empty_queue_backoff_step_ms — Шаг увеличения задержки перед повторной попыткой чтения, если очередь RabbitMQ пуста.
  • rabbitmq_handle_error_mode — Как обрабатывать ошибки в движке RabbitMQ. Возможные значения: default (если сообщение не удается разобрать, будет сгенерировано исключение), stream (сообщение об исключении и исходное сообщение будут сохранены в виртуальных столбцах _error и _raw_message), dead_letter_queue (данные, связанные с ошибками, будут сохранены в system.dead_letter_queue).

SSL‑соединение

Используйте либо rabbitmq_secure = 1, либо amqps в адресе подключения: rabbitmq_address = 'amqps://guest:guest@localhost/vhost'. По умолчанию используемая библиотека не проверяет, достаточно ли безопасно созданное TLS‑соединение. Просрочен ли сертификат, самоподписан ли он, отсутствует или недействителен — подключение всё равно разрешается. Более строгая проверка сертификатов, возможно, будет реализована в будущем. Также вместе с настройками RabbitMQ можно добавлять настройки формата. Пример:
  CREATE TABLE queue (
    key UInt64,
    value UInt64,
    date DateTime
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'exchange1',
                            rabbitmq_format = 'JSONEachRow',
                            rabbitmq_num_consumers = 5,
                            date_time_input_format = 'best_effort';
Конфигурацию сервера RabbitMQ следует добавить в файл конфигурации ClickHouse. Требуемая конфигурация:
 <rabbitmq>
    <username>root</username>
    <password>clickhouse</password>
 </rabbitmq>
Дополнительная конфигурация:
 <rabbitmq>
    <vhost>clickhouse</vhost>
 </rabbitmq>

Описание

SELECT не очень полезен для чтения сообщений (кроме отладки), потому что каждое сообщение можно прочитать только один раз. Гораздо практичнее создавать потоки реального времени с помощью materialized views. Для этого:
  1. Используйте движок, чтобы создать consumer RabbitMQ и рассматривать его как поток данных.
  2. Создайте таблицу с нужной структурой.
  3. Создайте materialized view, которое преобразует данные из движка и помещает их в ранее созданную таблицу.
Когда MATERIALIZED VIEW подключается к движку, оно начинает собирать данные в фоновом режиме. Это позволяет непрерывно получать сообщения из RabbitMQ и преобразовывать их в нужный формат с помощью SELECT. Для одной таблицы RabbitMQ можно создать сколько угодно materialized views. Данные можно маршрутизировать на основе rabbitmq_exchange_type и указанного rabbitmq_routing_key_list. Для одной таблицы может быть не более одного exchange. Один exchange может использоваться несколькими таблицами — это позволяет одновременно маршрутизировать данные в несколько таблиц. Варианты типа exchange:
  • direct — маршрутизация основана на точном совпадении ключей. Пример списка ключей таблицы: key1,key2,key3,key4,key5; ключ сообщения может совпадать с любым из них.
  • fanout — маршрутизация во все таблицы (где имя exchange одинаковое) независимо от ключей.
  • topic — маршрутизация основана на шаблонах с ключами, разделенными точками. Примеры: *.logs, records.*.*.2020, *.2018,*.2019,*.2020.
  • headers — маршрутизация основана на совпадениях key=value с настройкой x-match=all или x-match=any. Пример списка ключей таблицы: x-match=all,format=logs,type=report,year=2020.
  • consistent_hash — данные равномерно распределяются между всеми привязанными таблицами (где имя exchange одинаковое). Обратите внимание, что этот тип exchange должен быть включен с помощью plugin RabbitMQ: rabbitmq-plugins enable rabbitmq_consistent_hash_exchange.
Настройка rabbitmq_queue_base может использоваться в следующих случаях:
  • чтобы разные таблицы могли совместно использовать очереди, и для одних и тех же очередей можно было зарегистрировать несколько consumers, что повышает производительность. При использовании настроек rabbitmq_num_consumers и/или rabbitmq_num_queues точное совпадение очередей достигается, если эти параметры одинаковы.
  • чтобы можно было восстановить чтение из определенных долговечных очередей, если не все сообщения были успешно обработаны. Чтобы возобновить consumption из одной конкретной очереди, задайте ее имя в настройке rabbitmq_queue_base и не указывайте rabbitmq_num_consumers и rabbitmq_num_queues (по умолчанию 1). Чтобы возобновить consumption из всех очередей, объявленных для конкретной таблицы, просто укажите те же настройки: rabbitmq_queue_base, rabbitmq_num_consumers, rabbitmq_num_queues. По умолчанию имена очередей будут уникальными для таблиц.
  • чтобы повторно использовать очереди, так как они объявлены как долговечные и не удаляются автоматически. (Их можно удалить с помощью любого из CLI-инструментов RabbitMQ.)
Чтобы повысить производительность, полученные сообщения группируются в блоки размером max_insert_block_size. Если блок не был сформирован в течение stream_flush_interval_ms миллисекунд, данные будут записаны в таблицу независимо от полноты блока. Если настройки rabbitmq_num_consumers и/или rabbitmq_num_queues указаны вместе с rabbitmq_exchange_type, тогда:
  • должен быть включен plugin rabbitmq-consistent-hash-exchange.
  • должно быть указано свойство message_id публикуемых сообщений (уникальное для каждого сообщения/батча).
Для insert query доступны метаданные сообщения, которые добавляются к каждому опубликованному сообщению: messageID и флаг republished (true, если сообщение публиковалось более одного раза) — доступ к ним можно получить через заголовки сообщения. Не используйте одну и ту же таблицу для вставок и materialized views. Пример:
  CREATE TABLE queue (
    key UInt64,
    value UInt64
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'exchange1',
                            rabbitmq_exchange_type = 'headers',
                            rabbitmq_routing_key_list = 'format=logs,type=report,year=2020',
                            rabbitmq_format = 'JSONEachRow',
                            rabbitmq_num_consumers = 5;

  CREATE TABLE daily (key UInt64, value UInt64)
    ENGINE = MergeTree() ORDER BY key;

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT key, value FROM queue;

  SELECT key, value FROM daily ORDER BY key;

Виртуальные столбцы

  • _exchange_name - Имя exchange RabbitMQ. Тип данных: String.
  • _channel_id - ChannelID, в котором был объявлен consumer, получивший сообщение. Тип данных: String.
  • _delivery_tag - DeliveryTag полученного сообщения. Уникален в пределах канала. Тип данных: UInt64.
  • _redelivered - Флаг redelivered сообщения. Тип данных: UInt8.
  • _message_id - messageID полученного сообщения; непустой, если был задан при публикации сообщения. Тип данных: String.
  • _timestamp - временная метка полученного сообщения; непустая, если была задана при публикации сообщения. Тип данных: UInt64.
Дополнительные виртуальные столбцы, когда rabbitmq_handle_error_mode='stream':
  • _raw_message - Необработанное сообщение, которое не удалось успешно разобрать. Тип данных: Nullable(String).
  • _error - Сообщение об ошибке, возникшей при неудачном разборе. Тип данных: Nullable(String).
Примечание: виртуальные столбцы _raw_message и _error заполняются только в случае исключения при разборе; если сообщение было успешно разобрано, они всегда имеют значение NULL.

Ограничения

Даже если в определении таблицы указать выражения по умолчанию для столбцов (такие как DEFAULT, MATERIALIZED, ALIAS), они будут проигнорированы. Вместо этого столбцы будут заполнены соответствующими значениями по умолчанию для своих типов.

Поддерживаемые форматы данных

Движок RabbitMQ поддерживает все форматы, доступные в ClickHouse. Количество строк в одном сообщении RabbitMQ зависит от того, является формат построчным или блочным:
  • Для построчных форматов количество строк в одном сообщении RabbitMQ можно задавать с помощью настройки rabbitmq_max_rows_per_message.
  • Для блочных форматов разбить блок на более мелкие части нельзя, но количество строк в одном блоке можно регулировать общей настройкой max_block_size.
Последнее изменение 10 июня 2026 г.