Перейти к основному содержанию
Этот движок позволяет интегрировать ClickHouse с NATS. NATS позволяет:
  • Публиковать сообщения в subject и подписываться на них.
  • Обрабатывать новые сообщения по мере их поступления.

Создание таблицы

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = NATS SETTINGS
    nats_url = 'host:port',
    nats_subjects = 'subject1,subject2,...',
    nats_format = 'data_format'[,]
    [nats_schema = '',]
    [nats_num_consumers = N,]
    [nats_queue_group = 'group_name',]
    [nats_secure = false,]
    [nats_max_reconnect = N,]
    [nats_reconnect_wait = N,]
    [nats_server_list = 'host1:port1,host2:port2,...',]
    [nats_skip_broken_messages = N,]
    [nats_max_block_size = N,]
    [nats_flush_interval_ms = N,]
    [nats_username = 'user',]
    [nats_password = 'password',]
    [nats_token = 'clickhouse',]
    [nats_credential_file = '/var/nats_credentials',]
    [nats_startup_connect_tries = '5']
    [nats_max_rows_per_message = 1,]
    [nats_handle_error_mode = 'default']
Обязательные параметры:
  • nats_url – host:port (например, localhost:5672)..
  • nats_subjects – Список subject для таблицы NATS, на которые нужно подписываться или в которые нужно публиковать сообщения. Поддерживаются subject с подстановочными символами, например foo.*.bar или baz.>
  • nats_format – Формат сообщений. Используется та же нотация, что и в SQL-функции FORMAT, например JSONEachRow. Подробнее см. в разделе Форматы.
Необязательные параметры:
  • nats_schema – Параметр, который необходимо использовать, если формат требует определения схемы. Например, Cap’n Proto требует путь к файлу схемы и имя корневого объекта schema.capnp:Message.
  • nats_stream – Имя существующего stream в NATS JetStream.
  • nats_consumer – Имя существующего durable pull consumer в NATS JetStream.
  • nats_num_consumers – Количество consumers на таблицу. По умолчанию: 1. Укажите больше consumers, если пропускной способности одного consumer недостаточно; применяется только к NATS core.
  • nats_queue_group – Имя queue group для подписчиков NATS. По умолчанию используется имя таблицы.
  • nats_max_reconnect – Устарел и не имеет эффекта; переподключение выполняется постоянно с тайм-аутом nats_reconnect_wait.
  • nats_reconnect_wait – Время ожидания в миллисекундах между попытками переподключения. По умолчанию: 5000.
  • nats_server_list - Список серверов для подключения. Можно указать для подключения к кластеру NATS.
  • nats_skip_broken_messages - Допустимое число несовместимых со схемой сообщений для parser сообщений NATS на block. По умолчанию: 0. Если nats_skip_broken_messages = N, то движок пропускает N сообщений NATS, которые не удаётся разобрать (одно сообщение соответствует одной строке данных).
  • nats_max_block_size - Число строк, собираемых с помощью poll(s) для сброса данных из NATS. По умолчанию: max_insert_block_size.
  • nats_flush_interval_ms - Тайм-аут для сброса данных, прочитанных из NATS. По умолчанию: stream_flush_interval_ms.
  • nats_username - Имя пользователя NATS.
  • nats_password - Пароль NATS.
  • nats_token - Токен аутентификации NATS.
  • nats_credential_file - Путь к файлу учётных данных NATS.
  • nats_startup_connect_tries - Количество попыток подключения при запуске. По умолчанию: 5.
  • nats_max_rows_per_message — Максимальное количество строк, записываемых в одно сообщение NATS для построчных форматов. (по умолчанию: 1).
  • nats_handle_error_mode — Как обрабатывать ошибки движка NATS. Возможные значения: default (если не удаётся разобрать сообщение, будет сгенерировано исключение), stream (сообщение об исключении и исходное сообщение будут сохранены в виртуальных столбцах _error и _raw_message).
SSL-подключение: Для безопасного подключения используйте nats_secure = 1. Проверка сертификата управляется переменной окружения CLICKHOUSE_NATS_TLS_SECURE; Если срок действия сертификата истёк, сертификат самоподписанный, отсутствует или иным образом недействителен, отключите проверку, установив CLICKHOUSE_NATS_TLS_SECURE=0. Запись в таблицу NATS: Если таблица читает только из одного subject, любая вставка будет публиковаться в тот же subject. Однако если таблица читает из нескольких subject, нужно указать, в какой subject следует публиковать данные. Поэтому при вставке в таблицу с несколькими subject необходимо задать stream_like_engine_insert_queue. Вы можете выбрать один из subject, из которых читает таблица, и опубликовать туда свои данные. Например:
  CREATE TABLE queue (
    key UInt64,
    value UInt64
  ) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1,subject2',
             nats_format = 'JSONEachRow';

  INSERT INTO queue
  SETTINGS stream_like_engine_insert_queue = 'subject2'
  VALUES (1, 1);
Также можно добавить настройки формата вместе с настройками NATS. Пример:
  CREATE TABLE queue (
    key UInt64,
    value UInt64,
    date DateTime
  ) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1',
             nats_format = 'JSONEachRow',
             date_time_input_format = 'best_effort';
Конфигурацию сервера NATS можно добавить в файл конфигурации ClickHouse. В частности, можно добавить пароль для движка NATS:
<nats>
    <user>click</user>
    <password>house</password>
    <token>clickhouse</token>
