메인 콘텐츠로 건너뛰기

소개

ClickHouse는 쿼리를 매우 빠르게 처리합니다. 그렇다면 이러한 쿼리는 여러 서버에 걸쳐 어떻게 분산되고 병렬로 처리될까요?
이 가이드에서는 먼저 ClickHouse가 분산 테이블을 통해 여러 세그먼트에 쿼리를 어떻게 분산하는지 설명하고, 이어서 쿼리 실행에 여러 레플리카를 어떻게 활용할 수 있는지 설명합니다.

세그먼트 아키텍처

shared-nothing 아키텍처에서는 일반적으로 클러스터를 여러 세그먼트로 나누며, 각 세그먼트는 전체 데이터의 부분 집합을 포함합니다. 이 세그먼트들 위에는 분산 테이블이 있어 전체 데이터에 대한 단일한 뷰를 제공합니다. 읽기 요청은 로컬 테이블로 보낼 수 있습니다. 이 경우 쿼리 실행은 지정된 세그먼트에서만 발생합니다. 또는 분산 테이블로 보낼 수도 있으며, 이 경우 각 세그먼트가 해당 쿼리를 실행합니다. 분산 테이블에 쿼리가 수행된 서버는 데이터를 집계한 뒤 클라이언트에 응답합니다: 위 그림은 클라이언트가 분산 테이블에 쿼리를 실행할 때 어떤 일이 일어나는지 보여줍니다:
  1. SELECT 쿼리는 임의의 노드에 있는 분산 테이블로 전송됩니다 (라운드 로빈 전략을 사용하거나, 로드 밸런서가 특정 서버로 라우팅한 경우). 이 노드는 이제 코디네이터 역할을 합니다.
  2. 이 노드는 분산 테이블에 지정된 정보를 바탕으로 쿼리를 실행해야 하는 각 세그먼트를 찾고, 쿼리를 각 세그먼트로 전송합니다.
  3. 각 세그먼트는 데이터를 로컬에서 읽고, 필터링하고, 집계한 다음 머지 가능한 상태를 코디네이터에 다시 보냅니다.
  4. 코디네이터 역할을 하는 노드는 데이터를 머지한 다음 응답을 클라이언트에 다시 보냅니다.
여기에 레플리카를 추가해도 프로세스는 거의 동일하며, 차이점은 각 세그먼트에서 단 하나의 레플리카만 쿼리를 실행한다는 점뿐입니다. 즉, 더 많은 쿼리를 병렬로 처리할 수 있습니다.

세그먼트 분할이 없는 아키텍처

ClickHouse Cloud는 위에서 설명한 아키텍처와는 매우 다른 구조를 사용합니다. (자세한 내용은 “ClickHouse Cloud 아키텍처”를 참조하십시오). 컴퓨트와 스토리지가 분리되어 있고, 사실상 무한한 스토리지를 사용할 수 있으므로 세그먼트의 필요성은 훨씬 줄어듭니다. 아래 그림은 ClickHouse Cloud 아키텍처를 보여줍니다. 이 아키텍처에서는 레플리카를 거의 즉시 추가하거나 제거할 수 있으므로 클러스터 확장성이 매우 뛰어납니다. 오른쪽에 표시된 ClickHouse Keeper 클러스터는 메타데이터에 대한 단일 기준점을 제공합니다. 레플리카는 ClickHouse Keeper 클러스터에서 메타데이터를 가져와 모두 동일한 데이터를 유지할 수 있습니다. 데이터 자체는 객체 스토리지에 저장되며, SSD 캐시는 쿼리 속도를 높이는 데 도움이 됩니다. 그렇다면 이제 여러 서버에 걸쳐 쿼리 실행을 어떻게 분산할 수 있을까요? 세그먼트로 분할된 아키텍처에서는 각 세그먼트가 실제로 데이터의 부분 집합에 대해 쿼리를 실행할 수 있으므로 그 방식이 비교적 분명했습니다. 세그먼트 분할이 없을 때는 어떻게 동작할까요?

