Перейти к основному содержанию

Введение

ClickHouse обрабатывает запросы чрезвычайно быстро, но как эти запросы распределяются между несколькими серверами и выполняются параллельно?
В этом руководстве мы сначала рассмотрим, как ClickHouse распределяет запрос по нескольким сегментам с помощью distributed таблиц, а затем — как запрос может задействовать несколько реплик при выполнении.

Архитектура с сегментированием

В архитектуре shared-nothing кластер обычно разделяется на несколько сегментов, при этом каждый сегмент содержит подмножество всех данных. Над этими сегментами располагается distributed таблица, предоставляющая единое представление всего набора данных. Операции чтения можно направлять в локальную таблицу. В этом случае выполнение запроса будет происходить только на указанном сегменте. Либо запрос можно направить в distributed таблицу, и тогда каждый сегмент выполнит данный запрос. Сервер, на котором был выполнен запрос к distributed таблице, агрегирует данные и вернет ответ клиенту: На рисунке выше показано, что происходит, когда клиент выполняет запрос к distributed таблице:
  1. Запрос SELECT отправляется в distributed таблицу на одном из узлов произвольным образом (по стратегии round-robin или после маршрутизации на конкретный сервер с помощью балансировщика нагрузки). Этот узел начинает выступать в роли координатора.
  2. Узел определяет каждый сегмент, который должен выполнить запрос, на основе информации, указанной в distributed таблице, после чего запрос отправляется в каждый сегмент.
  3. Каждый сегмент читает, фильтрует и агрегирует данные локально, а затем отправляет координатору состояние, пригодное для слияния.
  4. Координирующий узел объединяет данные, а затем отправляет ответ клиенту.
Когда в эту схему добавляются реплики, процесс остается практически таким же, с той лишь разницей, что запрос выполняет только одна реплика из каждого сегмента. Это означает, что больше запросов можно обрабатывать параллельно.

Архитектура без сегментирования

Архитектура ClickHouse Cloud значительно отличается от представленной выше. (Подробнее см. в разделе “Архитектура ClickHouse Cloud”). Благодаря разделению вычислительных ресурсов и хранилища, а также практически неограниченному объему хранения, потребность в сегментах становится менее важной. На рисунке ниже показана архитектура ClickHouse Cloud: Эта архитектура позволяет почти мгновенно добавлять и удалять реплики, обеспечивая очень высокую масштабируемость кластера. Кластер ClickHouse Keeper (показан справа) обеспечивает единый источник истины для метаданных. Реплики могут получать метаданные из кластера ClickHouse Keeper и поддерживать одни и те же данные. Сами данные хранятся в объектном хранилище, а SSD-кэш позволяет ускорять запросы. Но как теперь распределяется выполнение запросов между несколькими серверами? В архитектуре с сегментированием это было вполне очевидно, поскольку каждый сегмент мог выполнять запрос на подмножестве данных. Как это работает, когда сегментирования нет?

Введение в параллельные реплики

Чтобы распараллелить выполнение запроса между несколькими серверами, сначала нужно иметь возможность назначить один из серверов координатором. Координатор — это узел, который формирует список задач для выполнения, следит за тем, чтобы все они были выполнены и агрегированы, а затем возвращает результат клиенту. Как и в большинстве распределенных систем, эту роль выполняет узел, который получает исходный запрос. Также нужно определить единицу работы. В архитектуре с сегментами единицей работы является сегмент — подмножество данных. В случае параллельных реплик в качестве единицы работы используется небольшая часть таблицы — гранулы. Теперь посмотрим, как это работает на практике, с помощью схемы ниже: С параллельными репликами:
  1. Запрос от клиента проходит через балансировщик нагрузки и отправляется на один из узлов. Этот узел становится координатором для данного запроса.
  2. Узел анализирует индекс каждой части и выбирает подходящие части и гранулы для обработки.
  3. Координатор разбивает рабочую нагрузку на набор гранул, которые могут быть назначены разным репликам.
  4. Каждый набор гранул обрабатывается соответствующими репликами, а по завершении на координатор отправляется состояние, пригодное для слияния.
  5. Наконец, координатор объединяет все результаты от реплик и возвращает ответ клиенту.
Описанные выше шаги показывают, как параллельные реплики работают в теории. Однако на практике есть множество факторов, которые могут помешать такой схеме работать идеально:
  1. Некоторые реплики могут быть недоступны.
  2. Репликация в ClickHouse асинхронна, поэтому в определенный момент времени у реплик могут быть разные части.
  3. Нужно как-то учитывать высокую задержку на отдельных репликах.
  4. Файловый кэш различается от реплики к реплике в зависимости от активности на каждой из них, поэтому случайное назначение задач может привести к менее оптимальной производительности из-за локальности кэша.
Как именно удается справиться с этими факторами, мы рассмотрим в следующих разделах.

