メインコンテンツへスキップ

はじめに

ClickHouse はクエリを非常に高速に処理しますが、これらのクエリは 複数のサーバー間でどのように分散され、並列化されているのでしょうか。
このガイドでは、まず ClickHouse が分散テーブルを介してクエリを 複数の分片にどのように分散するのかを説明し、その後、クエリの実行で 複数のレプリカをどのように活用できるのかを説明します。

分片アーキテクチャ

shared-nothing アーキテクチャでは、クラスターは通常、複数の分片に 分割され、各分片には全体データの一部が格納されます。これらの分片の 上位には分散テーブルがあり、データ全体を単一のビューとして提供します。 読み取りはローカルテーブルに送ることができます。その場合、クエリの実行は 指定した分片でのみ行われます。あるいは分散テーブルに送ることもでき、その 場合は各分片で指定されたクエリが実行されます。分散テーブルに対するクエリを 受けたサーバーがデータを集約し、クライアントに応答を返します。 上の図は、クライアントが分散テーブルにクエリを実行した際に何が起こるかを示しています。
  1. SELECT クエリは、任意のノード上の分散テーブルに送信されます (ラウンドロビン戦略を介するか、ロードバランサーによって特定のサーバーに ルーティングされた後) 。このノードがコーディネーターとして動作します。
  2. このノードは、分散テーブルで指定された情報に基づいて、 クエリを実行する必要がある各分片を特定し、クエリを 各分片に送信します。
  3. 各分片はローカルでデータを読み取り、フィルタリングと集約を行った後、 マージ可能な状態をコーディネーターに返します。
  4. コーディネーターとなるノードがデータをマージし、その後 応答をクライアントに返します。
ここにレプリカが加わっても、処理の流れはほぼ同じで、違いは 各分片から 1 つのレプリカだけがクエリを実行することです。 つまり、より多くのクエリを並列に処理できるようになります。

非分片アーキテクチャ

ClickHouse Cloud のアーキテクチャは、上で示したものとは大きく異なります。 (詳細は “ClickHouse Cloud Architecture” を参照してください) 。コンピュートとストレージが分離され、さらに事実上 無制限のストレージを利用できるため、分片の必要性は以前ほど高くありません。 以下の図は ClickHouse Cloud のアーキテクチャを示しています。 このアーキテクチャでは、レプリカをほぼ瞬時に追加・削除できるため、 クラスターの高い拡張性を実現できます。ClickHouse Keeper クラスター (右側に表示) は、メタデータの単一の信頼できる情報源を 担います。レプリカは ClickHouse Keeper クラスターからメタデータを取得し、すべて同じデータを維持できます。データ自体は オブジェクトストレージに保存され、SSD cache によってクエリを高速化できます。 では、クエリ実行を複数のサーバーにまたがってどのように分散するのでしょうか。分片 アーキテクチャでは、各分片がデータの一部に対して実際にクエリを実行できるため、 比較的わかりやすいものでした。では、sharding がない場合はどのように機能するのでしょうか。

並列レプリカの紹介

複数のサーバーにまたがってクエリ実行を並列化するには、まず サーバーのうち 1 台をコーディネーターとして割り当てられるようにする 必要があります。コーディネーターは、実行すべき タスクの一覧を作成し、それらがすべて実行・ 集計され、結果がクライアントに返されることを保証する役割を担います。多くの 分散システムと同様に、通常これは最初の クエリを受け取ったノードが担います。また、作業単位を定義する必要もあります。分片化アーキテクチャでは、 作業単位は分片、つまりデータの一部分です。並列レプリカでは、 作業単位として グラニュール と呼ばれるテーブルの小さな部分を 使用します。 では、以下の図を使って、実際にどのように動作するのか見てみましょう。 並列レプリカでは:
  1. クライアントからのクエリは、ロード バランサーを経由して 1 つのノードに送られます。このノードがこのクエリのコーディネーターになります。
  2. そのノードは各パーツの索引を解析し、処理対象のパーツと グラニュールを選択します。
  3. コーディネーターは、ワークロードを 異なるレプリカに割り当て可能な一連のグラニュールに分割します。
  4. 各グラニュールの集合は対応するレプリカで処理され、完了すると マージ可能な状態がコーディネーターに送られます。
  5. 最後に、コーディネーターがレプリカからのすべての結果をマージし、 クライアントにレスポンスを返します。
