NATS позволяет:
- Публиковать сообщения в subject и подписываться на них.
- Обрабатывать новые сообщения по мере их поступления.
Создание таблицы
nats_url– host:port (например,localhost:5672)..nats_subjects– Списокsubjectдля таблицы NATS, на которые нужно подписываться или в которые нужно публиковать сообщения. Поддерживаютсяsubjectс подстановочными символами, напримерfoo.*.barилиbaz.>nats_format– Формат сообщений. Используется та же нотация, что и в SQL-функцииFORMAT, напримерJSONEachRow. Подробнее см. в разделе Форматы.
nats_schema– Параметр, который необходимо использовать, если формат требует определения схемы. Например, Cap’n Proto требует путь к файлу схемы и имя корневого объектаschema.capnp:Message.nats_stream– Имя существующего stream в NATS JetStream.nats_consumer– Имя существующего durable pull consumer в NATS JetStream.nats_num_consumers– Количество consumers на таблицу. По умолчанию:1. Укажите больше consumers, если пропускной способности одного consumer недостаточно; применяется только к NATS core.nats_queue_group– Имя queue group для подписчиков NATS. По умолчанию используется имя таблицы.nats_max_reconnect– Устарел и не имеет эффекта; переподключение выполняется постоянно с тайм-аутомnats_reconnect_wait.nats_reconnect_wait– Время ожидания в миллисекундах между попытками переподключения. По умолчанию:5000.nats_server_list- Список серверов для подключения. Можно указать для подключения к кластеру NATS.nats_skip_broken_messages- Допустимое число несовместимых со схемой сообщений для parser сообщений NATS на block. По умолчанию:0. Еслиnats_skip_broken_messages = N, то движок пропускает N сообщений NATS, которые не удаётся разобрать (одно сообщение соответствует одной строке данных).nats_max_block_size- Число строк, собираемых с помощью poll(s) для сброса данных из NATS. По умолчанию: max_insert_block_size.nats_flush_interval_ms- Тайм-аут для сброса данных, прочитанных из NATS. По умолчанию: stream_flush_interval_ms.nats_username- Имя пользователя NATS.nats_password- Пароль NATS.nats_token- Токен аутентификации NATS.nats_credential_file- Путь к файлу учётных данных NATS.nats_startup_connect_tries- Количество попыток подключения при запуске. По умолчанию:5.nats_max_rows_per_message— Максимальное количество строк, записываемых в одно сообщение NATS для построчных форматов. (по умолчанию:1).nats_handle_error_mode— Как обрабатывать ошибки движка NATS. Возможные значения: default (если не удаётся разобрать сообщение, будет сгенерировано исключение), stream (сообщение об исключении и исходное сообщение будут сохранены в виртуальных столбцах_errorи_raw_message).
nats_secure = 1.
Проверка сертификата управляется переменной окружения CLICKHOUSE_NATS_TLS_SECURE;
Если срок действия сертификата истёк, сертификат самоподписанный, отсутствует или иным образом недействителен, отключите проверку, установив CLICKHOUSE_NATS_TLS_SECURE=0.
Запись в таблицу NATS:
Если таблица читает только из одного subject, любая вставка будет публиковаться в тот же subject.
Однако если таблица читает из нескольких subject, нужно указать, в какой subject следует публиковать данные.
Поэтому при вставке в таблицу с несколькими subject необходимо задать stream_like_engine_insert_queue.
Вы можете выбрать один из subject, из которых читает таблица, и опубликовать туда свои данные. Например:
Описание
SELECT не особенно полезен для чтения сообщений (кроме отладки), поскольку каждое сообщение можно прочитать только один раз. Гораздо практичнее создавать потоки в реальном времени с помощью materialized views. Для этого:
- Используйте движок, чтобы создать consumer NATS, и рассматривайте его как поток данных.
- Создайте таблицу с нужной структурой.
- Создайте materialized view, которое преобразует данные из движка и помещает их в ранее созданную таблицу.
MATERIALIZED VIEW подключается к движку, оно начинает собирать данные в фоновом режиме. Это позволяет непрерывно получать сообщения из NATS и преобразовывать их в требуемый формат с помощью SELECT.
Одна таблица NATS может иметь сколько угодно materialized views; они не читают данные из таблицы напрямую, а получают новые записи (блоками), поэтому вы можете записывать данные в несколько таблиц с разным уровнем детализации (с группировкой — агрегацией — и без неё).
Пример:
ALTER, мы рекомендуем отключить materialized view, чтобы избежать расхождений между целевой таблицей и данными из представления.
Виртуальные столбцы
_subject- subject сообщения NATS. Тип данных:String.
nats_handle_error_mode='stream':
_raw_message- Исходное сообщение, которое не удалось успешно разобрать. Тип данных:Nullable(String)._error- Сообщение об исключении, возникшем при сбое разбора. Тип данных:Nullable(String).
_raw_message и _error заполняются только в случае исключения при разборе; если сообщение успешно разобрано, они всегда имеют значение NULL.
Поддержка форматов данных
- Для построчных форматов количеством строк в одном сообщении NATS можно управлять с помощью настройки
nats_max_rows_per_message. - Для блочных форматов блок нельзя разделить на более мелкие части, однако количество строк в одном блоке можно регулировать с помощью общей настройки max_block_size.
Использование JetStream
nats из пакета NATS CLI:
создание stream
создание stream
создание durable pull consumer
создание durable pull consumer