Уведомления

Чтобы решить проблемы (1) и (2) из приведенного выше списка, мы ввели понятие уведомления. Давайте посмотрим, как это работает, на схеме ниже:
  1. Запрос от клиента проходит через балансировщик нагрузки и отправляется на один из узлов. Этот узел становится координатором данного запроса.
  2. Узел-координатор отправляет запрос на получение уведомлений от всех реплик в кластере. У реплик могут быть немного разные представления о текущем наборе частей таблицы. Поэтому эту информацию нужно собрать, чтобы избежать неверных решений при планировании.
  3. Затем узел-координатор использует уведомления, чтобы определить набор гранул, которые можно назначить разным репликам. Здесь, например, видно, что реплике 2 не была назначена ни одна гранула из части 3, потому что эта реплика не включила эту часть в свое уведомление. Также обратите внимание, что реплике 3 не было назначено ни одной задачи, потому что от этой реплики не поступило уведомление.
  4. После того как каждая реплика обработала запрос на своем подмножестве гранул и объединяемое состояние было отправлено обратно координатору, координатор объединяет результаты, и ответ отправляется клиенту.

Динамическая координация

Чтобы решить проблему хвостовой задержки, мы добавили динамическую координацию. Это означает, что все гранулы не отправляются реплике одним запросом; вместо этого каждая реплика может запрашивать у координатора новую задачу (набор гранул для обработки). Координатор будет выдавать реплике набор гранул на основе полученного уведомления. Предположим, что мы находимся на этапе, когда все реплики уже отправили уведомление со всеми частями. На рисунке ниже показано, как работает динамическая координация:
  1. Реплики сообщают узлу-координатору, что готовы обрабатывать задачи; они также могут указать, какой объём работы способны взять.
  2. Координатор назначает репликам задачи.
  1. Реплики 1 и 2 завершают свою задачу очень быстро. Они запрашивают у узла-координатора ещё одну задачу.
  2. Координатор назначает новые задачи репликам 1 и 2.
  1. Все реплики уже завершили обработку своих задач. Они запрашивают дополнительные задачи.
  2. Координатор, используя уведомления, проверяет, какие задачи ещё осталось обработать, но таких задач больше нет.
  3. Координатор сообщает репликам, что всё обработано. Теперь он объединит все состояния, подлежащие слиянию, и вернёт ответ на запрос.

Управление локальностью кэша

Последний потенциальный вопрос — как обеспечить локальность кэша. Если запрос выполняется несколько раз, как гарантировать, что одна и та же задача будет направляться на ту же реплику? В предыдущем примере задачи были распределены так:
Реплика 1Реплика 2Реплика 3
Часть 1g1, g6, g7g2, g4, g5g3
Часть 2g1g2, g4, g5g3
Часть 3g1, g6g2, g4, g5g3
Чтобы одни и те же задачи назначались одним и тем же репликам и могли использовать кэш, происходят две вещи. Вычисляется хеш part + набора гранул (то есть задачи). Затем для назначения задачи берется остаток от деления на число реплик. В теории это звучит хорошо, но на практике внезапная нагрузка на одну реплику или ухудшение состояния сети могут увеличить хвостовую задержку, если одна и та же реплика постоянно используется для выполнения определенных задач. Если max_parallel_replicas меньше числа реплик, то для выполнения запроса случайным образом выбираются реплики.

Перехват задач

если какая-либо реплика обрабатывает задачи медленнее остальных, другие реплики попытаются «перехватить» задачи, которые по хешу изначально принадлежат этой реплике, чтобы уменьшить задержку в хвосте распределения.

Ограничения

У этой возможности есть известные ограничения; основные из них описаны в этом разделе.
Если вы столкнулись с проблемой, которая не относится ни к одному из перечисленных ниже ограничений, и подозреваете, что её причина — параллельная реплика, пожалуйста, создайте issue в GitHub, используя метку comp-parallel-replicas.
ОграничениеОписание
Сложные запросыВ настоящее время параллельная реплика достаточно хорошо работает с простыми запросами. Дополнительные уровни сложности, такие как CTE, подзапросы, JOIN, неплоские запросы и т. д., могут негативно сказываться на производительности запроса.
Небольшие запросыЕсли вы выполняете запрос, который обрабатывает небольшое количество строк, запуск на нескольких репликах может не дать прироста производительности, поскольку сетевые затраты на координацию между репликами могут приводить к дополнительным накладным расходам при выполнении запроса. Вы можете уменьшить влияние этой проблемы, используя настройку parallel_replicas_min_number_of_rows_per_replica.
Параллельные реплики отключаются при использовании FINAL
Проекции не используются вместе с параллельными репликами
Данные с высокой мощностью и сложная агрегацияАгрегация по данным с высокой мощностью, при которой требуется передавать большие объёмы данных, может значительно замедлить выполнение запросов.
Совместимость с новым анализаторомНовый анализатор в некоторых сценариях может как значительно замедлить, так и ускорить выполнение запроса.
НастройкаОписание
enable_parallel_replicas0: отключено
1: включено
2: Принудительно использовать параллельные реплики; если они не будут задействованы, будет сгенерировано исключение.
cluster_for_parallel_replicasИмя кластера, используемого для параллельной репликации; если вы используете ClickHouse Cloud, укажите default.
max_parallel_replicasМаксимальное количество реплик, используемых для выполнения запроса на нескольких репликах; если указано число меньше количества реплик в кластере, узлы будут выбраны случайным образом. Это значение также может быть завышено с учетом горизонтального масштабирования.
parallel_replicas_min_number_of_rows_per_replicaПомогает ограничить количество используемых реплик в зависимости от числа строк, которые необходимо обработать; количество используемых реплик определяется так:
estimated rows to read / min_number_of_rows_per_replica.
enable_analyzerВыполнение запросов с параллельными репликами поддерживается только при включенном анализаторе