上記の手順は、並列レプリカが理論上どのように動作するかを示しています。 しかし実際には、このような仕組みが 完全には機能しない要因が数多くあります。
  1. 利用できないレプリカがある場合があります。
  2. ClickHouse のレプリケーションは非同期であるため、ある時点では一部のレプリカが 同じパーツを持っていない可能性があります。
  3. レプリカ間のテールレイテンシを何らかの方法で扱う必要があります。
  4. ファイルシステムキャッシュは各レプリカ上の アクティビティに応じて異なるため、タスクをランダムに割り当てると、 キャッシュ局所性の観点では最適でないパフォーマンスにつながる可能性があります。
これらの要因をどのように克服しているかは、以降のセクションで説明します。

アナウンス

上のリストの (1) と (2) に対処するため、アナウンス という概念を導入しました。以下の図でその仕組みを見てみましょう。
  1. クライアントからのクエリは、ロード バランサーを経由して 1 つのノードに送られます。このノードが、このクエリのコーディネーターになります。
  2. コーディネーターとなるノードは、 クラスター内のすべてのレプリカからアナウンスを取得するためのリクエストを送信します。レプリカごとに、テーブルの現在のパーツ集合の見え方が わずかに異なる場合があります。そのため、 不正確なスケジューリング判断を避けるには、この情報を収集する必要があります。
  3. その後、コーディネーターとなるノードはアナウンスを使って、 各レプリカに割り当て可能な グラニュール の集合を定義します。たとえばここでは、 パーツ 3 の グラニュール はレプリカ 2 には 1 つも割り当てられていないことがわかります。 これは、このレプリカがアナウンスでこのパーツを通知しなかったためです。 また、 レプリカ 3 はアナウンスを返さなかったため、タスクが 1 つも割り当てられていない点にも注意してください。
  4. 各レプリカが自分に割り当てられた グラニュール のサブセットに対してクエリを処理し、 マージ可能な状態がコーディネーターに送り返されると、 コーディネーターが結果をマージし、そのレスポンスがクライアントに送信されます。

動的協調

テールレイテンシの問題に対処するため、動的協調を追加しました。これは、 すべてのグラニュールを1回のリクエストでレプリカに送るのではなく、各レプリカが コーディネーターに新しいタスク (処理するグラニュールのセット) を要求 できることを意味します。コーディネーターは、受け取ったアナウンスに基づいて グラニュールのセットをレプリカに割り当てます。 ここでは、すべてのレプリカがすべてのパーツを含む アナウンスを送信し終えた段階にあると仮定します。 以下の図は、動的協調がどのように機能するかを示しています。
  1. レプリカは、タスクを処理できることをコーディネーターノードに知らせます。 また、どれだけの作業を処理できるかを指定することもできます。
  2. コーディネーターはレプリカにタスクを割り当てます。
  1. レプリカ1と2は、自分たちのタスクを非常にすばやく完了します。 そのため、コーディネーターノードに別のタスクを要求します。
  2. コーディネーターはレプリカ1と2に新しいタスクを割り当てます。
  1. すべてのレプリカが、それぞれのタスクの処理を完了しました。 さらにタスクを要求します。
  2. コーディネーターは、アナウンスを使って未処理のタスクが残っているかどうかを 確認しますが、残っているタスクはありません。
  3. コーディネーターは、すべての処理が完了したことをレプリカに伝えます。 その後、マージ可能なすべての状態をマージして、クエリに応答します。

cache の局所性の管理

最後に残っている潜在的な問題は、cache の局所性をどう扱うかです。クエリが複数回実行される場合、同じタスクが同じレプリカにルーティングされることを、どのように保証すればよいのでしょうか。前の例では、次のタスクが割り当てられていました。
レプリカ 1レプリカ 2レプリカ 3
パート 1g1, g6, g7g2, g4, g5g3
パート 2g1g2, g4, g5g3
パート 3g1, g6g2, g4, g5g3
同じタスクが同じレプリカに割り当てられ、cache の恩恵を受けられるようにするため、2 つの処理が行われます。まず、パーツ + グラニュール の集合 (タスク) のハッシュを計算します。次に、タスクの割り当てに対して、レプリカ数を法とする剰余演算を適用します。 理屈の上ではよさそうに見えますが、実際には、あるレプリカに急に負荷がかかったり、ネットワークが劣化したりすると、特定のタスクの実行に同じレプリカが継続的に使われることで、テールレイテンシが発生する可能性があります。max_parallel_replicas がレプリカ数より少ない場合は、クエリ実行のためにランダムにレプリカが選択されます。

