RabbitMQ 支持您:
- 发布或订阅数据流。
- 在流可用后立即进行处理。
创建表
rabbitmq_host_port– host:port (例如localhost:5672) 。rabbitmq_exchange_name– RabbitMQ 的 exchange 名称。rabbitmq_format– 消息格式。使用与 SQLFORMAT函数相同的记法,例如JSONEachRow。更多信息,请参见 Formats 章节。
rabbitmq_exchange_type– RabbitMQ exchange 的类型:direct、fanout、topic、headers、consistent_hash。默认值:fanout。rabbitmq_routing_key_list– 以逗号分隔的路由键列表。rabbitmq_schema– 如果 format 需要 schema 定义,则必须使用此参数。例如,Cap’n Proto 需要提供 schema 文件的路径以及根对象schema.capnp:Message的名称。rabbitmq_num_consumers– 每个表的消费者数量。如果单个消费者的吞吐量不足,请指定更多消费者。默认值:1rabbitmq_num_queues– 队列总数。增加此数量可显著提升性能。默认值:1。rabbitmq_queue_base- 为队列名称指定提示信息。此设置的使用场景见下文。rabbitmq_persistent- 如果设置为 1 (true),则插入查询的投递模式将设置为 2 (将消息标记为“持久”) 。默认值:0。rabbitmq_skip_broken_messages– RabbitMQ 消息解析器对每个块中与 schema 不兼容消息的容忍数量。如果rabbitmq_skip_broken_messages = N,则引擎会跳过 N 条无法解析的 RabbitMQ 消息 (1 条消息等于 1 行数据) 。默认值:0。rabbitmq_max_block_size- 从 RabbitMQ 刷新数据前收集的行数。默认值:max_insert_block_size。rabbitmq_flush_interval_ms- 从 RabbitMQ 刷新数据的超时时间。默认值:stream_flush_interval_ms。rabbitmq_queue_settings_list- 允许在创建队列时设置 RabbitMQ 参数。可用设置:x-max-length、x-max-length-bytes、x-message-ttl、x-expires、x-priority、x-max-priority、x-overflow、x-dead-letter-exchange、x-queue-type。队列的durable设置会自动启用。rabbitmq_address- 连接地址。使用此设置或rabbitmq_host_port。rabbitmq_vhost- RabbitMQ vhost。默认值:'\'。rabbitmq_queue_consume- 使用用户定义的队列,且不执行任何 RabbitMQ 设置:声明 exchanges、queues、bindings。默认值:false。rabbitmq_username- RabbitMQ 用户名。rabbitmq_password- RabbitMQ 密码。reject_unhandled_messages- 发生错误时拒绝消息 (向 RabbitMQ 发送负确认) 。如果在rabbitmq_queue_settings_list中定义了x-dead-letter-exchange,则会自动启用此设置。rabbitmq_commit_on_select- 在执行 select 查询时提交消息。默认值:false。rabbitmq_max_rows_per_message— 对于基于行的 format,单条 RabbitMQ 消息中写入的最大行数。默认值:1。rabbitmq_empty_queue_backoff_start_ms— 当 RabbitMQ 队列为空时,重新调度读取的 backoff 起始点。rabbitmq_empty_queue_backoff_end_ms— 当 RabbitMQ 队列为空时,重新调度读取的 backoff 结束点。rabbitmq_empty_queue_backoff_step_ms— 当 RabbitMQ 队列为空时,重新调度读取的 backoff 步长。rabbitmq_handle_error_mode— RabbitMQ engine 的错误处理方式。Possible values: default (如果消息解析失败则抛出 exception) 、stream (exception 消息和原始消息将保存在虚拟列_error和_raw_message中) 、dead_letter_queue (与错误相关的数据将保存在 system.dead_letter_queue 中) 。
SSL 连接
rabbitmq_secure = 1 或 amqps:rabbitmq_address = 'amqps://guest:guest@localhost/vhost'。
所使用库的默认行为是,不会检查所建立的 TLS 连接是否足够安全。无论证书已过期、为自签名、缺失还是无效,都会允许建立连接。未来可能会实现更严格的证书检查。
也可以在添加 RabbitMQ 相关设置的同时,添加格式设置。
示例:
说明
SELECT 并不特别适合读取消息 (调试场景除外) ,因为每条消息只能读取一次。更实用的做法是使用 materialized views 创建实时处理链路。为此:
- 使用该引擎创建一个 RabbitMQ 消费者,并将其视为数据 stream。
- 创建一个具有所需结构的表。
- 创建一个 materialized view,将来自该引擎的数据转换后写入之前创建的表中。
MATERIALIZED VIEW 连接到该引擎时,它会在后台开始收集数据。这样你就可以持续从 RabbitMQ 接收消息,并使用 SELECT 将其转换为所需格式。
一个 RabbitMQ 表可以拥有任意多个 materialized views。
可以根据 rabbitmq_exchange_type 和指定的 rabbitmq_routing_key_list 对数据进行路由。
每个表最多只能有一个 exchange。一个 exchange 可以由多个表共享,这样就能同时路由到多个表。
exchange 类型选项:
direct- 路由基于 key 的精确匹配。示例表 key 列表:key1,key2,key3,key4,key5,消息 key 可以等于其中任意一个。fanout- 路由到所有表 (exchange 名称相同的表) ,与 key 无关。topic- 路由基于以点分隔的 key pattern。示例:*.logs、records.*.*.2020、*.2018,*.2019,*.2020。headers- 路由基于key=value匹配,并使用设置x-match=all或x-match=any。示例表 key 列表:x-match=all,format=logs,type=report,year=2020。consistent_hash- 数据会在所有已绑定的表之间均匀分布 (exchange 名称相同的表) 。请注意,此 exchange 类型必须通过 RabbitMQ plugin 启用:rabbitmq-plugins enable rabbitmq_consistent_hash_exchange。
rabbitmq_queue_base 可用于以下场景:
- 让不同的表共享队列,以便为同一组队列注册多个消费者,从而获得更好的性能。如果使用
rabbitmq_num_consumers和/或rabbitmq_num_queues设置,那么在这些参数相同的情况下,可以实现队列的精确匹配。 - 在并非所有消息都被成功消费时,能够从某些持久化队列恢复读取。要从某个特定队列恢复消费,请在
rabbitmq_queue_base设置中指定其名称,并且不要指定rabbitmq_num_consumers和rabbitmq_num_queues(默认为 1) 。要从为特定表声明的所有队列恢复消费,只需指定相同的设置:rabbitmq_queue_base、rabbitmq_num_consumers、rabbitmq_num_queues。默认情况下,队列名称对各表都是唯一的。 - 复用队列,因为它们被声明为持久化且不会被自动删除。 (可通过任意 RabbitMQ CLI 工具删除。)
rabbitmq_exchange_type 的同时还指定了 rabbitmq_num_consumers 和/或 rabbitmq_num_queues 设置,那么:
- 必须启用
rabbitmq-consistent-hash-exchangeplugin。 - 必须指定已发布消息的
message_id属性 (每条消息/批次唯一) 。
messageID 和 republished 标志 (如果消息被发布超过一次,则为 true) ——可通过消息请求头访问。
不要将同一个表同时用于 inserts 和 materialized views。
示例:
虚拟列
_exchange_name- RabbitMQ exchange 名称。数据类型:String。_channel_id- 接收该消息的消费者所声明的 ChannelID。数据类型:String。_delivery_tag- 已接收消息的 DeliveryTag。每个 channel 内独立。数据类型:UInt64。_redelivered- 消息的redelivered标志。数据类型:UInt8。_message_id- 已接收消息的 messageID;如果在消息发布时设置了该值,则非空。数据类型:String。_timestamp- 已接收消息的时间戳;如果在消息发布时设置了该值,则非空。数据类型:UInt64。
rabbitmq_handle_error_mode='stream' 时,还会有以下附加虚拟列:
_raw_message- 无法成功解析的原始消息。数据类型:Nullable(String)。_error- 解析失败时产生的异常消息。数据类型:Nullable(String)。
_raw_message 和 _error 虚拟列;如果消息解析成功,它们始终为 NULL。
注意事项
DEFAULT、MATERIALIZED、ALIAS) ,这些设置也会被忽略。相反,各列会填充为其类型对应的默认值。
数据格式支持
- 对于按行组织的格式,可通过设置
rabbitmq_max_rows_per_message控制单条 RabbitMQ 消息中的行数。 - 对于按块组织的格式,我们无法将块进一步拆分为更小的部分,但单个块中的行数可以通过通用设置 max_block_size 控制。