NATS では次のことができます。
- メッセージ subject をパブリッシュまたはサブスクライブする。
- 新しいメッセージが利用可能になり次第処理する。
テーブルの作成
nats_url– host:port (例:localhost:5672) ..nats_subjects– NATS テーブルが subscribe/publish する subject の一覧。foo.*.barやbaz.>のようなワイルドカード subject をサポートしますnats_format– メッセージのフォーマット。JSONEachRowなど、SQL のFORMAT関数と同じ記法を使用します。詳細は フォーマット セクションを参照してください。
nats_schema– フォーマットでスキーマ定義が必要な場合に使用する必要があるパラメータです。たとえば、Cap’n Proto では、スキーマファイルへのパスとルートschema.capnp:Messageオブジェクト名が必要です。nats_stream– NATS JetStream 内の既存の stream 名。nats_consumer– NATS JetStream 内の既存の durable pull コンシューマー名。nats_num_consumers– テーブルごとのコンシューマー数。デフォルト:1。NATS core のみを使用していて、1 つのコンシューマーのスループットが不十分な場合は、より多くのコンシューマーを指定します。nats_queue_group– NATS subscriber の queue group 名。デフォルトはテーブル名です。nats_max_reconnect– 非推奨であり、効果はありません。再接続はnats_reconnect_waitタイムアウトで恒久的に実行されます。nats_reconnect_wait– 再接続試行のたびに待機する時間 (ミリ秒単位) 。デフォルト:5000。nats_server_list- 接続先の server 一覧。NATS クラスターに接続するために指定できます。nats_skip_broken_messages- ブロックごとに許容する、スキーマ非互換の NATS message の数。デフォルト:0。nats_skip_broken_messages = Nの場合、このエンジンは解析できない N 件の NATS message をスキップします (1 message は 1 行のデータに相当します) 。nats_max_block_size- NATS からデータを flush するために poll で収集する行数。デフォルト: max_insert_block_size。nats_flush_interval_ms- NATS から読み取ったデータを flush するまでのタイムアウト。デフォルト: stream_flush_interval_ms。nats_username- NATS username。nats_password- NATS password。nats_token- NATS auth token。nats_credential_file- NATS credentials file への path。nats_startup_connect_tries- 起動時の接続試行回数。デフォルト:5。nats_max_rows_per_message— 行ベースのフォーマットで、1 つの NATS message に書き込まれる最大行数。 (デフォルト:1) 。nats_handle_error_mode— NATS エンジンでの error の処理方法。設定可能な値: default (message の解析に失敗すると exception が throw されます) 、stream (exception message と raw message が仮想カラム_errorおよび_raw_messageに保存されます) 。
nats_secure = 1 を設定します。
証明書の検証は、CLICKHOUSE_NATS_TLS_SECURE 環境変数で制御されます。
証明書の有効期限が切れている場合、自己署名である場合、存在しない場合、またはその他の理由で無効な場合は、CLICKHOUSE_NATS_TLS_SECURE=0 を設定して検証を無効にします。
NATS table への書き込み:
table が 1 つの subject だけを読み取る場合、insert はすべて同じ subject に公開されます。
ただし、table が複数の subject を読み取る場合は、どの subject に公開するかを指定する必要があります。
そのため、複数の subject を持つ table に insert する場合は、stream_like_engine_insert_queue の設定が必要です。
table が読み取る subject のいずれか 1 つを選び、そこにデータを公開できます。たとえば:
説明
SELECT は、メッセージの読み取りにはあまり適していません (debugging 目的を除く) 。各メッセージは一度しか読み取れないためです。より実用的なのは、materialized view を使用してリアルタイムのスレッドを作成することです。これを行うには、次の手順に従います。
- engine を使用して NATS コンシューマーを作成し、それをデータストリームとして扱います。
- 必要な structure を持つ table を作成します。
- engine からのデータを変換し、あらかじめ作成した table に格納する materialized view を作成します。
MATERIALIZED VIEW を engine に接続すると、バックグラウンドでデータの収集を開始します。これにより、NATS からメッセージを継続的に受信し、SELECT を使って必要なフォーマットに変換できます。
1 つの NATS table には、必要な数だけ materialized view を作成できます。これらは table から直接データを読み取るのではなく、新しいレコードをブロック単位で受け取ります。そのため、詳細度の異なる複数の table に書き込むことができます (グループ化あり - aggregation、なし) 。
例:
ALTER を使用してターゲットテーブルを変更する場合は、ターゲットテーブルとビューのデータの不整合を避けるため、materialized viewを無効化することを推奨します。
仮想カラム
_subject- NATS メッセージの subject。データ型:String。
nats_handle_error_mode='stream' の場合は、次の仮想カラムも利用できます。
_raw_message- 正常にパースできなかった生のメッセージ。データ型:Nullable(String)。_error- パース失敗時に発生した例外メッセージ。データ型:Nullable(String)。
_raw_message と _error に値が入るのは、パース中に例外が発生した場合のみです。メッセージが正常にパースされた場合、これらは常に NULL です。
データフォーマットのサポート
- 行ベースのフォーマットでは、1 つの NATS メッセージに含める行数を
nats_max_rows_per_messageの設定で制御できます。 - ブロックベースのフォーマットでは、ブロックをより小さなパーツに分割することはできませんが、1 つのブロックに含まれる行数は一般設定の max_block_size で制御できます。
JetStream の使用
nats ユーティリティを使用できます。
stream の作成
stream の作成
durable pull コンシューマーの作成
durable pull コンシューマーの作成
nats_stream、nats_consumer_name、nats_subjects を設定する必要があります。