跳转到主要内容
该引擎支持将 ClickHouse 与 NATS 集成。 NATS 可让你:
  • 发布或订阅消息 subject。
  • 在有新消息可用时进行处理。

创建表

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = NATS SETTINGS
    nats_url = 'host:port',
    nats_subjects = 'subject1,subject2,...',
    nats_format = 'data_format'[,]
    [nats_schema = '',]
    [nats_num_consumers = N,]
    [nats_queue_group = 'group_name',]
    [nats_secure = false,]
    [nats_max_reconnect = N,]
    [nats_reconnect_wait = N,]
    [nats_server_list = 'host1:port1,host2:port2,...',]
    [nats_skip_broken_messages = N,]
    [nats_max_block_size = N,]
    [nats_flush_interval_ms = N,]
    [nats_username = 'user',]
    [nats_password = 'password',]
    [nats_token = 'clickhouse',]
    [nats_credential_file = '/var/nats_credentials',]
    [nats_startup_connect_tries = '5']
    [nats_max_rows_per_message = 1,]
    [nats_handle_error_mode = 'default']
必需参数:
  • nats_url – host:port (例如 localhost:5672) 。
  • nats_subjects – NATS 表要订阅/发布的 subject 列表。支持通配符 subject,例如 foo.*.barbaz.>
  • nats_format – 消息格式。使用与 SQL FORMAT 函数相同的表示法,例如 JSONEachRow。更多信息,请参见Formats部分。
可选参数:
  • nats_schema – 如果格式需要 schema 定义,则必须使用此参数。例如,Cap’n Proto 需要提供 schema 文件的 path 以及根对象 schema.capnp:Message 的名称。
  • nats_stream – NATS JetStream 中现有 stream 的名称。
  • nats_consumer – NATS JetStream 中现有持久 pull 消费者的名称。
  • nats_num_consumers – 每个表的消费者数量。默认值:1。仅适用于 NATS core:如果单个消费者的吞吐量不足,可指定更多消费者。
  • nats_queue_group – NATS 订阅者的 queue group 名称。默认值为表名。
  • nats_max_reconnect – 已弃用且不起作用;系统会按 nats_reconnect_wait timeout 永久执行重连。
  • nats_reconnect_wait – 每次重连尝试之间的休眠时间 (毫秒) 。默认值:5000
  • nats_server_list - 用于 connection 的 server 列表。可用于连接到 NATS cluster。
  • nats_skip_broken_messages - NATS 消息解析器对每个块中与 schema 不兼容消息的容忍数量。默认值:0。如果 nats_skip_broken_messages = N,则该引擎会跳过 N 条无法解析的 NATS 消息 (1 条消息等于 1 行数据) 。
  • nats_max_block_size - 为从 NATS flush 数据而通过 poll 收集的行数。默认值:max_insert_block_size
  • nats_flush_interval_ms - flush 从 NATS 读取的数据的 timeout。默认值:stream_flush_interval_ms
  • nats_username - NATS 用户名。
  • nats_password - NATS 密码。
  • nats_token - NATS 认证标记。
  • nats_credential_file - NATS 凭据文件的 path。
  • nats_startup_connect_tries - 启动时的连接尝试次数。默认值:5
  • nats_max_rows_per_message — 对于基于行的格式,一条 NATS 消息中写入的最大行数。默认值:1
  • nats_handle_error_mode — NATS 引擎的错误处理方式。可选值:default (如果消息解析失败,则抛出异常) ;stream (异常消息和原始消息将保存在虚拟列 _error_raw_message 中) 。
SSL 连接: 如需使用安全连接,请设置 nats_secure = 1。 证书验证由 CLICKHOUSE_NATS_TLS_SECURE 环境变量控制; 如果证书已过期、为自签名证书、缺失或因其他原因无效,请将 CLICKHOUSE_NATS_TLS_SECURE=0 以禁用验证。 写入 NATS 表: 如果表仅读取一个 subject,任何 insert 都会发布到同一个 subject。 但是,如果表从多个 subject 读取,则需要指定要发布到哪个 subject。 因此,向具有多个 subject 的表执行 insert 时,需要设置 stream_like_engine_insert_queue。 你可以选择该表读取的其中一个 subject,并将数据发布到该 subject。例如:
  CREATE TABLE queue (
    key UInt64,
    value UInt64
  ) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1,subject2',
             nats_format = 'JSONEachRow';

  INSERT INTO queue
  SETTINGS stream_like_engine_insert_queue = 'subject2'
  VALUES (1, 1);
也可以在添加与 nats 相关的设置时,一并添加 format 设置。 示例:
  CREATE TABLE queue (
    key UInt64,
    value UInt64,
    date DateTime
  ) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1',
             nats_format = 'JSONEachRow',
             date_time_input_format = 'best_effort';
可以通过 ClickHouse 配置文件添加 NATS 服务器配置。 更具体地说,你可以为 NATS 引擎添加密码:
<nats>
    <user>click</user>
    <password>house</password>
    <token>clickhouse</token>
