NATS 可让你:
- 发布或订阅消息 subject。
- 在有新消息可用时进行处理。
创建表
nats_url– host:port (例如localhost:5672) 。nats_subjects– NATS 表要订阅/发布的 subject 列表。支持通配符 subject,例如foo.*.bar或baz.>nats_format– 消息格式。使用与 SQLFORMAT函数相同的表示法,例如JSONEachRow。更多信息,请参见Formats部分。
nats_schema– 如果格式需要 schema 定义,则必须使用此参数。例如,Cap’n Proto 需要提供 schema 文件的 path 以及根对象schema.capnp:Message的名称。nats_stream– NATS JetStream 中现有 stream 的名称。nats_consumer– NATS JetStream 中现有持久 pull 消费者的名称。nats_num_consumers– 每个表的消费者数量。默认值:1。仅适用于 NATS core:如果单个消费者的吞吐量不足,可指定更多消费者。nats_queue_group– NATS 订阅者的 queue group 名称。默认值为表名。nats_max_reconnect– 已弃用且不起作用;系统会按nats_reconnect_waittimeout 永久执行重连。nats_reconnect_wait– 每次重连尝试之间的休眠时间 (毫秒) 。默认值:5000。nats_server_list- 用于 connection 的 server 列表。可用于连接到 NATS cluster。nats_skip_broken_messages- NATS 消息解析器对每个块中与 schema 不兼容消息的容忍数量。默认值:0。如果nats_skip_broken_messages = N,则该引擎会跳过 N 条无法解析的 NATS 消息 (1 条消息等于 1 行数据) 。nats_max_block_size- 为从 NATS flush 数据而通过 poll 收集的行数。默认值:max_insert_block_size。nats_flush_interval_ms- flush 从 NATS 读取的数据的 timeout。默认值:stream_flush_interval_ms。nats_username- NATS 用户名。nats_password- NATS 密码。nats_token- NATS 认证标记。nats_credential_file- NATS 凭据文件的 path。nats_startup_connect_tries- 启动时的连接尝试次数。默认值:5。nats_max_rows_per_message— 对于基于行的格式,一条 NATS 消息中写入的最大行数。默认值:1。nats_handle_error_mode— NATS 引擎的错误处理方式。可选值:default (如果消息解析失败,则抛出异常) ;stream (异常消息和原始消息将保存在虚拟列_error和_raw_message中) 。
nats_secure = 1。
证书验证由 CLICKHOUSE_NATS_TLS_SECURE 环境变量控制;
如果证书已过期、为自签名证书、缺失或因其他原因无效,请将 CLICKHOUSE_NATS_TLS_SECURE=0 以禁用验证。
写入 NATS 表:
如果表仅读取一个 subject,任何 insert 都会发布到同一个 subject。
但是,如果表从多个 subject 读取,则需要指定要发布到哪个 subject。
因此,向具有多个 subject 的表执行 insert 时,需要设置 stream_like_engine_insert_queue。
你可以选择该表读取的其中一个 subject,并将数据发布到该 subject。例如:
说明
SELECT 并不特别适合用于读取消息 (调试除外) ,因为每条消息只能读取一次。更实用的做法是使用 materialized views 创建实时处理链路。为此:
- 使用该引擎创建一个 NATS 消费者,并将其视为数据 stream。
- 创建一个具有所需结构的表。
- 创建一个 materialized view,将该引擎中的数据转换后写入前面创建的表中。
MATERIALIZED VIEW 连接到该引擎后,就会开始在后台收集数据。这样一来,你就可以持续接收来自 NATS 的消息,并使用 SELECT 将其转换为所需格式。
一个 NATS 表可以拥有任意数量的 materialized view;它们不会直接从该表读取数据,而是接收新的记录 (以块的形式) ,因此你可以写入多个明细粒度不同的表 (带分组聚合和不带分组聚合) 。
示例:
ALTER 修改目标表,我们建议先禁用物化视图,以避免目标表与视图数据之间出现不一致。
虚拟列
_subject- NATS 消息的 subject。数据类型:String。
nats_handle_error_mode='stream' 时,会提供以下额外的虚拟列:
_raw_message- 无法成功解析的原始消息。数据类型:Nullable(String)。_error- 解析失败时产生的异常消息。数据类型:Nullable(String)。
_raw_message 和 _error 这两个虚拟列仅会在解析过程中发生异常时填充;如果消息解析成功,它们始终为 NULL。
数据格式支持
- 对于按行的格式,可通过设置
nats_max_rows_per_message来控制单条 NATS 消息中的行数。 - 对于按块的格式,无法将块拆分成更小的部分,但一个块中的行数可通过通用设置 max_block_size 控制。
使用 JetStream
nats 工具,例如:
创建 stream
创建 stream
创建持久化拉取消费者
创建持久化拉取消费者