メインコンテンツへスキップ
このエンジンを使用すると、ClickHouse を RabbitMQ と連携できます。 RabbitMQ では、次のことができます。
  • データフローをパブリッシュまたはサブスクライブする。
  • 利用可能になったストリームを処理する。

テーブルの作成

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1],
    name2 [type2],
    ...
) ENGINE = RabbitMQ SETTINGS
    rabbitmq_host_port = 'host:port' [or rabbitmq_address = 'amqp(s)://guest:guest@localhost/vhost'],
    rabbitmq_exchange_name = 'exchange_name',
    rabbitmq_format = 'data_format'[,]
    [rabbitmq_exchange_type = 'exchange_type',]
    [rabbitmq_routing_key_list = 'key1,key2,...',]
    [rabbitmq_secure = 0,]
    [rabbitmq_schema = '',]
    [rabbitmq_num_consumers = N,]
    [rabbitmq_num_queues = N,]
    [rabbitmq_queue_base = 'queue',]
    [rabbitmq_deadletter_exchange = 'dl-exchange',]
    [rabbitmq_persistent = 0,]
    [rabbitmq_skip_broken_messages = N,]
    [rabbitmq_max_block_size = N,]
    [rabbitmq_flush_interval_ms = N,]
    [rabbitmq_queue_settings_list = 'x-dead-letter-exchange=my-dlx,x-max-length=10,x-overflow=reject-publish',]
    [rabbitmq_queue_consume = false,]
    [rabbitmq_address = '',]
    [rabbitmq_vhost = '/',]
    [rabbitmq_username = '',]
    [rabbitmq_password = '',]
    [rabbitmq_commit_on_select = false,]
    [rabbitmq_max_rows_per_message = 1,]
    [rabbitmq_handle_error_mode = 'default']
パラメータ:
  • rabbitmq_host_port – ホスト:ポート (例: localhost:5672) 。
  • rabbitmq_exchange_name – RabbitMQ exchange名。
  • rabbitmq_format – メッセージのフォーマット。JSONEachRow など、SQL FORMAT 関数と同じ記法を使用します。詳細は、フォーマット セクションを参照してください。
パラメータ:
  • rabbitmq_exchange_type – RabbitMQ exchange のタイプ: direct, fanout, topic, headers, consistent_hash。デフォルト: fanout
  • rabbitmq_routing_key_list – ルーティングキーのカンマ区切りリスト。
  • rabbitmq_schema – フォーマットでスキーマ定義が必要な場合に使用する必要があるパラメーターです。たとえば Cap’n Proto では、スキーマファイルへのパスと、ルート schema.capnp:Message オブジェクト名が必要です。
  • rabbitmq_num_consumers – テーブルごとのコンシューマー数。1 つのコンシューマーのスループットが不十分な場合は、より多くのコンシューマーを指定してください。デフォルト: 1
  • rabbitmq_num_queues – キューの総数。この数を増やすと、パフォーマンスが大幅に向上する場合があります。デフォルト: 1
  • rabbitmq_queue_base - キュー名のヒントを指定します。この設定のユースケースについては以下で説明します。
  • rabbitmq_persistent - 1 (true) に設定すると、INSERT クエリの配信モードは 2 に設定されます (メッセージを「永続化」としてマークします) 。デフォルト: 0
  • rabbitmq_skip_broken_messages – ブロックごとに、スキーマと互換性のない RabbitMQ メッセージをメッセージパーサーがどれだけ許容するかを指定します。rabbitmq_skip_broken_messages = N の場合、このエンジンはパースできない N 件の RabbitMQ メッセージをスキップします (1 メッセージは 1 行のデータに相当します) 。デフォルト: 0
  • rabbitmq_max_block_size - RabbitMQ からデータをフラッシュする前に収集する行数。デフォルト: max_insert_block_size
  • rabbitmq_flush_interval_ms - RabbitMQ からデータをフラッシュするまでのタイムアウト。デフォルト: stream_flush_interval_ms
  • rabbitmq_queue_settings_list - キューの作成時に RabbitMQ の設定を指定できます。使用可能な設定: x-max-length, x-max-length-bytes, x-message-ttl, x-expires, x-priority, x-max-priority, x-overflow, x-dead-letter-exchange, x-queue-typedurable 設定はキューに対して自動的に有効になります。
  • rabbitmq_address - 接続先アドレス。この設定または rabbitmq_host_port のいずれかを使用してください。
  • rabbitmq_vhost - RabbitMQ vhost。デフォルト: '\'
  • rabbitmq_queue_consume - ユーザー定義キューを使用し、exchange、queue、binding の宣言を含む RabbitMQ のセットアップは行いません。デフォルト: false
  • rabbitmq_username - RabbitMQ のユーザー名。
  • rabbitmq_password - RabbitMQ のパスワード。
  • reject_unhandled_messages - エラー時にメッセージを拒否します (RabbitMQ に negative acknowledgement を送信します) 。この設定は、rabbitmq_queue_settings_listx-dead-letter-exchange が定義されている場合、自動的に有効になります。
  • rabbitmq_commit_on_select - select クエリ実行時にメッセージを commit します。デフォルト: false
  • rabbitmq_max_rows_per_message — 行ベースのフォーマットで 1 つの RabbitMQ メッセージに書き込まれる最大行数。デフォルト: 1
  • rabbitmq_empty_queue_backoff_start_ms — RabbitMQ キューが空の場合に読み取りを再スケジュールするための backoff の開始点。
  • rabbitmq_empty_queue_backoff_end_ms — RabbitMQ キューが空の場合に読み取りを再スケジュールするための backoff の終了点。
  • rabbitmq_empty_queue_backoff_step_ms — RabbitMQ キューが空の場合に読み取りを再スケジュールするための backoff のステップ。
  • rabbitmq_handle_error_mode — RabbitMQ エンジンでの error の処理方法。設定可能な値: default (メッセージのパースに失敗した場合は例外がスローされます) 、stream (例外メッセージと生メッセージは仮想カラム _error および _raw_message に保存されます) 、dead_letter_queue (error 関連のデータは system.dead_letter_queue に保存されます) 。