</nats>

Описание

SELECT не особенно полезен для чтения сообщений (кроме отладки), поскольку каждое сообщение можно прочитать только один раз. Гораздо практичнее создавать потоки в реальном времени с помощью materialized views. Для этого:
  1. Используйте движок, чтобы создать consumer NATS, и рассматривайте его как поток данных.
  2. Создайте таблицу с нужной структурой.
  3. Создайте materialized view, которое преобразует данные из движка и помещает их в ранее созданную таблицу.
Когда MATERIALIZED VIEW подключается к движку, оно начинает собирать данные в фоновом режиме. Это позволяет непрерывно получать сообщения из NATS и преобразовывать их в требуемый формат с помощью SELECT. Одна таблица NATS может иметь сколько угодно materialized views; они не читают данные из таблицы напрямую, а получают новые записи (блоками), поэтому вы можете записывать данные в несколько таблиц с разным уровнем детализации (с группировкой — агрегацией — и без неё). Пример:
  CREATE TABLE queue (
    key UInt64,
    value UInt64
  ) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1',
             nats_format = 'JSONEachRow',
             date_time_input_format = 'best_effort';

  CREATE TABLE daily (key UInt64, value UInt64)
    ENGINE = MergeTree() ORDER BY key;

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT key, value FROM queue;

  SELECT key, value FROM daily ORDER BY key;
Чтобы перестать получать данные из потоков или изменить логику преобразования, отсоедините materialized view:
  DETACH TABLE consumer;
  ATTACH TABLE consumer;
Если вы хотите изменить целевую таблицу с помощью ALTER, мы рекомендуем отключить materialized view, чтобы избежать расхождений между целевой таблицей и данными из представления.

Виртуальные столбцы

  • _subject - subject сообщения NATS. Тип данных: String.
Дополнительные виртуальные столбцы, если nats_handle_error_mode='stream':
  • _raw_message - Исходное сообщение, которое не удалось успешно разобрать. Тип данных: Nullable(String).
  • _error - Сообщение об исключении, возникшем при сбое разбора. Тип данных: Nullable(String).
Примечание: виртуальные столбцы _raw_message и _error заполняются только в случае исключения при разборе; если сообщение успешно разобрано, они всегда имеют значение NULL.

Поддержка форматов данных

Движок NATS поддерживает все форматы, доступные в ClickHouse. Количество строк в одном сообщении NATS зависит от того, является ли формат построчным или блочным:
  • Для построчных форматов количеством строк в одном сообщении NATS можно управлять с помощью настройки nats_max_rows_per_message.
  • Для блочных форматов блок нельзя разделить на более мелкие части, однако количество строк в одном блоке можно регулировать с помощью общей настройки max_block_size.

Использование JetStream

Перед использованием движка NATS с NATS JetStream необходимо создать stream в NATS и durable pull consumer. Для этого можно использовать, например, утилиту nats из пакета NATS CLI:
$ nats stream add
? Stream Name stream_name
? Subjects stream_subject
? Storage file
? Replication 1
? Retention Policy Limits
? Discard Policy Old
? Stream Messages Limit -1
? Per Subject Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 2m0s
? Allow message Roll-ups No
? Allow message deletion Yes
? Allow purging subjects or the entire stream Yes
Stream stream_name was created

Information for Stream stream_name created 2025-10-03 14:12:51

                Subjects: stream_subject
                Replicas: 1
                 Storage: File

Options:

               Retention: Limits
         Acknowledgments: true
          Discard Policy: Old
        Duplicate Window: 2m0s
              Direct Get: true
       Allows Msg Delete: true
            Allows Purge: true
  Allows Per-Message TTL: false
          Allows Rollups: false

Limits:

        Maximum Messages: unlimited
     Maximum Per Subject: unlimited
           Maximum Bytes: unlimited
             Maximum Age: unlimited
    Maximum Message Size: unlimited
       Maximum Consumers: unlimited

State:

                Messages: 0
                   Bytes: 0 B
          First Sequence: 0
           Last Sequence: 0
        Active Consumers: 0
$ nats consumer add
? Select a Stream stream_name
? Consumer name consumer_name
? Delivery target (empty for Pull Consumers) 
? Start policy (all, new, last, subject, 1h, msg sequence) all
? Acknowledgment policy explicit
? Replay policy instant
? Filter Stream by subjects (blank for all) 
? Maximum Allowed Deliveries -1
? Maximum Acknowledgments Pending 0
? Deliver headers only without bodies No
? Add a Retry Backoff Policy No
Information for Consumer stream_name > consumer_name created 2025-10-03T14:13:51+03:00

Configuration:

                    Name: consumer_name
               Pull Mode: true
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

State:

  Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
    Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
           Waiting Pulls: 0 of maximum 512
После создания stream и durable pull consumer можно создать таблицу с движком NATS. Для этого необходимо инициализировать: nats_stream, nats_consumer_name и nats_subjects:
CREATE TABLE nats_jet_stream (
    key UInt64,
    value UInt64
  ) ENGINE NATS 
    SETTINGS  nats_url = 'localhost:4222',
              nats_stream = 'stream_name',
              nats_consumer_name = 'consumer_name',
              nats_subjects = 'stream_subject',
              nats_format = 'JSONEachRow';
Последнее изменение 10 июня 2026 г.