ClickHouse は高速化を前提に設計されています。利用可能なすべての CPU コアを使い、データを処理レーンに分散し、ハードウェア性能を限界近くまで引き出しながら、クエリを高い並列性で実行します。
このガイドでは、ClickHouse におけるクエリ並列化の仕組みと、大規模なワークロードでパフォーマンスを向上させるために、それをどのように調整・監視できるかを順を追って説明します。
主要な概念を説明するため、uk_price_paid_simple データセットに対する集計クエリを使用します。
ステップごとに見る: ClickHouse が集計クエリをどのように並列化するか
ClickHouse ① は、テーブルの主キーに対するフィルタを含む集計クエリを実行する際、② 処理が必要なグラニュールと安全にスキップできるグラニュールを特定するために、③ プライマリインデックスをメモリに読み込みます。
選択したデータはその後、動的にn 本の並列処理レーンに分散され、各レーンがデータをブロック単位でストリーミング処理して、最終結果へとまとめます。
n 本の並列処理レーンの数は、max_threads 設定で制御されます。デフォルトでは、server 上で ClickHouse が使用できる単一 CPU のコア数 (スレッド数) に一致します。上の例では、4 コアを想定しています。
8 コアのマシンでは、より多くのレーンが並列にデータを処理できるため、クエリ処理のスループットはおおよそ 2 倍になります (ただし、それに応じてメモリ使用量も増加します) 。
CPU 使用率を最大化し、クエリ全体の実行時間を短縮するには、レーンへの効率的な分散が重要です。
テーブルデータが複数のサーバーに分片として分散されている場合、各サーバーはそれぞれの分片を並列に処理します。各サーバー内では、前述のとおり、ローカルデータが並列処理レーンを使って処理されます。
最初にクエリを受信したサーバーは、各分片からのすべての部分結果を集約し、結合して最終的な全体結果を生成します。
クエリ負荷を分片間に分散することで、特に高スループット環境では、並列性を水平方向に拡張できます。
ClickHouse Cloud では分片の代わりに並列レプリカを使用しますClickHouse Cloud では、同様の並列性は並列レプリカによって実現されます。これは、shared-nothing クラスターにおける分片と同様に機能します。各 ClickHouse Cloud レプリカ (ステートレスなコンピュートノード) はデータの一部を並列に処理し、独立した分片と同じように最終結果に寄与します。
クエリが利用可能な CPU リソースを十分に使い切れているかを確認し、使い切れていない場合にその原因を診断するには、これらのツールを使用します。
ここでは 59 個の CPU コアを備えたテストサーバーで実行しており、ClickHouse のクエリ並列性を十分に引き出せるようにしています。
この例のクエリがどのように実行されるかを確認するため、集計クエリの実行中にトレースレベルのログエントリをすべて返すよう ClickHouseサーバーに指示できます。今回のデモでは、クエリのフィルタ条件を削除しました。これを残すと処理されるグラニュールは 3 つだけになり、ClickHouse が数本を超える並列処理レーンを活用するにはデータ量が不十分だからです。
SELECT
max(price)
FROM
uk.uk_price_paid_simple
SETTINGS send_logs_level='trace';
① <Debug> ...: 3609 marks to read from 3 ranges
② <Trace> ...: Spreading mark ranges among streams
② <Debug> ...: Reading approx. 29564928 rows with 59 streams
次のことがわかります。
- ① ClickHouse は、3 つのデータ範囲にまたがって 3,609 個のグラニュール (トレースログでは marks として示されます) を読み取る必要があります。
- ② CPU コアが 59 個あるため、この処理は 59 本の並列処理ストリーム (各レーンに 1 本ずつ) に分散されます。
あるいは、EXPLAIN 句を使って、集計クエリの 物理オペレータープラン (「クエリパイプライン」とも呼ばれます) を確認することもできます。
EXPLAIN PIPELINE
SELECT
max(price)
FROM
uk.uk_price_paid_simple;
┌─explain───────────────────────────────────────────────────────────────────────────┐
1. │ (Expression) │
2. │ ExpressionTransform × 59 │
3. │ (Aggregating) │
4. │ Resize 59 → 59 │
5. │ AggregatingTransform × 59 │
6. │ StrictResize 59 → 59 │
7. │ (Expression) │
8. │ ExpressionTransform × 59 │
9. │ (ReadFromMergeTree) │
10. │ MergeTreeSelect(pool: PrefetchedReadPool, algorithm: Thread) × 59 0 → 1 │
└───────────────────────────────────────────────────────────────────────────────────┘
注: 上のオペレータープランは下から上に向かって読んでください。各行は物理実行プランの 1 つのステージを表しており、下部のストレージからのデータ読み取りで始まり、上部の最終処理ステップで終わります。× 59 と付いた operator は、互いに重ならないデータ領域に対して 59 本の並列処理レーンで同時実行されます。これは max_threads の値を反映しており、クエリの各ステージが CPU コア全体にわたってどのように並列化されるかを示しています。
ClickHouse’s 組み込み web UI (/play エンドポイントで利用可能) では、上記の物理プランをグラフィカルに可視化できます。この例では、可視化をコンパクトに保つために max_threads を 4 に設定し、4 本の並列処理レーンだけを表示しています。
注: 可視化は左から右に向かって読んでください。各行は 1 本の並列処理レーンを表しており、データブロックをブロック単位でストリーミングしながら、フィルタリング、集約、最終処理ステージなどの変換を適用します。この例では、max_threads = 4 の設定に対応する 4 本の並列レーンを確認できます。
上記の物理プランにある Resize 演算子は、各処理レーンを均等に活用できるよう、データブロックのストリームを再パーティション化し、再分配します。こうした負荷の再調整は、データ範囲によってクエリの述語条件に一致する行数が異なる場合に、特に重要です。これを行わないと、一部のレーンに負荷が集中する一方で、他のレーンはアイドル状態になることがあります。作業を再分配することで、高速なレーンが実質的に低速なレーンを補い、クエリ全体の実行時間を最適化できます。
max_threads が常に適用されるとは限らない理由
前述のとおり、n 個の並列処理レーンの数は max_threads 設定で制御され、既定ではサーバー上で ClickHouse が利用できる CPU コア数と一致します。
SELECT getSetting('max_threads');
┌─getSetting('max_threads')─┐
1. │ 59 │
└───────────────────────────┘
ただし、処理対象として選択されたデータ量によっては、max_threads の値が反映されない場合があります。
EXPLAIN PIPELINE
SELECT
max(price)
FROM
uk.uk_price_paid_simple
WHERE town = 'LONDON';
...
(ReadFromMergeTree)
MergeTreeSelect(pool: PrefetchedReadPool, algorithm: Thread) × 30
上記のオペレータープランの抜粋が示すとおり、max_threads は 59 に設定されていても、ClickHouse がデータのスキャンに使用する同時実行ストリームは 30 のみです。
では、クエリを実行してみましょう。
SELECT
max(price)
FROM
uk.uk_price_paid_simple
WHERE town = 'LONDON';
┌─max(price)─┐
1. │ 594300000 │ -- 594.30 million
└────────────┘
1 row in set. Elapsed: 0.013 sec. Processed 2.31 million rows, 13.66 MB (173.12 million rows/s., 1.02 GB/s.)
Peak memory usage: 27.24 MiB.
上の出力が示すように、このクエリは 231 万行を処理し、13.66MB のデータを読み取りました。これは、索引解析フェーズで ClickHouse が処理対象として 282 個のグラニュール を選択し、それぞれに 8,192 行が含まれていたためで、合計すると約 231 万行になるからです:
EXPLAIN indexes = 1
SELECT
max(price)
FROM
uk.uk_price_paid_simple
WHERE town = 'LONDON';
┌─explain───────────────────────────────────────────────┐
1. │ Expression ((Project names + Projection)) │
2. │ Aggregating │
3. │ Expression (Before GROUP BY) │
4. │ Expression │
5. │ ReadFromMergeTree (uk.uk_price_paid_simple) │
6. │ Indexes: │
7. │ PrimaryKey │
8. │ Keys: │
9. │ town │
10. │ Condition: (town in ['LONDON', 'LONDON']) │
11. │ Parts: 3/3 │
12. │ Granules: 282/3609 │
└───────────────────────────────────────────────────────┘
設定された max_threads の値にかかわらず、ClickHouse は、それを割り当てるだけの十分なデータがある場合にのみ、追加の並列処理レーンを確保します。max_threads の「max」はあくまで上限を意味し、実際に使用されるスレッド数を保証するものではありません。
ここでいう「十分なデータ」は、主に 2 つの設定によって決まります。これらの設定は、各処理レーンが処理すべき最小の行数 (デフォルトでは 163,840) と最小のバイト数 (デフォルトでは 2,097,152) を定義します。
shared-nothing クラスターの場合:
共有ストレージを使用するクラスター (例: ClickHouse Cloud) の場合:
さらに、読み取り task size には厳格な下限があり、次の設定で制御されます:
これらの設定は変更しないでください本番環境でこれらの設定を変更することは推奨されません。ここでは、max_threads が実際の並列度を常に決めるわけではない理由を示すためにのみ掲載しています。
説明のため、これらの設定を上書きして最大の同時実行を強制した状態で、物理プランを確認してみましょう:
EXPLAIN PIPELINE
SELECT
max(price)
FROM
uk.uk_price_paid_simple
WHERE town = 'LONDON'
SETTINGS
max_threads = 59,
merge_tree_min_read_task_size = 0,
merge_tree_min_rows_for_concurrent_read_for_remote_filesystem = 0,
merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem = 0;
...
(ReadFromMergeTree)
MergeTreeSelect(pool: PrefetchedReadPool, algorithm: Thread) × 59
現在、ClickHouse は設定された max_threads を完全に遵守しつつ、59 本のストリームを同時に使用してデータをスキャンしています。
これは、小規模なデータに対するクエリでは、ClickHouse が意図的に同時実行数を抑えることを示しています。設定のオーバーライドはテスト時にのみ使用し、本番環境では使用しないでください。実行効率の低下やリソース競合を招く可能性があるためです。
- ClickHouse は、
max_threads に紐づく処理レーンを使ってクエリを並列実行します。
- 実際のレーン数は、処理対象として選択されたデータ量によって決まります。
- レーンの使用状況を分析するには、
EXPLAIN PIPELINE とトレースログを使用します。
ClickHouse がクエリを並列実行する仕組みや、大規模環境で高いパフォーマンスを実現する方法をさらに詳しく知りたい場合は、以下のリソースを参照してください。