Исследование проблем с параллельными репликами

Вы можете проверить, какие настройки используются для каждого запроса, в таблице system.query_log. Также можно посмотреть таблицу system.events, чтобы увидеть все события, произошедшие на сервере, а также использовать табличную функцию clusterAllReplicas, чтобы увидеть таблицы на всех репликах (если вы используете Cloud, укажите default).
Query
SELECT
   hostname(),
   *
FROM clusterAllReplicas('default', system.events)
WHERE event ILIKE '%ParallelReplicas%'
Response
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleRequestMicroseconds      │   438 │ Время, затраченное на обработку запросов на метки от реплик                                         │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   558 │ Время, затраченное на обработку уведомлений от реплик                                               │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadUnassignedMarks            │   240 │ Сумма по всем репликам: сколько неназначенных меток было запланировано                              │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadAssignedForStealingMarks   │     4 │ Сумма по всем репликам: сколько запланированных меток было назначено для перехвата по consistent hash │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingByHashMicroseconds     │     5 │ Время, затраченное на сбор сегментов, предназначенных для перехвата по hash                        │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasProcessingPartsMicroseconds    │     5 │ Время, затраченное на обработку частей данных                                                       │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     3 │ Время, затраченное на сбор бесхозных сегментов                                                     │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasUsedCount                      │     2 │ Количество реплик, использованных для выполнения запроса с параллельными репликами на основе задач  │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasAvailableCount                 │     6 │ Количество реплик, доступных для выполнения запроса с параллельными репликами на основе задач       │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleRequestMicroseconds      │   698 │ Время, затраченное на обработку запросов на метки от реплик                                         │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   644 │ Время, затраченное на обработку уведомлений от реплик                                               │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadUnassignedMarks            │   190 │ Сумма по всем репликам: сколько неназначенных меток было запланировано                              │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadAssignedForStealingMarks   │    54 │ Сумма по всем репликам: сколько запланированных меток было назначено для перехвата по consistent hash │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingByHashMicroseconds     │     8 │ Время, затраченное на сбор сегментов, предназначенных для перехвата по hash                        │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasProcessingPartsMicroseconds    │     4 │ Время, затраченное на обработку частей данных                                                       │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     2 │ Время, затраченное на сбор бесхозных сегментов                                                     │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasUsedCount                      │     2 │ Количество реплик, использованных для выполнения запроса с параллельными репликами на основе задач  │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasAvailableCount                 │     6 │ Количество реплик, доступных для выполнения запроса с параллельными репликами на основе задач       │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleRequestMicroseconds      │   620 │ Время, затраченное на обработку запросов на метки от реплик                                         │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   656 │ Время, затраченное на обработку уведомлений от реплик                                               │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadUnassignedMarks            │     1 │ Сумма по всем репликам: сколько неназначенных меток было запланировано                              │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadAssignedForStealingMarks   │     1 │ Сумма по всем репликам: сколько запланированных меток было назначено для перехвата по consistent hash │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingByHashMicroseconds     │     4 │ Время, затраченное на сбор сегментов, предназначенных для перехвата по hash                        │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasProcessingPartsMicroseconds    │     3 │ Время, затраченное на обработку частей данных                                                       │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     1 │ Время, затраченное на сбор бесхозных сегментов                                                     │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasUsedCount                      │     2 │ Количество реплик, использованных для выполнения запроса с параллельными репликами на основе задач  │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasAvailableCount                 │    12 │ Количество реплик, доступных для выполнения запроса с параллельными репликами на основе задач       │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleRequestMicroseconds      │   696 │ Время, затраченное на обработку запросов на метки от реплик                                         │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   717 │ Время, затраченное на обработку уведомлений от реплик                                               │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadUnassignedMarks            │     2 │ Сумма по всем репликам: сколько неназначенных меток было запланировано                              │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadAssignedForStealingMarks   │     2 │ Сумма по всем репликам: сколько запланированных меток было назначено для перехвата по consistent hash │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingByHashMicroseconds     │    10 │ Время, затраченное на сбор сегментов, предназначенных для перехвата по hash                        │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasProcessingPartsMicroseconds    │     6 │ Время, затраченное на обработку частей данных                                                       │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     2 │ Время, затраченное на сбор бесхозных сегментов                                                     │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasUsedCount                      │     2 │ Количество реплик, использованных для выполнения запроса с параллельными репликами на основе задач  │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasAvailableCount                 │    12 │ Количество реплик, доступных для выполнения запроса с параллельными репликами на основе задач       │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
Таблица system.text_log также содержит информацию о выполнении запросов при использовании параллельных реплик:
Query
SELECT message
FROM clusterAllReplicas('default', system.text_log)
WHERE query_id = 'ad40c712-d25d-45c4-b1a1-a28ba8d4019c'
ORDER BY event_time_microseconds ASC
Response
┌─message────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ (from 54.218.178.249:59198) SELECT * FROM session_events WHERE type='type2' LIMIT 10 SETTINGS allow_experimental_parallel_reading_from_replicas=2; (stage: Complete)                                                                                       │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage Complete │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage WithMergeableState to stage Complete │
│ The number of replicas requested (100) is bigger than the real number available in the cluster (6). Will use the latter number to execute the query.                                                                                                       │
│ Initial request from replica 4: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 4 replica