SSL 接続

接続アドレスには、rabbitmq_secure = 1 または amqps のいずれかを使用します: rabbitmq_address = 'amqps://guest:guest@localhost/vhost'。 使用しているライブラリのデフォルトの動作では、作成された TLS 接続が十分に安全かどうかは確認されません。証明書が期限切れ、自己署名、欠落、無効のいずれであっても、接続はそのまま許可されます。より厳格な証明書検証は、今後実装される可能性があります。 また、rabbitmq 関連の設定に加えてフォーマット設定を追加することもできます。 例:
  CREATE TABLE queue (
    key UInt64,
    value UInt64,
    date DateTime
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'exchange1',
                            rabbitmq_format = 'JSONEachRow',
                            rabbitmq_num_consumers = 5,
                            date_time_input_format = 'best_effort';
RabbitMQ のサーバー設定は、ClickHouse の設定ファイルに追加する必要があります。 必要な設定:
 <rabbitmq>
    <username>root</username>
    <password>clickhouse</password>
 </rabbitmq>
追加設定:
 <rabbitmq>
    <vhost>clickhouse</vhost>
 </rabbitmq>

説明

メッセージの読み取りに SELECT はあまり有用ではありません (デバッグ時を除く) 。これは、各メッセージを読み取れるのが 1 回限りだからです。より実用的なのは、materialized views を使ってリアルタイムのスレッドを作成することです。これを行うには、次の手順に従います。
  1. エンジンを使って RabbitMQ コンシューマーを作成し、それをデータストリームとして扱います。
  2. 必要な構造を持つテーブルを作成します。
  3. エンジンからのデータを変換し、あらかじめ作成したテーブルに格納する materialized view を作成します。
MATERIALIZED VIEW がエンジンに接続されると、バックグラウンドでデータの収集を開始します。これにより、RabbitMQ からメッセージを継続的に受信し、SELECT を使って必要なフォーマットに変換できます。 1 つの RabbitMQ テーブルには、必要な数の materialized view を作成できます。 データは rabbitmq_exchange_type と指定した rabbitmq_routing_key_list に基づいて振り分けることができます。 1 つのテーブルに対して exchange は 1 つまでです。1 つの exchange を複数のテーブルで共有することもでき、その場合は複数のテーブルへ同時にルーティングできます。 Exchange type のオプション:
  • direct - ルーティングはキーの完全一致に基づきます。テーブルのキーリストの例: key1,key2,key3,key4,key5。メッセージのキーはこのいずれかと一致できます。
  • fanout - キーに関係なく、すべてのテーブル (exchange 名が同じもの) へルーティングします。
  • topic - ルーティングはドット区切りのキーを使ったパターンに基づきます。例: *.logs, records.*.*.2020, *.2018,*.2019,*.2020
  • headers - ルーティングは key=value の一致に基づき、設定 x-match=all または x-match=any を使用します。テーブルのキーリストの例: x-match=all,format=logs,type=report,year=2020
  • consistent_hash - データはバインドされたすべてのテーブル (exchange 名が同じもの) に均等に分散されます。この exchange type を使うには、RabbitMQ プラグインを有効にする必要がある点に注意してください: rabbitmq-plugins enable rabbitmq_consistent_hash_exchange
設定 rabbitmq_queue_base は、次のケースで使用できます。
  • 複数のテーブルでキューを共有し、同じキューに対して複数のコンシューマーを登録できるようにするためです。これにより、パフォーマンスが向上します。rabbitmq_num_consumers および/または rabbitmq_num_queues 設定を使う場合、これらのパラメーターが同じであればキューを完全に一致させられます。
  • すべてのメッセージを正常に消費できなかった場合に、特定の durable キューからの読み取りを復旧できるようにするためです。特定の 1 つのキューから消費を再開するには、rabbitmq_queue_base 設定にその名前を設定し、rabbitmq_num_consumersrabbitmq_num_queues は指定しないでください (デフォルトは 1) 。特定のテーブル用に宣言されたすべてのキューから消費を再開するには、同じ設定、つまり rabbitmq_queue_baserabbitmq_num_consumersrabbitmq_num_queues を指定してください。デフォルトでは、キュー名はテーブルごとに一意になります。
  • キューは durable で自動削除されないため、それらを再利用するためです。 (削除する場合は RabbitMQ CLI ツールのいずれかを使用できます。)
パフォーマンスを向上させるため、受信したメッセージは max_insert_block_size のサイズの ブロック にまとめられます。ブロック が stream_flush_interval_ms ミリ秒以内に形成されなかった場合、ブロック が完全でなくてもデータはテーブルにフラッシュされます。 rabbitmq_num_consumers および/または rabbitmq_num_queues 設定を rabbitmq_exchange_type とともに指定する場合は、次の条件があります。
  • rabbitmq-consistent-hash-exchange プラグインを有効にする必要があります。
  • 公開されるメッセージの message_id プロパティを指定する必要があります (各メッセージ/バッチごとに一意) 。
INSERT クエリ にはメッセージメタデータがあり、公開された各メッセージに対して messageIDrepublished フラグ (複数回公開された場合は true) が追加されます。これらにはメッセージヘッダー経由でアクセスできます。 INSERT と materialized view に同じテーブルを使用しないでください。 例:
  CREATE TABLE queue (
    key UInt64,
    value UInt64
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'exchange1',
                            rabbitmq_exchange_type = 'headers',
                            rabbitmq_routing_key_list = 'format=logs,type=report,year=2020',
                            rabbitmq_format = 'JSONEachRow',
                            rabbitmq_num_consumers = 5;

  CREATE TABLE daily (key UInt64, value UInt64)
    ENGINE = MergeTree() ORDER BY key;

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT key, value FROM queue;

  SELECT key, value FROM daily ORDER BY key;

仮想カラム

  • _exchange_name - RabbitMQ exchange名。データ型: String
  • _channel_id - メッセージを受信した コンシューマー が宣言された ChannelID。データ型: String
  • _delivery_tag - 受信したメッセージの DeliveryTag。チャネルごとのスコープです。データ型: UInt64
  • _redelivered - メッセージの redelivered フラグ。データ型: UInt8
  • _message_id - 受信したメッセージの messageID。メッセージの公開時に設定されていた場合は空ではありません。データ型: String
  • _timestamp - 受信したメッセージの timestamp。メッセージの公開時に設定されていた場合は空ではありません。データ型: UInt64
rabbitmq_handle_error_mode='stream' の場合に追加される仮想カラム:
  • _raw_message - 正常にパースできなかった生メッセージ。データ型: Nullable(String)
  • _error - パース失敗時に発生した例外メッセージ。データ型: Nullable(String)
注: _raw_message_error の仮想カラムに値が設定されるのは、パース中に例外が発生した場合のみです。メッセージが正常にパースされた場合、これらは常に NULL です。

注意事項

デフォルトカラム式 (DEFAULTMATERIALIZEDALIAS など) をテーブル定義で指定していても、これらは無視されます。代わりに、各カラムにはその型に応じたデフォルト値が設定されます。

データフォーマットのサポート

RabbitMQエンジンは、ClickHouseでサポートされているすべてのフォーマットに対応しています。 1 つの RabbitMQ メッセージに含められる行数は、フォーマットが行ベースかブロックベースかによって異なります。
  • 行ベースのフォーマットでは、1 つの RabbitMQ メッセージに含める行数は、rabbitmq_max_rows_per_message の設定で制御できます。
  • ブロックベースのフォーマットでは、ブロックをさらに小さなパーツに分割することはできませんが、1 つのブロックに含める行数は、一般設定 max_block_size で制御できます。
最終更新日 2026年6月10日