병렬 레플리카 소개

여러 서버를 통해 쿼리 실행을 병렬화하려면 먼저 서버 중 하나를 코디네이터로 지정할 수 있어야 합니다. 코디네이터는 실행해야 할 작업 목록을 만들고, 모든 작업이 실행 및 집계된 뒤 결과가 클라이언트에 반환되도록 보장합니다. 대부분의 분산 시스템과 마찬가지로 이 역할은 최초 쿼리를 수신한 노드가 맡습니다. 또한 작업 단위를 정의해야 합니다. 세그먼트 기반 아키텍처에서는 작업 단위가 데이터의 부분 집합인 세그먼트입니다. 병렬 레플리카에서는 그래뉼이라고 하는 테이블의 작은 부분을 작업 단위로 사용합니다. 이제 아래 그림을 통해 실제로 어떻게 동작하는지 살펴보겠습니다. 병렬 레플리카는 다음과 같이 동작합니다.
  1. 클라이언트의 쿼리는 로드 밸런서를 거쳐 하나의 노드로 전송됩니다. 이 노드는 해당 쿼리의 코디네이터가 됩니다.
  2. 이 노드는 각 파트의 인덱스를 분석하고 처리할 적절한 파트와 그래뉼을 선택합니다.
  3. 코디네이터는 워크로드를 여러 레플리카에 할당할 수 있는 그래뉼 집합으로 분할합니다.
  4. 각 그래뉼 집합은 해당 레플리카에서 처리되며, 처리가 끝나면 머지 가능한 상태가 코디네이터로 전송됩니다.
  5. 마지막으로 코디네이터가 레플리카의 모든 결과를 머지한 뒤 클라이언트에 응답을 반환합니다.
위 단계는 병렬 레플리카가 이론적으로 어떻게 동작하는지를 보여줍니다. 하지만 실제 환경에서는 이러한 로직이 완벽하게 동작하지 못하게 하는 요인이 많습니다.
  1. 일부 레플리카는 사용 불가능할 수 있습니다.
  2. ClickHouse의 복제는 비동기식이므로 특정 시점에는 일부 레플리카가 동일한 파트를 가지고 있지 않을 수 있습니다.
  3. 레플리카 간 테일 지연 시간을 어떻게든 처리해야 합니다.
  4. 파일 시스템 캐시는 각 레플리카의 활동에 따라 달라지므로, 작업을 무작위로 할당하면 캐시 지역성 측면에서 성능이 최적이 아닐 수 있습니다.
다음 섹션에서는 이러한 요인을 어떻게 극복하는지 살펴보겠습니다.

Announcements

위 목록의 (1)과 (2)를 해결하기 위해 announcement라는 개념을 도입했습니다. 아래 그림을 통해 이것이 어떻게 동작하는지 살펴보겠습니다:
  1. 클라이언트의 쿼리는 로드 밸런서를 거쳐 하나의 노드로 전달됩니다. 이 노드는 해당 쿼리의 코디네이터가 됩니다.
  2. coordinating node는 클러스터의 모든 레플리카로부터 announcement를 받기 위한 요청을 보냅니다. 레플리카는 테이블의 현재 파트 집합을 서로 약간 다르게 보고 있을 수 있습니다. 따라서 잘못된 스케줄링 결정을 피하려면 이 정보를 수집해야 합니다.
  3. 그런 다음 coordinating node는 announcement를 바탕으로 각 레플리카에 할당할 수 있는 그래뉼 집합을 정의합니다. 예를 들어 여기서는 레플리카 2가 자신의 announcement에 파트 3을 포함하지 않았기 때문에, 파트 3의 그래뉼은 레플리카 2에 할당되지 않은 것을 볼 수 있습니다. 또한 레플리카 3은 announcement를 제공하지 않았기 때문에 어떤 작업도 할당되지 않았다는 점에 유의하십시오.
  4. 각 레플리카가 자신에게 할당된 그래뉼 부분 집합에 대해 쿼리를 처리하고, 머지 가능한 상태를 코디네이터로 다시 보내면 코디네이터가 결과를 머지한 후 응답이 클라이언트로 전송됩니다.