</nats>

说明

SELECT 并不特别适合用于读取消息 (调试除外) ,因为每条消息只能读取一次。更实用的做法是使用 materialized views 创建实时处理链路。为此:
  1. 使用该引擎创建一个 NATS 消费者,并将其视为数据 stream。
  2. 创建一个具有所需结构的表。
  3. 创建一个 materialized view,将该引擎中的数据转换后写入前面创建的表中。
MATERIALIZED VIEW 连接到该引擎后,就会开始在后台收集数据。这样一来,你就可以持续接收来自 NATS 的消息,并使用 SELECT 将其转换为所需格式。 一个 NATS 表可以拥有任意数量的 materialized view;它们不会直接从该表读取数据,而是接收新的记录 (以块的形式) ,因此你可以写入多个明细粒度不同的表 (带分组聚合和不带分组聚合) 。 示例:
  CREATE TABLE queue (
    key UInt64,
    value UInt64
  ) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1',
             nats_format = 'JSONEachRow',
             date_time_input_format = 'best_effort';

  CREATE TABLE daily (key UInt64, value UInt64)
    ENGINE = MergeTree() ORDER BY key;

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT key, value FROM queue;

  SELECT key, value FROM daily ORDER BY key;
若要停止接收流数据或更改转换逻辑,请分离 materialized view:
  DETACH TABLE consumer;
  ATTACH TABLE consumer;
如果你想使用 ALTER 修改目标表,我们建议先禁用物化视图,以避免目标表与视图数据之间出现不一致。

虚拟列

  • _subject - NATS 消息的 subject。数据类型:String
nats_handle_error_mode='stream' 时,会提供以下额外的虚拟列:
  • _raw_message - 无法成功解析的原始消息。数据类型:Nullable(String)
  • _error - 解析失败时产生的异常消息。数据类型:Nullable(String)
注意:_raw_message_error 这两个虚拟列仅会在解析过程中发生异常时填充;如果消息解析成功,它们始终为 NULL

数据格式支持

NATS 引擎 支持 ClickHouse 支持的所有格式。 单条 NATS 消息中的行数取决于格式是按行还是按块:
  • 对于按行的格式,可通过设置 nats_max_rows_per_message 来控制单条 NATS 消息中的行数。
  • 对于按块的格式,无法将块拆分成更小的部分,但一个块中的行数可通过通用设置 max_block_size 控制。

使用 JetStream

在结合 NATS JetStream 使用 NATS 引擎之前,您必须先创建一个 NATS stream 和一个持久化拉取消费者。为此,您可以使用 NATS CLI 包中的 nats 工具,例如:
$ nats stream add
? Stream Name stream_name
? Subjects stream_subject
? Storage file
? Replication 1
? Retention Policy Limits
? Discard Policy Old
? Stream Messages Limit -1
? Per Subject Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 2m0s
? Allow message Roll-ups No
? Allow message deletion Yes
? Allow purging subjects or the entire stream Yes
Stream stream_name was created

Information for Stream stream_name created 2025-10-03 14:12:51

                Subjects: stream_subject
                Replicas: 1
                 Storage: File

Options:

               Retention: Limits
         Acknowledgments: true
          Discard Policy: Old
        Duplicate Window: 2m0s
              Direct Get: true
       Allows Msg Delete: true
            Allows Purge: true
  Allows Per-Message TTL: false
          Allows Rollups: false

Limits:

        Maximum Messages: unlimited
     Maximum Per Subject: unlimited
           Maximum Bytes: unlimited
             Maximum Age: unlimited
    Maximum Message Size: unlimited
       Maximum Consumers: unlimited

State:

                Messages: 0
                   Bytes: 0 B
          First Sequence: 0
           Last Sequence: 0
        Active Consumers: 0
$ nats consumer add
? Select a Stream stream_name
? Consumer name consumer_name
? Delivery target (empty for Pull Consumers) 
? Start policy (all, new, last, subject, 1h, msg sequence) all
? Acknowledgment policy explicit
? Replay policy instant
? Filter Stream by subjects (blank for all) 
? Maximum Allowed Deliveries -1
? Maximum Acknowledgments Pending 0
? Deliver headers only without bodies No
? Add a Retry Backoff Policy No
Information for Consumer stream_name > consumer_name created 2025-10-03T14:13:51+03:00

Configuration:

                    Name: consumer_name
               Pull Mode: true
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

State:

  Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
    Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
           Waiting Pulls: 0 of maximum 512
创建好 stream 和持久化拉取消费者后,我们就可以创建一个使用 NATS 引擎的表。为此,您需要设置:nats_stream、nats_consumer_name 和 nats_subjects:
CREATE TABLE nats_jet_stream (
    key UInt64,
    value UInt64
  ) ENGINE NATS 
    SETTINGS  nats_url = 'localhost:4222',
              nats_stream = 'stream_name',
              nats_consumer_name = 'consumer_name',
              nats_subjects = 'stream_subject',
              nats_format = 'JSONEachRow';
最后修改于 2026年6月10日