RabbitMQ позволяет:
- Публиковать потоки данных и подписываться на них.
- Обрабатывать потоки по мере их поступления.
Создание таблицы
rabbitmq_host_port– host:port (например,localhost:5672).rabbitmq_exchange_name– имя exchange в RabbitMQ.rabbitmq_format– формат сообщения. Используется та же нотация, что и для SQL-функцииFORMAT, напримерJSONEachRow. Дополнительные сведения см. в разделе Форматы.
rabbitmq_exchange_type– Тип RabbitMQ exchange:direct,fanout,topic,headers,consistent_hash. По умолчанию:fanout.rabbitmq_routing_key_list– Список ключей маршрутизации, разделенных запятыми.rabbitmq_schema– Параметр, который необходимо использовать, если формат требует определения схемы. Например, Cap’n Proto требует указать путь к файлу схемы и имя корневого объектаschema.capnp:Message.rabbitmq_num_consumers– Количество consumers на таблицу. Укажите больше consumers, если пропускной способности одного consumer недостаточно. По умолчанию:1rabbitmq_num_queues– Общее количество очередей. Увеличение этого числа может значительно повысить производительность. По умолчанию:1.rabbitmq_queue_base- Укажите префикс для имен очередей. Варианты использования этой настройки описаны ниже.rabbitmq_persistent- Если установлено значение 1 (true), для запроса вставки режим доставки будет установлен в 2 (сообщения помечаются какpersistent). По умолчанию:0.rabbitmq_skip_broken_messages– Допустимое количество несовместимых со схемой RabbitMQ-сообщений на блок, которые может пропустить парсер. Еслиrabbitmq_skip_broken_messages = N, то движок пропускает N RabbitMQ-сообщений, которые не удается разобрать (одно сообщение соответствует одной строке данных). По умолчанию:0.rabbitmq_max_block_size- Количество строк, собираемых перед сбросом данных из RabbitMQ. По умолчанию: max_insert_block_size.rabbitmq_flush_interval_ms- Тайм-аут для сброса данных из RabbitMQ. По умолчанию: stream_flush_interval_ms.rabbitmq_queue_settings_list- позволяет задавать настройки RabbitMQ при создании очереди. Доступные настройки:x-max-length,x-max-length-bytes,x-message-ttl,x-expires,x-priority,x-max-priority,x-overflow,x-dead-letter-exchange,x-queue-type. Настройкаdurableавтоматически включается для очереди.rabbitmq_address- Адрес для подключения. Используйте либо эту настройку, либоrabbitmq_host_port.rabbitmq_vhost- RabbitMQ vhost. По умолчанию:'\'.rabbitmq_queue_consume- Использовать пользовательские очереди и не выполнять никакую настройку RabbitMQ: объявление exchanges, очередей и привязок. По умолчанию:false.rabbitmq_username- Имя пользователя RabbitMQ.rabbitmq_password- Пароль RabbitMQ.reject_unhandled_messages- Отклонять сообщения (отправлять RabbitMQ отрицательное подтверждение) в случае ошибок. Эта настройка автоматически включается, если вrabbitmq_queue_settings_listопределенx-dead-letter-exchange.rabbitmq_commit_on_select- Подтверждать сообщения при выполнении запроса select. По умолчанию:false.rabbitmq_max_rows_per_message— Максимальное количество строк, записываемых в одно RabbitMQ-сообщение для построчных форматов. По умолчанию:1.rabbitmq_empty_queue_backoff_start_ms— Начальная задержка перед повторной попыткой чтения, если очередь RabbitMQ пуста.rabbitmq_empty_queue_backoff_end_ms— Максимальная задержка перед повторной попыткой чтения, если очередь RabbitMQ пуста.rabbitmq_empty_queue_backoff_step_ms— Шаг увеличения задержки перед повторной попыткой чтения, если очередь RabbitMQ пуста.rabbitmq_handle_error_mode— Как обрабатывать ошибки в движке RabbitMQ. Возможные значения:default(если сообщение не удается разобрать, будет сгенерировано исключение),stream(сообщение об исключении и исходное сообщение будут сохранены в виртуальных столбцах_errorи_raw_message),dead_letter_queue(данные, связанные с ошибками, будут сохранены вsystem.dead_letter_queue).
SSL‑соединение
rabbitmq_secure = 1, либо amqps в адресе подключения: rabbitmq_address = 'amqps://guest:guest@localhost/vhost'.
По умолчанию используемая библиотека не проверяет, достаточно ли безопасно созданное TLS‑соединение. Просрочен ли сертификат, самоподписан ли он, отсутствует или недействителен — подключение всё равно разрешается. Более строгая проверка сертификатов, возможно, будет реализована в будущем.
Также вместе с настройками RabbitMQ можно добавлять настройки формата.
Пример:
Описание
SELECT не очень полезен для чтения сообщений (кроме отладки), потому что каждое сообщение можно прочитать только один раз. Гораздо практичнее создавать потоки реального времени с помощью materialized views. Для этого:
- Используйте движок, чтобы создать consumer RabbitMQ и рассматривать его как поток данных.
- Создайте таблицу с нужной структурой.
- Создайте materialized view, которое преобразует данные из движка и помещает их в ранее созданную таблицу.
MATERIALIZED VIEW подключается к движку, оно начинает собирать данные в фоновом режиме. Это позволяет непрерывно получать сообщения из RabbitMQ и преобразовывать их в нужный формат с помощью SELECT.
Для одной таблицы RabbitMQ можно создать сколько угодно materialized views.
Данные можно маршрутизировать на основе rabbitmq_exchange_type и указанного rabbitmq_routing_key_list.
Для одной таблицы может быть не более одного exchange. Один exchange может использоваться несколькими таблицами — это позволяет одновременно маршрутизировать данные в несколько таблиц.
Варианты типа exchange:
direct— маршрутизация основана на точном совпадении ключей. Пример списка ключей таблицы:key1,key2,key3,key4,key5; ключ сообщения может совпадать с любым из них.fanout— маршрутизация во все таблицы (где имя exchange одинаковое) независимо от ключей.topic— маршрутизация основана на шаблонах с ключами, разделенными точками. Примеры:*.logs,records.*.*.2020,*.2018,*.2019,*.2020.headers— маршрутизация основана на совпаденияхkey=valueс настройкойx-match=allилиx-match=any. Пример списка ключей таблицы:x-match=all,format=logs,type=report,year=2020.consistent_hash— данные равномерно распределяются между всеми привязанными таблицами (где имя exchange одинаковое). Обратите внимание, что этот тип exchange должен быть включен с помощью plugin RabbitMQ:rabbitmq-plugins enable rabbitmq_consistent_hash_exchange.
rabbitmq_queue_base может использоваться в следующих случаях:
- чтобы разные таблицы могли совместно использовать очереди, и для одних и тех же очередей можно было зарегистрировать несколько consumers, что повышает производительность. При использовании настроек
rabbitmq_num_consumersи/илиrabbitmq_num_queuesточное совпадение очередей достигается, если эти параметры одинаковы. - чтобы можно было восстановить чтение из определенных долговечных очередей, если не все сообщения были успешно обработаны. Чтобы возобновить consumption из одной конкретной очереди, задайте ее имя в настройке
rabbitmq_queue_baseи не указывайтеrabbitmq_num_consumersиrabbitmq_num_queues(по умолчанию 1). Чтобы возобновить consumption из всех очередей, объявленных для конкретной таблицы, просто укажите те же настройки:rabbitmq_queue_base,rabbitmq_num_consumers,rabbitmq_num_queues. По умолчанию имена очередей будут уникальными для таблиц. - чтобы повторно использовать очереди, так как они объявлены как долговечные и не удаляются автоматически. (Их можно удалить с помощью любого из CLI-инструментов RabbitMQ.)
rabbitmq_num_consumers и/или rabbitmq_num_queues указаны вместе с rabbitmq_exchange_type, тогда:
- должен быть включен plugin
rabbitmq-consistent-hash-exchange. - должно быть указано свойство
message_idпубликуемых сообщений (уникальное для каждого сообщения/батча).
messageID и флаг republished (true, если сообщение публиковалось более одного раза) — доступ к ним можно получить через заголовки сообщения.
Не используйте одну и ту же таблицу для вставок и materialized views.
Пример:
Виртуальные столбцы
_exchange_name- Имя exchange RabbitMQ. Тип данных:String._channel_id- ChannelID, в котором был объявлен consumer, получивший сообщение. Тип данных:String._delivery_tag- DeliveryTag полученного сообщения. Уникален в пределах канала. Тип данных:UInt64._redelivered- Флагredeliveredсообщения. Тип данных:UInt8._message_id- messageID полученного сообщения; непустой, если был задан при публикации сообщения. Тип данных:String._timestamp- временная метка полученного сообщения; непустая, если была задана при публикации сообщения. Тип данных:UInt64.
rabbitmq_handle_error_mode='stream':
_raw_message- Необработанное сообщение, которое не удалось успешно разобрать. Тип данных:Nullable(String)._error- Сообщение об ошибке, возникшей при неудачном разборе. Тип данных:Nullable(String).
_raw_message и _error заполняются только в случае исключения при разборе; если сообщение было успешно разобрано, они всегда имеют значение NULL.
Ограничения
DEFAULT, MATERIALIZED, ALIAS), они будут проигнорированы. Вместо этого столбцы будут заполнены соответствующими значениями по умолчанию для своих типов.
Поддерживаемые форматы данных
- Для построчных форматов количество строк в одном сообщении RabbitMQ можно задавать с помощью настройки
rabbitmq_max_rows_per_message. - Для блочных форматов разбить блок на более мелкие части нельзя, но количество строк в одном блоке можно регулировать общей настройкой max_block_size.