メインコンテンツへスキップ
このエンジンを使用すると、ClickHouse を NATS と統合できます。 NATS では次のことができます。
  • メッセージ subject をパブリッシュまたはサブスクライブする。
  • 新しいメッセージが利用可能になり次第処理する。

テーブルの作成

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = NATS SETTINGS
    nats_url = 'host:port',
    nats_subjects = 'subject1,subject2,...',
    nats_format = 'data_format'[,]
    [nats_schema = '',]
    [nats_num_consumers = N,]
    [nats_queue_group = 'group_name',]
    [nats_secure = false,]
    [nats_max_reconnect = N,]
    [nats_reconnect_wait = N,]
    [nats_server_list = 'host1:port1,host2:port2,...',]
    [nats_skip_broken_messages = N,]
    [nats_max_block_size = N,]
    [nats_flush_interval_ms = N,]
    [nats_username = 'user',]
    [nats_password = 'password',]
    [nats_token = 'clickhouse',]
    [nats_credential_file = '/var/nats_credentials',]
    [nats_startup_connect_tries = '5']
    [nats_max_rows_per_message = 1,]
    [nats_handle_error_mode = 'default']
パラメータ:
  • nats_url – host:port (例: localhost:5672) ..
  • nats_subjects – NATS テーブルが subscribe/publish する subject の一覧。foo.*.barbaz.> のようなワイルドカード subject をサポートします
  • nats_format – メッセージのフォーマット。JSONEachRow など、SQL の FORMAT 関数と同じ記法を使用します。詳細は フォーマット セクションを参照してください。
パラメータ:
  • nats_schema – フォーマットでスキーマ定義が必要な場合に使用する必要があるパラメータです。たとえば、Cap’n Proto では、スキーマファイルへのパスとルート schema.capnp:Message オブジェクト名が必要です。
  • nats_stream – NATS JetStream 内の既存の stream 名。
  • nats_consumer – NATS JetStream 内の既存の durable pull コンシューマー名。
  • nats_num_consumers – テーブルごとのコンシューマー数。デフォルト: 1。NATS core のみを使用していて、1 つのコンシューマーのスループットが不十分な場合は、より多くのコンシューマーを指定します。
  • nats_queue_group – NATS subscriber の queue group 名。デフォルトはテーブル名です。
  • nats_max_reconnect – 非推奨であり、効果はありません。再接続は nats_reconnect_wait タイムアウトで恒久的に実行されます。
  • nats_reconnect_wait – 再接続試行のたびに待機する時間 (ミリ秒単位) 。デフォルト: 5000
  • nats_server_list - 接続先の server 一覧。NATS クラスターに接続するために指定できます。
  • nats_skip_broken_messages - ブロックごとに許容する、スキーマ非互換の NATS message の数。デフォルト: 0nats_skip_broken_messages = N の場合、このエンジンは解析できない N 件の NATS message をスキップします (1 message は 1 行のデータに相当します) 。
  • nats_max_block_size - NATS からデータを flush するために poll で収集する行数。デフォルト: max_insert_block_size
  • nats_flush_interval_ms - NATS から読み取ったデータを flush するまでのタイムアウト。デフォルト: stream_flush_interval_ms
  • nats_username - NATS username。
  • nats_password - NATS password。
  • nats_token - NATS auth token。
  • nats_credential_file - NATS credentials file への path。
  • nats_startup_connect_tries - 起動時の接続試行回数。デフォルト: 5
  • nats_max_rows_per_message — 行ベースのフォーマットで、1 つの NATS message に書き込まれる最大行数。 (デフォルト: 1) 。
  • nats_handle_error_mode — NATS エンジンでの error の処理方法。設定可能な値: default (message の解析に失敗すると exception が throw されます) 、stream (exception message と raw message が仮想カラム _error および _raw_message に保存されます) 。
SSL 接続: 安全な接続を使用するには、nats_secure = 1 を設定します。 証明書の検証は、CLICKHOUSE_NATS_TLS_SECURE 環境変数で制御されます。 証明書の有効期限が切れている場合、自己署名である場合、存在しない場合、またはその他の理由で無効な場合は、CLICKHOUSE_NATS_TLS_SECURE=0 を設定して検証を無効にします。 NATS table への書き込み: table が 1 つの subject だけを読み取る場合、insert はすべて同じ subject に公開されます。 ただし、table が複数の subject を読み取る場合は、どの subject に公開するかを指定する必要があります。 そのため、複数の subject を持つ table に insert する場合は、stream_like_engine_insert_queue の設定が必要です。 table が読み取る subject のいずれか 1 つを選び、そこにデータを公開できます。たとえば:
  CREATE TABLE queue (
    key UInt64,
    value UInt64
  ) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1,subject2',
             nats_format = 'JSONEachRow';

  INSERT INTO queue
  SETTINGS stream_like_engine_insert_queue = 'subject2'
  VALUES (1, 1);
nats関連の設定に加えて、フォーマットに関する設定を追加することもできます。 例:
  CREATE TABLE queue (
    key UInt64,
    value UInt64,
    date DateTime
  ) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1',
             nats_format = 'JSONEachRow',
             date_time_input_format = 'best_effort';
NATSサーバーの設定は、ClickHouseの設定ファイルを使用して追加できます。 具体的には、NATSエンジン用のパスワードを追加できます:
<nats>
    <user>click</user>
    <password>house</password>
    <token>clickhouse</token>
</nats>