│ Reading state is fully initialized: part all_0_2_1 with ranges [(0, 182)] in replicas [4]; part all_3_3_0 with ranges [(0, 62)] in replicas [4]                                                                                                            │
│ Sent initial requests: 1 Replicas count: 6                                                                                                                                                                                                                 │
│ Initial request from replica 2: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 2 replica

│ Sent initial requests: 2 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 4, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 4 with 1 parts: [part all_0_2_1 with ranges [(128, 182)]]. Finish: false; mine_marks=0, stolen_by_hash=54, stolen_rest=0                                                                                                       │
│ Initial request from replica 1: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 1 replica

│ Sent initial requests: 3 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 4, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 4 with 2 parts: [part all_0_2_1 with ranges [(0, 128)], part all_3_3_0 with ranges [(0, 62)]]. Finish: false; mine_marks=0, stolen_by_hash=0, stolen_rest=190                                                                  │
│ Initial request from replica 0: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 0 replica

│ Sent initial requests: 4 Replicas count: 6                                                                                                                                                                                                                 │
│ Initial request from replica 5: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 5 replica

│ Sent initial requests: 5 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 2, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 2 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Initial request from replica 3: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 3 replica

│ Sent initial requests: 6 Replicas count: 6                                                                                                                                                                                                                 │
│ Total rows to read: 2000000                                                                                                                                                                                                                                │
│ Handling request from replica 5, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 5 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 0, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 0 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 1, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 1 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 3, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 3 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ (c-crimson-vd-86-server-rdhnsx3-0.c-crimson-vd-86-server-headless.ns-crimson-vd-86.svc.cluster.local:9000) Cancelling query because enough data has been read                                                                                              │
│ Read 81920 rows, 5.16 MiB in 0.013166 sec., 6222087.194288318 rows/sec., 391.63 MiB/sec.                                                                                                                                                                   │
│ Coordination done: Statistics: replica 0 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 1 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 2 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 3 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 4 - {requests: 3 marks: 244 assigned_to_me: 0 stolen_by_hash: 54 stolen_unassigned: 190}; replica 5 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0} │
│ Peak memory usage (for query): 1.81 MiB.                                                                                                                                                                                                                   │
│ Processed in 0.024095586 sec.                                                                                                                                                                                                                              │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Наконец, вы также можете использовать EXPLAIN PIPELINE. Он показывает, как ClickHouse будет выполнять запрос и какие ресурсы будут задействованы при его выполнении. Возьмем, к примеру, следующий запрос:
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) LIMIT 10
Давайте посмотрим на конвейер выполнения запроса без параллельной реплики:
EXPLAIN PIPELINE (without parallel replica)
EXPLAIN PIPELINE graph = 1, compact = 0 
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) 
LIMIT 10 
SETTINGS allow_experimental_parallel_reading_from_replicas=0 
FORMAT TSV;
А теперь — с параллельной репликой:
EXPLAIN PIPELINE (with parallel replica)
EXPLAIN PIPELINE graph = 1, compact = 0 
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) 
LIMIT 10 
SETTINGS allow_experimental_parallel_reading_from_replicas=2 
FORMAT TSV;
Последнее изменение 10 июня 2026 г.