タスクのスティーリング

あるレプリカでのタスク処理が他のレプリカより遅い場合、他のレプリカは、ハッシュに基づいて本来そのレプリカに割り当てられているタスクを「奪う」ことで、テールレイテンシの低減を図ります。

制限事項

この機能には既知の制限があり、主なものをこのセクションで説明します。
以下に挙げた制限事項以外の問題が見つかり、その原因が並列レプリカにあると思われる場合は、 GitHub でラベル comp-parallel-replicas を付けて issue を作成してください。
LimitationDescription
複雑なクエリ現時点では、並列レプリカはシンプルなクエリでは比較的うまく機能します。CTE、サブクエリ、JOIN、ネストしたクエリなど、クエリの複雑さが増すと、クエリパフォーマンスに悪影響を及ぼす可能性があります。
小規模なクエリ処理する行数がそれほど多くないクエリでは、複数のレプリカで実行しても性能が向上しないことがあります。これは、レプリカ間の協調に伴うネットワーク時間によって、クエリ実行に余分なサイクルが発生するためです。この問題は、設定 parallel_replicas_min_number_of_rows_per_replica を使用することで抑えられます。
FINAL では並列レプリカは無効
プロジェクションは並列レプリカと併用されません
高カーディナリティのデータと複雑な集約大量のデータ送信が必要な高カーディナリティの集約では、クエリが大幅に遅くなる可能性があります。
新しいアナライザとの互換性新しいアナライザにより、特定のシナリオではクエリ実行が大幅に遅くなったり、逆に速くなったりする可能性があります。
SettingDescription
enable_parallel_replicas0: 無効
1: 有効
2: 並列レプリカの使用を強制します。使用されない場合は例外をスローします。
cluster_for_parallel_replicas並列レプリカに使用するクラスター名です。ClickHouse Cloud を使用している場合は default を使用してください。
max_parallel_replicas複数のレプリカでクエリを実行する際に使用するレプリカの最大数です。クラスター内のレプリカ数より少ない値を指定すると、ノードはランダムに選択されます。この値は、水平スケーリングを考慮してオーバーコミットすることもできます。
parallel_replicas_min_number_of_rows_per_replica処理対象の行数に基づいて、使用するレプリカ数を制限するのに役立ちます。使用されるレプリカ数は次の式で決まります。
estimated rows to read / min_number_of_rows_per_replica
enable_analyzer並列レプリカによるクエリ実行は、アナライザが有効な場合にのみサポートされます

並列レプリカの問題の調査

各クエリでどの設定が使われているかは、 system.query_log テーブルで確認できます。また、 system.events テーブルを見ると、サーバー上で発生したすべてのイベントを確認できます。さらに、 clusterAllReplicas テーブル関数を使うと、すべてのレプリカ上のテーブルを確認できます (Cloud ユーザーの場合は default を使用してください) 。
Query
SELECT
   hostname(),
   *
