RabbitMQ では、次のことができます。
- データフローをパブリッシュまたはサブスクライブする。
- 利用可能になったストリームを処理する。
テーブルの作成
rabbitmq_host_port– ホスト:ポート (例:localhost:5672) 。rabbitmq_exchange_name– RabbitMQ exchange名。rabbitmq_format– メッセージのフォーマット。JSONEachRowなど、SQLFORMAT関数と同じ記法を使用します。詳細は、フォーマット セクションを参照してください。
rabbitmq_exchange_type– RabbitMQ exchange のタイプ:direct,fanout,topic,headers,consistent_hash。デフォルト:fanout。rabbitmq_routing_key_list– ルーティングキーのカンマ区切りリスト。rabbitmq_schema– フォーマットでスキーマ定義が必要な場合に使用する必要があるパラメーターです。たとえば Cap’n Proto では、スキーマファイルへのパスと、ルートschema.capnp:Messageオブジェクト名が必要です。rabbitmq_num_consumers– テーブルごとのコンシューマー数。1 つのコンシューマーのスループットが不十分な場合は、より多くのコンシューマーを指定してください。デフォルト:1rabbitmq_num_queues– キューの総数。この数を増やすと、パフォーマンスが大幅に向上する場合があります。デフォルト:1。rabbitmq_queue_base- キュー名のヒントを指定します。この設定のユースケースについては以下で説明します。rabbitmq_persistent- 1 (true) に設定すると、INSERT クエリの配信モードは 2 に設定されます (メッセージを「永続化」としてマークします) 。デフォルト:0。rabbitmq_skip_broken_messages– ブロックごとに、スキーマと互換性のない RabbitMQ メッセージをメッセージパーサーがどれだけ許容するかを指定します。rabbitmq_skip_broken_messages = Nの場合、このエンジンはパースできない N 件の RabbitMQ メッセージをスキップします (1 メッセージは 1 行のデータに相当します) 。デフォルト:0。rabbitmq_max_block_size- RabbitMQ からデータをフラッシュする前に収集する行数。デフォルト: max_insert_block_size。rabbitmq_flush_interval_ms- RabbitMQ からデータをフラッシュするまでのタイムアウト。デフォルト: stream_flush_interval_ms。rabbitmq_queue_settings_list- キューの作成時に RabbitMQ の設定を指定できます。使用可能な設定:x-max-length,x-max-length-bytes,x-message-ttl,x-expires,x-priority,x-max-priority,x-overflow,x-dead-letter-exchange,x-queue-type。durable設定はキューに対して自動的に有効になります。rabbitmq_address- 接続先アドレス。この設定またはrabbitmq_host_portのいずれかを使用してください。rabbitmq_vhost- RabbitMQ vhost。デフォルト:'\'。rabbitmq_queue_consume- ユーザー定義キューを使用し、exchange、queue、binding の宣言を含む RabbitMQ のセットアップは行いません。デフォルト:false。rabbitmq_username- RabbitMQ のユーザー名。rabbitmq_password- RabbitMQ のパスワード。reject_unhandled_messages- エラー時にメッセージを拒否します (RabbitMQ に negative acknowledgement を送信します) 。この設定は、rabbitmq_queue_settings_listにx-dead-letter-exchangeが定義されている場合、自動的に有効になります。rabbitmq_commit_on_select- select クエリ実行時にメッセージを commit します。デフォルト:false。rabbitmq_max_rows_per_message— 行ベースのフォーマットで 1 つの RabbitMQ メッセージに書き込まれる最大行数。デフォルト:1。rabbitmq_empty_queue_backoff_start_ms— RabbitMQ キューが空の場合に読み取りを再スケジュールするための backoff の開始点。rabbitmq_empty_queue_backoff_end_ms— RabbitMQ キューが空の場合に読み取りを再スケジュールするための backoff の終了点。rabbitmq_empty_queue_backoff_step_ms— RabbitMQ キューが空の場合に読み取りを再スケジュールするための backoff のステップ。rabbitmq_handle_error_mode— RabbitMQ エンジンでの error の処理方法。設定可能な値: default (メッセージのパースに失敗した場合は例外がスローされます) 、stream (例外メッセージと生メッセージは仮想カラム_errorおよび_raw_messageに保存されます) 、dead_letter_queue (error 関連のデータは system.dead_letter_queue に保存されます) 。
SSL 接続
rabbitmq_secure = 1 または amqps のいずれかを使用します: rabbitmq_address = 'amqps://guest:guest@localhost/vhost'。
使用しているライブラリのデフォルトの動作では、作成された TLS 接続が十分に安全かどうかは確認されません。証明書が期限切れ、自己署名、欠落、無効のいずれであっても、接続はそのまま許可されます。より厳格な証明書検証は、今後実装される可能性があります。
また、rabbitmq 関連の設定に加えてフォーマット設定を追加することもできます。
例:
説明
SELECT はあまり有用ではありません (デバッグ時を除く) 。これは、各メッセージを読み取れるのが 1 回限りだからです。より実用的なのは、materialized views を使ってリアルタイムのスレッドを作成することです。これを行うには、次の手順に従います。
- エンジンを使って RabbitMQ コンシューマーを作成し、それをデータストリームとして扱います。
- 必要な構造を持つテーブルを作成します。
- エンジンからのデータを変換し、あらかじめ作成したテーブルに格納する materialized view を作成します。
MATERIALIZED VIEW がエンジンに接続されると、バックグラウンドでデータの収集を開始します。これにより、RabbitMQ からメッセージを継続的に受信し、SELECT を使って必要なフォーマットに変換できます。
1 つの RabbitMQ テーブルには、必要な数の materialized view を作成できます。
データは rabbitmq_exchange_type と指定した rabbitmq_routing_key_list に基づいて振り分けることができます。
1 つのテーブルに対して exchange は 1 つまでです。1 つの exchange を複数のテーブルで共有することもでき、その場合は複数のテーブルへ同時にルーティングできます。
Exchange type のオプション:
direct- ルーティングはキーの完全一致に基づきます。テーブルのキーリストの例:key1,key2,key3,key4,key5。メッセージのキーはこのいずれかと一致できます。fanout- キーに関係なく、すべてのテーブル (exchange 名が同じもの) へルーティングします。topic- ルーティングはドット区切りのキーを使ったパターンに基づきます。例:*.logs,records.*.*.2020,*.2018,*.2019,*.2020。headers- ルーティングはkey=valueの一致に基づき、設定x-match=allまたはx-match=anyを使用します。テーブルのキーリストの例:x-match=all,format=logs,type=report,year=2020。consistent_hash- データはバインドされたすべてのテーブル (exchange 名が同じもの) に均等に分散されます。この exchange type を使うには、RabbitMQ プラグインを有効にする必要がある点に注意してください:rabbitmq-plugins enable rabbitmq_consistent_hash_exchange。
rabbitmq_queue_base は、次のケースで使用できます。
- 複数のテーブルでキューを共有し、同じキューに対して複数のコンシューマーを登録できるようにするためです。これにより、パフォーマンスが向上します。
rabbitmq_num_consumersおよび/またはrabbitmq_num_queues設定を使う場合、これらのパラメーターが同じであればキューを完全に一致させられます。 - すべてのメッセージを正常に消費できなかった場合に、特定の durable キューからの読み取りを復旧できるようにするためです。特定の 1 つのキューから消費を再開するには、
rabbitmq_queue_base設定にその名前を設定し、rabbitmq_num_consumersとrabbitmq_num_queuesは指定しないでください (デフォルトは 1) 。特定のテーブル用に宣言されたすべてのキューから消費を再開するには、同じ設定、つまりrabbitmq_queue_base、rabbitmq_num_consumers、rabbitmq_num_queuesを指定してください。デフォルトでは、キュー名はテーブルごとに一意になります。 - キューは durable で自動削除されないため、それらを再利用するためです。 (削除する場合は RabbitMQ CLI ツールのいずれかを使用できます。)
rabbitmq_num_consumers および/または rabbitmq_num_queues 設定を rabbitmq_exchange_type とともに指定する場合は、次の条件があります。
rabbitmq-consistent-hash-exchangeプラグインを有効にする必要があります。- 公開されるメッセージの
message_idプロパティを指定する必要があります (各メッセージ/バッチごとに一意) 。
messageID と republished フラグ (複数回公開された場合は true) が追加されます。これらにはメッセージヘッダー経由でアクセスできます。
INSERT と materialized view に同じテーブルを使用しないでください。
例:
仮想カラム
_exchange_name- RabbitMQ exchange名。データ型:String。_channel_id- メッセージを受信した コンシューマー が宣言された ChannelID。データ型:String。_delivery_tag- 受信したメッセージの DeliveryTag。チャネルごとのスコープです。データ型:UInt64。_redelivered- メッセージのredeliveredフラグ。データ型:UInt8。_message_id- 受信したメッセージの messageID。メッセージの公開時に設定されていた場合は空ではありません。データ型:String。_timestamp- 受信したメッセージの timestamp。メッセージの公開時に設定されていた場合は空ではありません。データ型:UInt64。
rabbitmq_handle_error_mode='stream' の場合に追加される仮想カラム:
_raw_message- 正常にパースできなかった生メッセージ。データ型:Nullable(String)。_error- パース失敗時に発生した例外メッセージ。データ型:Nullable(String)。
_raw_message と _error の仮想カラムに値が設定されるのは、パース中に例外が発生した場合のみです。メッセージが正常にパースされた場合、これらは常に NULL です。
注意事項
DEFAULT、MATERIALIZED、ALIAS など) をテーブル定義で指定していても、これらは無視されます。代わりに、各カラムにはその型に応じたデフォルト値が設定されます。
データフォーマットのサポート
- 行ベースのフォーマットでは、1 つの RabbitMQ メッセージに含める行数は、
rabbitmq_max_rows_per_messageの設定で制御できます。 - ブロックベースのフォーマットでは、ブロックをさらに小さなパーツに分割することはできませんが、1 つのブロックに含める行数は、一般設定 max_block_size で制御できます。