동적 조정

꼬리 지연 시간 문제를 해결하기 위해 동적 조정을 추가했습니다. 즉, 모든 그래뉼을 한 번의 요청으로 레플리카에 보내는 대신, 각 레플리카가 코디네이터에 새로운 작업(처리할 그래뉼 집합)을 요청할 수 있습니다. 코디네이터는 수신한 announcement를 바탕으로 레플리카에 그래뉼 집합을 할당합니다. 이제 모든 레플리카가 모든 파트에 대한 announcement를 전송한 단계라고 가정하겠습니다. 아래 그림은 동적 조정이 작동하는 방식을 보여줍니다:
  1. 레플리카는 작업을 처리할 수 있음을 코디네이터 노드에 알리며, 처리 가능한 작업량도 지정할 수 있습니다.
  2. 코디네이터가 레플리카에 작업을 할당합니다.
  1. 레플리카 1과 2는 작업을 매우 빠르게 끝낼 수 있습니다. 이들은 코디네이터 노드에 다른 작업을 요청합니다.
  2. 코디네이터가 레플리카 1과 2에 새로운 작업을 할당합니다.
  1. 이제 모든 레플리카가 작업 처리를 마쳤습니다. 이들은 더 많은 작업을 요청합니다.
  2. 코디네이터는 announcement를 사용해 남아 있는 작업이 무엇인지 확인하지만, 남은 작업은 없습니다.
  3. 코디네이터는 모든 작업이 처리되었음을 레플리카에 알립니다. 이제 머지 가능한 모든 상태를 머지한 뒤 쿼리에 응답합니다.

캐시 지역성 관리

마지막으로 남아 있는 잠재적인 문제는 캐시 지역성을 어떻게 처리할지입니다. 쿼리가 여러 번 실행될 때, 동일한 작업이 동일한 레플리카로 라우팅되도록 어떻게 보장할 수 있을까요? 앞선 예시에서는 작업이 다음과 같이 할당되었습니다:
레플리카 1레플리카 2레플리카 3
파트 1g1, g6, g7g2, g4, g5g3
파트 2g1g2, g4, g5g3
파트 3g1, g6g2, g4, g5g3
동일한 작업이 동일한 레플리카에 할당되어 캐시의 이점을 활용할 수 있도록 두 가지 처리가 수행됩니다. 먼저, part + 그래뉼 집합(작업)의 해시가 계산됩니다. 그리고 작업 할당을 위해 레플리카 수를 기준으로 나머지 연산이 적용됩니다. 이론상으로는 좋아 보이지만, 실제로는 특정 레플리카에 갑작스러운 부하가 발생하거나 네트워크 성능이 저하되면, 특정 작업 실행에 동일한 레플리카가 계속 사용될 경우 테일 지연 시간이 발생할 수 있습니다. max_parallel_replicas가 레플리카 수보다 적으면, 쿼리 실행을 위해 무작위 레플리카가 선택됩니다.

작업 스틸링

일부 레플리카가 다른 레플리카보다 작업을 더 느리게 처리하면, 테일 지연 시간을 줄이기 위해 다른 레플리카가 원칙적으로는 해시에 따라 해당 레플리카에 할당된 작업을 ‘스틸링’하려고 시도합니다.

제한 사항