説明

SELECT は、メッセージの読み取りにはあまり適していません (debugging 目的を除く) 。各メッセージは一度しか読み取れないためです。より実用的なのは、materialized view を使用してリアルタイムのスレッドを作成することです。これを行うには、次の手順に従います。
  1. engine を使用して NATS コンシューマーを作成し、それをデータストリームとして扱います。
  2. 必要な structure を持つ table を作成します。
  3. engine からのデータを変換し、あらかじめ作成した table に格納する materialized view を作成します。
MATERIALIZED VIEW を engine に接続すると、バックグラウンドでデータの収集を開始します。これにより、NATS からメッセージを継続的に受信し、SELECT を使って必要なフォーマットに変換できます。 1 つの NATS table には、必要な数だけ materialized view を作成できます。これらは table から直接データを読み取るのではなく、新しいレコードをブロック単位で受け取ります。そのため、詳細度の異なる複数の table に書き込むことができます (グループ化あり - aggregation、なし) 。 例:
  CREATE TABLE queue (
    key UInt64,
    value UInt64
  ) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1',
             nats_format = 'JSONEachRow',
             date_time_input_format = 'best_effort';

  CREATE TABLE daily (key UInt64, value UInt64)
    ENGINE = MergeTree() ORDER BY key;

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT key, value FROM queue;

  SELECT key, value FROM daily ORDER BY key;
ストリームデータの受信を停止するか、変換ロジックを変更するには、materialized viewをデタッチします:
  DETACH TABLE consumer;
  ATTACH TABLE consumer;
ALTER を使用してターゲットテーブルを変更する場合は、ターゲットテーブルとビューのデータの不整合を避けるため、materialized viewを無効化することを推奨します。

仮想カラム

  • _subject - NATS メッセージの subject。データ型: String
nats_handle_error_mode='stream' の場合は、次の仮想カラムも利用できます。
  • _raw_message - 正常にパースできなかった生のメッセージ。データ型: Nullable(String)
  • _error - パース失敗時に発生した例外メッセージ。データ型: Nullable(String)
注: 仮想カラム _raw_message_error に値が入るのは、パース中に例外が発生した場合のみです。メッセージが正常にパースされた場合、これらは常に NULL です。

データフォーマットのサポート

NATS エンジンは、ClickHouse でサポートされているすべてのフォーマットに対応しています。 1 つの NATS メッセージに含まれる行数は、そのフォーマットが行ベースかブロックベースかによって異なります。
  • 行ベースのフォーマットでは、1 つの NATS メッセージに含める行数を nats_max_rows_per_message の設定で制御できます。
  • ブロックベースのフォーマットでは、ブロックをより小さなパーツに分割することはできませんが、1 つのブロックに含まれる行数は一般設定の max_block_size で制御できます。

JetStream の使用

NATS JetStream で NATS エンジンを使用する前に、NATS の stream と durable pull コンシューマーを作成する必要があります。これには、たとえば NATS CLI パッケージの nats ユーティリティを使用できます。
$ nats stream add
? Stream Name stream_name
? Subjects stream_subject
? Storage file
? Replication 1
? Retention Policy Limits
? Discard Policy Old
? Stream Messages Limit -1
? Per Subject Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 2m0s
? Allow message Roll-ups No
? Allow message deletion Yes
? Allow purging subjects or the entire stream Yes
Stream stream_name was created

Information for Stream stream_name created 2025-10-03 14:12:51

                Subjects: stream_subject
                Replicas: 1
                 Storage: File

Options:

               Retention: Limits
         Acknowledgments: true
          Discard Policy: Old
        Duplicate Window: 2m0s
              Direct Get: true
       Allows Msg Delete: true
            Allows Purge: true
  Allows Per-Message TTL: false
          Allows Rollups: false

Limits:

        Maximum Messages: unlimited
     Maximum Per Subject: unlimited
           Maximum Bytes: unlimited
             Maximum Age: unlimited
    Maximum Message Size: unlimited
       Maximum Consumers: unlimited

State:

                Messages: 0
                   Bytes: 0 B
          First Sequence: 0
           Last Sequence: 0
        Active Consumers: 0
$ nats consumer add
? Select a Stream stream_name
? Consumer name consumer_name
? Delivery target (empty for Pull Consumers) 
? Start policy (all, new, last, subject, 1h, msg sequence) all
? Acknowledgment policy explicit
? Replay policy instant
? Filter Stream by subjects (blank for all) 
? Maximum Allowed Deliveries -1
? Maximum Acknowledgments Pending 0
? Deliver headers only without bodies No
? Add a Retry Backoff Policy No
Information for Consumer stream_name > consumer_name created 2025-10-03T14:13:51+03:00

Configuration:

                    Name: consumer_name
               Pull Mode: true
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

State:

  Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
    Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
           Waiting Pulls: 0 of maximum 512
stream と durable pull コンシューマーを作成したら、NATS エンジンを使用してテーブルを作成できます。そのためには、nats&#95;streamnats&#95;consumer&#95;namenats&#95;subjects を設定する必要があります。
CREATE TABLE nats_jet_stream (
    key UInt64,
    value UInt64
  ) ENGINE NATS 
    SETTINGS  nats_url = 'localhost:4222',
              nats_stream = 'stream_name',
              nats_consumer_name = 'consumer_name',
              nats_subjects = 'stream_subject',
              nats_format = 'JSONEachRow';
最終更新日 2026年6月10日