FROM clusterAllReplicas('default', system.events)
WHERE event ILIKE '%ParallelReplicas%'
Response
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleRequestMicroseconds      │   438 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   558 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadUnassignedMarks            │   240 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadAssignedForStealingMarks   │     4 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingByHashMicroseconds     │     5 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasProcessingPartsMicroseconds    │     5 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     3 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasAvailableCount                 │     6 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleRequestMicroseconds      │   698 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   644 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadUnassignedMarks            │   190 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadAssignedForStealingMarks   │    54 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingByHashMicroseconds     │     8 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasProcessingPartsMicroseconds    │     4 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     2 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasAvailableCount                 │     6 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleRequestMicroseconds      │   620 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   656 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadUnassignedMarks            │     1 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadAssignedForStealingMarks   │     1 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingByHashMicroseconds     │     4 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasProcessingPartsMicroseconds    │     3 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     1 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasAvailableCount                 │    12 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleRequestMicroseconds      │   696 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   717 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadUnassignedMarks            │     2 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadAssignedForStealingMarks   │     2 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingByHashMicroseconds     │    10 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasProcessingPartsMicroseconds    │     6 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     2 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasAvailableCount                 │    12 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
system.text_log テーブルには、並列レプリカを使用したクエリの実行に関する情報も 含まれています。
Query
SELECT message
FROM clusterAllReplicas('default', system.text_log)
WHERE query_id = 'ad40c712-d25d-45c4-b1a1-a28ba8d4019c'
ORDER BY event_time_microseconds ASC
Response
┌─message────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ (from 54.218.178.249:59198) SELECT * FROM session_events WHERE type='type2' LIMIT 10 SETTINGS allow_experimental_parallel_reading_from_replicas=2; (stage: Complete)                                                                                       │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage Complete │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage WithMergeableState to stage Complete │
│ The number of replicas requested (100) is bigger than the real number available in the cluster (6). Will use the latter number to execute the query.                                                                                                       │
│ Initial request from replica 4: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 4 replica

│ Reading state is fully initialized: part all_0_2_1 with ranges [(0, 182)] in replicas [4]; part all_3_3_0 with ranges [(0, 62)] in replicas [4]                                                                                                            │
│ Sent initial requests: 1 Replicas count: 6                                                                                                                                                                                                                 │
│ Initial request from replica 2: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 2 replica

│ Sent initial requests: 2 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 4, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 4 with 1 parts: [part all_0_2_1 with ranges [(128, 182)]]. Finish: false; mine_marks=0, stolen_by_hash=54, stolen_rest=0                                                                                                       │
│ Initial request from replica 1: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 1 replica

│ Sent initial requests: 3 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 4, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 4 with 2 parts: [part all_0_2_1 with ranges [(0, 128)], part all_3_3_0 with ranges [(0, 62)]]. Finish: false; mine_marks=0, stolen_by_hash=0, stolen_rest=190                                                                  │
│ Initial request from replica 0: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 0 replica

│ Sent initial requests: 4 Replicas count: 6                                                                                                                                                                                                                 │
│ Initial request from replica 5: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 5 replica

│ Sent initial requests: 5 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 2, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 2 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Initial request from replica 3: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 3 replica

│ Sent initial requests: 6 Replicas count: 6                                                                                                                                                                                                                 │
│ Total rows to read: 2000000                                                                                                                                                                                                                                │
│ Handling request from replica 5, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 5 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 0, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 0 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 1, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 1 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 3, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 3 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ (c-crimson-vd-86-server-rdhnsx3-0.c-crimson-vd-86-server-headless.ns-crimson-vd-86.svc.cluster.local:9000) Cancelling query because enough data has been read                                                                                              │
│ Read 81920 rows, 5.16 MiB in 0.013166 sec., 6222087.194288318 rows/sec., 391.63 MiB/sec.                                                                                                                                                                   │
│ Coordination done: Statistics: replica 0 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 1 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 2 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 3 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 4 - {requests: 3 marks: 244 assigned_to_me: 0 stolen_by_hash: 54 stolen_unassigned: 190}; replica 5 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0} │
│ Peak memory usage (for query): 1.81 MiB.                                                                                                                                                                                                                   │
│ Processed in 0.024095586 sec.                                                                                                                                                                                                                              │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
最後に、EXPLAIN PIPELINE を使用することもできます。これにより、ClickHouse がクエリをどのように実行し、クエリの実行に どのようなリソースが使われるかを確認できます。例として、次のクエリを見てみましょう。
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) LIMIT 10
並列レプリカを使わない場合のクエリパイプラインを見てみましょう。
EXPLAIN PIPELINE (without parallel replica)
EXPLAIN PIPELINE graph = 1, compact = 0 
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) 
LIMIT 10 
SETTINGS allow_experimental_parallel_reading_from_replicas=0 
FORMAT TSV;
続いて、並列レプリカ を有効にした場合:
EXPLAIN PIPELINE (with parallel replica)
EXPLAIN PIPELINE graph = 1, compact = 0 
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) 
LIMIT 10 
SETTINGS allow_experimental_parallel_reading_from_replicas=2 
FORMAT TSV;
最終更新日 2026年6月10日