이 기능에는 알려진 제한 사항이 있으며, 주요 제한 사항은 이 섹션에 설명되어 있습니다.
아래에 나열된 제한 사항에 포함되지 않는 문제를 발견했고, 원인이 병렬 레플리카에 있다고 의심되면 GitHub에서 레이블 comp-parallel-replicas를 사용해 이슈를 등록하십시오.
LimitationDescription
복잡한 쿼리현재 병렬 레플리카는 단순한 쿼리에서는 비교적 잘 작동합니다. CTE, 서브쿼리, JOIN, 평탄하지 않은 쿼리 구조 등과 같은 복잡성 계층은 쿼리 성능에 부정적인 영향을 줄 수 있습니다.
작은 쿼리처리하는 행 수가 많지 않은 쿼리를 실행하는 경우, 레플리카 간 조정을 위한 네트워크 시간 때문에 쿼리 실행에 추가 사이클이 발생할 수 있으므로 여러 레플리카에서 실행하더라도 성능이 더 좋아지지 않을 수 있습니다. 다음 설정을 사용하면 이러한 문제를 줄일 수 있습니다: parallel_replicas_min_number_of_rows_per_replica.
FINAL과 함께 사용하면 병렬 레플리카가 비활성화됨
병렬 레플리카와 함께 프로젝션은 사용되지 않음
카디널리티가 높은 데이터와 복잡한 집계많은 데이터를 전송해야 하는 카디널리티가 높은 집계는 쿼리를 크게 느리게 만들 수 있습니다.
새 분석기와의 호환성새 분석기는 특정 시나리오에서 쿼리 실행 속도를 크게 늦추거나 높일 수 있습니다.
SettingDescription
enable_parallel_replicas0: 비활성화
1: 활성화
2: 병렬 레플리카 사용을 강제하며, 사용되지 않으면 예외를 발생시킵니다.
cluster_for_parallel_replicas병렬 레플리카에 사용할 클러스터 이름입니다. ClickHouse Cloud를 사용하는 경우 default를 사용합니다.
max_parallel_replicas여러 레플리카에서 쿼리를 실행할 때 사용할 최대 레플리카 수입니다. 클러스터의 레플리카 수보다 작은 값을 지정하면 노드가 무작위로 선택됩니다. 이 값은 수평 스케일링을 고려해 오버커밋할 수도 있습니다.
parallel_replicas_min_number_of_rows_per_replica처리해야 하는 행 수를 기준으로 사용할 레플리카 수를 제한하는 데 도움이 됩니다. 사용되는 레플리카 수는 다음과 같이 정의됩니다.
estimated rows to read / min_number_of_rows_per_replica
enable_analyzer병렬 레플리카를 사용한 쿼리 실행은 분석기가 활성화된 경우에만 지원됩니다

병렬 레플리카 문제 조사

각 쿼리에 어떤 설정이 사용되었는지는 system.query_log 테이블에서 확인할 수 있습니다. 또한 system.events 테이블을 보면 서버에서 발생한 모든 이벤트를 확인할 수 있으며, clusterAllReplicas 테이블 함수를 사용하면 모든 레플리카의 테이블을 확인할 수 있습니다 (클라우드 사용자인 경우 default를 사용하십시오).
Query
SELECT
   hostname(),
   *
FROM clusterAllReplicas('default', system.events)
WHERE event ILIKE '%ParallelReplicas%'
Response
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleRequestMicroseconds      │   438 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   558 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadUnassignedMarks            │   240 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadAssignedForStealingMarks   │     4 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingByHashMicroseconds     │     5 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasProcessingPartsMicroseconds    │     5 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     3 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasAvailableCount                 │     6 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleRequestMicroseconds      │   698 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   644 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadUnassignedMarks            │   190 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadAssignedForStealingMarks   │    54 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingByHashMicroseconds     │     8 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasProcessingPartsMicroseconds    │     4 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     2 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasAvailableCount                 │     6 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleRequestMicroseconds      │   620 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   656 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadUnassignedMarks            │     1 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadAssignedForStealingMarks   │     1 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingByHashMicroseconds     │     4 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasProcessingPartsMicroseconds    │     3 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     1 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasAvailableCount                 │    12 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleRequestMicroseconds      │   696 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   717 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadUnassignedMarks            │     2 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadAssignedForStealingMarks   │     2 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingByHashMicroseconds     │    10 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasProcessingPartsMicroseconds    │     6 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     2 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasAvailableCount                 │    12 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
system.text_log 테이블에는 병렬 레플리카를 사용한 쿼리 실행에 대한 정보도 포함되어 있습니다:
Query
SELECT message
FROM clusterAllReplicas('default', system.text_log)
WHERE query_id = 'ad40c712-d25d-45c4-b1a1-a28ba8d4019c'
ORDER BY event_time_microseconds ASC
Response
┌─message────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ (from 54.218.178.249:59198) SELECT * FROM session_events WHERE type='type2' LIMIT 10 SETTINGS allow_experimental_parallel_reading_from_replicas=2; (stage: Complete)                                                                                       │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage Complete │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage WithMergeableState to stage Complete │
│ The number of replicas requested (100) is bigger than the real number available in the cluster (6). Will use the latter number to execute the query.                                                                                                       │
│ Initial request from replica 4: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 4 replica

│ Reading state is fully initialized: part all_0_2_1 with ranges [(0, 182)] in replicas [4]; part all_3_3_0 with ranges [(0, 62)] in replicas [4]                                                                                                            │
│ Sent initial requests: 1 Replicas count: 6                                                                                                                                                                                                                 │
│ Initial request from replica 2: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 2 replica

│ Sent initial requests: 2 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 4, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 4 with 1 parts: [part all_0_2_1 with ranges [(128, 182)]]. Finish: false; mine_marks=0, stolen_by_hash=54, stolen_rest=0                                                                                                       │
│ Initial request from replica 1: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 1 replica

│ Sent initial requests: 3 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 4, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 4 with 2 parts: [part all_0_2_1 with ranges [(0, 128)], part all_3_3_0 with ranges [(0, 62)]]. Finish: false; mine_marks=0, stolen_by_hash=0, stolen_rest=190                                                                  │
│ Initial request from replica 0: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 0 replica

│ Sent initial requests: 4 Replicas count: 6                                                                                                                                                                                                                 │
│ Initial request from replica 5: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 5 replica

│ Sent initial requests: 5 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 2, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 2 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Initial request from replica 3: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 3 replica

│ Sent initial requests: 6 Replicas count: 6                                                                                                                                                                                                                 │
│ Total rows to read: 2000000                                                                                                                                                                                                                                │
│ Handling request from replica 5, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 5 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 0, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 0 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 1, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 1 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 3, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 3 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ (c-crimson-vd-86-server-rdhnsx3-0.c-crimson-vd-86-server-headless.ns-crimson-vd-86.svc.cluster.local:9000) Cancelling query because enough data has been read                                                                                              │
│ Read 81920 rows, 5.16 MiB in 0.013166 sec., 6222087.194288318 rows/sec., 391.63 MiB/sec.                                                                                                                                                                   │
│ Coordination done: Statistics: replica 0 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 1 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 2 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 3 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 4 - {requests: 3 marks: 244 assigned_to_me: 0 stolen_by_hash: 54 stolen_unassigned: 190}; replica 5 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0} │
│ Peak memory usage (for query): 1.81 MiB.                                                                                                                                                                                                                   │
│ Processed in 0.024095586 sec.                                                                                                                                                                                                                              │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
마지막으로 EXPLAIN PIPELINE도 사용할 수 있습니다. 이는 ClickHouse가 쿼리를 어떤 방식으로 실행하는지와 쿼리 실행에 어떤 리소스가 사용되는지를 보여줍니다. 다음 쿼리를 예시로 살펴보겠습니다:
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) LIMIT 10
병렬 레플리카 없이 쿼리 파이프라인을 살펴보겠습니다:
EXPLAIN PIPELINE (without parallel replica)
EXPLAIN PIPELINE graph = 1, compact = 0 
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) 
LIMIT 10 
SETTINGS allow_experimental_parallel_reading_from_replicas=0 
FORMAT TSV;
이제 parallel replica를 사용한 경우입니다:
EXPLAIN PIPELINE (with parallel replica)
EXPLAIN PIPELINE graph = 1, compact = 0 
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) 
LIMIT 10 
SETTINGS allow_experimental_parallel_reading_from_replicas=2 
FORMAT TSV;
마지막 수정일 2026년 6월 10일