Saltar al contenido principal

Introducción

ClickHouse procesa las consultas con gran rapidez, pero ¿cómo se distribuyen y paralelizan estas consultas entre varios servidores?
En esta guía, primero veremos cómo ClickHouse distribuye una consulta entre varios segmentos mediante tablas distribuidas, y luego cómo una consulta puede aprovechar varias réplicas durante su ejecución.

Arquitectura segmentada

En una arquitectura shared-nothing, los clústeres suelen dividirse en múltiples segmentos, y cada segmento contiene un subconjunto del total de datos. Una tabla distribuida se sitúa por encima de estos segmentos y proporciona una vista unificada de todos los datos. Las lecturas pueden enviarse a la tabla local. La ejecución de la consulta ocurrirá solo en el segmento especificado, o bien puede enviarse a la tabla distribuida, y en ese caso, cada segmento ejecutará las consultas indicadas. El servidor en el que se consultó la tabla distribuida agregará los datos y responderá al cliente: La figura anterior muestra lo que sucede cuando un cliente consulta una tabla distribuida:
  1. La consulta SELECT se envía arbitrariamente a una tabla distribuida en un nodo (mediante una estrategia round-robin o después de ser enrutada a un servidor específico por un balanceador de carga). Este nodo pasará a actuar como coordinador.
  2. El nodo localizará cada segmento que deba ejecutar la consulta mediante la información especificada por la tabla distribuida, y la consulta se enviará a cada segmento.
  3. Cada segmento lee, filtra y agrega los datos localmente, y luego devuelve un estado fusionable al coordinador.
  4. El nodo coordinador fusiona los datos y luego envía la respuesta al cliente.
Cuando añadimos réplicas a la ecuación, el proceso es bastante similar, con la única diferencia de que solo una réplica de cada segmento ejecutará la consulta. Esto significa que luego pueden procesarse más consultas en paralelo.

Arquitectura sin segmentos

ClickHouse Cloud tiene una arquitectura muy distinta de la presentada anteriormente. (Consulta “Arquitectura de ClickHouse Cloud” para obtener más detalles). Con la separación entre cómputo y almacenamiento, y con una capacidad de almacenamiento prácticamente infinita, la necesidad de usar segmentos pasa a ser menos importante. La siguiente figura muestra la arquitectura de ClickHouse Cloud: Esta arquitectura nos permite añadir y eliminar réplicas casi instantáneamente, lo que garantiza una escalabilidad muy alta del clúster. El clúster de ClickHouse Keeper (mostrado a la derecha) garantiza que haya una única fuente de verdad para los metadatos. Las réplicas pueden obtener los metadatos del clúster de ClickHouse Keeper y mantener así los mismos datos. Los datos en sí se almacenan en almacenamiento de objetos, y la caché SSD nos permite acelerar las consultas. Pero ¿cómo podemos distribuir ahora la ejecución de consultas entre varios servidores? En una arquitectura con segmentos, era bastante obvio, ya que cada segmento podía ejecutar una consulta sobre un subconjunto de los datos. ¿Cómo funciona cuando no hay segmentación?

Introducción a las réplicas paralelas

Para paralelizar la ejecución de consultas a través de varios servidores, primero necesitamos poder asignar uno de nuestros servidores como coordinador. El coordinador es quien crea la lista de tareas que deben ejecutarse, garantiza que todas se ejecuten, se agreguen y que el resultado se devuelva al cliente. Como en la mayoría de los sistemas distribuidos, este será el papel del nodo que recibe la consulta inicial. También necesitamos definir la unidad de trabajo. En una arquitectura con segmentos, la unidad de trabajo es el segmento, un subconjunto de los datos. Con las réplicas paralelas usaremos una pequeña porción de la tabla, llamada gránulos, como unidad de trabajo. Ahora, veamos cómo funciona en la práctica con ayuda de la siguiente figura: Con réplicas paralelas:
  1. La consulta del cliente se envía a un nodo después de pasar por un balanceador de carga. Este nodo se convierte en el coordinador de esta consulta.
  2. El nodo analiza el índice de cada parte y selecciona las partes y los gránulos adecuados para procesar.
  3. El coordinador divide la carga de trabajo en un conjunto de gránulos que pueden asignarse a diferentes réplicas.
  4. Cada conjunto de gránulos es procesado por las réplicas correspondientes y se envía un estado fusionable al coordinador cuando terminan.
  5. Por último, el coordinador fusiona todos los resultados de las réplicas y luego devuelve la respuesta al cliente.
Los pasos anteriores describen cómo funcionan las réplicas paralelas en teoría. Sin embargo, en la práctica, hay muchos factores que pueden impedir que esta lógica funcione a la perfección:
  1. Algunas réplicas pueden no estar disponibles.
  2. La replicación en ClickHouse es asíncrona; es posible que algunas réplicas no tengan las mismas partes en un momento dado.
  3. La latencia de cola entre réplicas debe gestionarse de algún modo.
  4. La caché del sistema de archivos varía de una réplica a otra según la actividad en cada réplica, lo que significa que una asignación aleatoria de tareas podría dar lugar a un rendimiento subóptimo debido a la localidad de la caché.
En las siguientes secciones veremos cómo se superan estos factores.

Anuncios

Para abordar (1) y (2) de la lista anterior, introdujimos el concepto de anuncio. Veamos cómo funciona en la siguiente figura:
  1. La consulta del cliente se envía a un nodo tras pasar por un balanceador de carga. Ese nodo se convierte en el coordinador de la consulta.
  2. El nodo coordinador envía una solicitud para obtener anuncios de todas las réplicas del clúster. Las réplicas pueden tener vistas ligeramente diferentes del conjunto actual de partes de una tabla. Por ello, necesitamos recopilar esta información para evitar decisiones de planificación incorrectas.
  3. A continuación, el nodo coordinador usa los anuncios para definir un conjunto de gránulos que pueden asignarse a las distintas réplicas. Aquí, por ejemplo, podemos ver que no se asignó ningún gránulo de la parte 3 a la réplica 2 porque esta réplica no incluyó esa parte en su anuncio. Observe también que no se asignaron tareas a la réplica 3 porque la réplica no proporcionó ningún anuncio.
  4. Después de que cada réplica haya procesado la consulta sobre su subconjunto de gránulos y el estado fusionable se haya enviado de vuelta al coordinador, el coordinador fusiona los resultados y la respuesta se envía al cliente.

Coordinación dinámica

Para abordar el problema de la latencia de cola, añadimos coordinación dinámica. Esto significa que no todos los gránulos se envían a una réplica en una sola solicitud, sino que cada réplica podrá solicitar una nueva tarea (un conjunto de gránulos que se van a procesar) al coordinador. El coordinador proporcionará a la réplica el conjunto de gránulos según el anuncio recibido. Supongamos que estamos en la fase del proceso en la que todas las réplicas han enviado un anuncio con todas las partes. La figura siguiente muestra cómo funciona la coordinación dinámica:
  1. Las réplicas informan al nodo coordinador de que pueden procesar tareas; también pueden especificar cuánta carga de trabajo pueden procesar.
  2. El coordinador asigna tareas a las réplicas.
  1. Las réplicas 1 y 2 pueden terminar su tarea muy rápidamente. Entonces solicitarán otra tarea al nodo coordinador.
  2. El coordinador asigna nuevas tareas a las réplicas 1 y 2.
  1. Todas las réplicas ya han terminado de procesar su tarea. Solicitan más tareas.
  2. El coordinador, usando los anuncios, comprueba qué tareas quedan por procesar, pero no queda ninguna.
  3. El coordinador informa a las réplicas de que todo ya se ha procesado. Ahora combinará todos los estados fusionables y responderá a la consulta.

Gestión de la localidad de la caché

El último problema potencial que queda es cómo manejamos la localidad de la caché. Si la consulta se ejecuta varias veces, ¿cómo podemos asegurarnos de que la misma tarea se dirija a la misma réplica? En el ejemplo anterior, teníamos las siguientes tareas asignadas:
Réplica 1Réplica 2Réplica 3
Parte 1g1, g6, g7g2, g4, g5g3
Parte 2g1g2, g4, g5g3
Parte 3g1, g6g2, g4, g5g3
Para garantizar que las mismas tareas se asignen a las mismas réplicas y puedan beneficiarse de la caché, ocurren dos cosas. Se calcula un hash de la parte y del conjunto de gránulos (una tarea). Se aplica el módulo del número de réplicas para la asignación de tareas. En teoría, esto suena bien, pero en la práctica, una carga repentina en una réplica o una degradación de la red pueden introducir latencia de cola si se usa sistemáticamente la misma réplica para ejecutar determinadas tareas. Si max_parallel_replicas es menor que el número de réplicas, entonces se eligen réplicas aleatorias para la ejecución de consultas.

Robo de tareas

si una réplica procesa tareas más lentamente que las demás, otras réplicas intentarán ‘robar’ tareas que, en principio, pertenecen a esa réplica según el hash para reducir la latencia de cola.

Limitaciones

Esta funcionalidad tiene limitaciones conocidas, y en esta sección se documentan las principales.
Si encuentra un problema que no corresponde a ninguna de las limitaciones indicadas a continuación y sospecha que las réplicas paralelas son la causa, abra un issue en GitHub con la etiqueta comp-parallel-replicas.
LimitaciónDescripción
Consultas complejasActualmente, las réplicas paralelas funcionan bastante bien con consultas simples. Capas de complejidad como CTE, subconsultas, JOIN, consultas no planas, etc. pueden tener un impacto negativo en el rendimiento de la consulta.
Consultas pequeñasSi está ejecutando una consulta que no procesa muchas filas, ejecutarla en varias réplicas puede no mejorar el tiempo de respuesta, ya que el tiempo de red necesario para la coordinación entre réplicas puede añadir ciclos adicionales a la ejecución de la consulta. Puede mitigar estos problemas usando la configuración parallel_replicas_min_number_of_rows_per_replica.
Las réplicas paralelas están deshabilitadas con FINAL
Las proyecciones no se usan junto con las réplicas paralelas
Datos de alta cardinalidad y agregación complejaLa agregación de alta cardinalidad que requiere enviar muchos datos puede ralentizar significativamente sus consultas.
Compatibilidad con el nuevo analizadorEl nuevo analizador puede ralentizar o acelerar significativamente la ejecución de consultas en determinados escenarios.
SettingDescription
enable_parallel_replicas0: deshabilitado
1: habilitado
2: Fuerza el uso de réplicas paralelas; lanzará una excepción si no se utilizan.
cluster_for_parallel_replicasEl nombre del clúster que se usará para las réplicas paralelas; si usas ClickHouse Cloud, utiliza default.
max_parallel_replicasNúmero máximo de réplicas que se usarán para ejecutar la consulta en múltiples réplicas; si se especifica un número inferior al de réplicas del clúster, los nodos se seleccionarán aleatoriamente. Este valor también puede sobreasignarse para tener en cuenta el escalado horizontal.
parallel_replicas_min_number_of_rows_per_replicaAyuda a limitar el número de réplicas utilizadas en función del número de filas que deben procesarse; el número de réplicas utilizadas se define mediante:
estimated rows to read / min_number_of_rows_per_replica.
enable_analyzerLa ejecución de consultas con réplicas paralelas solo es compatible cuando el analizador está habilitado

Investigar problemas con las réplicas paralelas

Puedes comprobar qué configuraciones se están usando para cada consulta en la tabla system.query_log. También puedes consultar la tabla system.events para ver todos los eventos que han ocurrido en el servidor, y puedes usar la función de tabla clusterAllReplicas para ver las tablas en todas las réplicas (si usas Cloud, utiliza default).
Query
SELECT
   hostname(),
   *
FROM clusterAllReplicas('default', system.events)
WHERE event ILIKE '%ParallelReplicas%'
Response
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleRequestMicroseconds      │   438 │ Tiempo empleado en procesar solicitudes de marks de réplicas                                               │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   558 │ Tiempo empleado en procesar anuncios de réplicas                                                         │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadUnassignedMarks            │   240 │ Suma de todas las réplicas de cuántos marks sin asignar fueron programados                                  │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadAssignedForStealingMarks   │     4 │ Suma de todas las réplicas de cuántos marks programados fueron asignados para robo mediante consistent hash │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingByHashMicroseconds     │     5 │ Tiempo empleado en recopilar segmentos destinados al robo por hash                                            │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasProcessingPartsMicroseconds    │     5 │ Tiempo empleado en procesar partes de datos                                                                     │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     3 │ Tiempo empleado en recopilar segmentos huérfanos                                                              │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasUsedCount                      │     2 │ Número de réplicas utilizadas para ejecutar una consulta con réplicas paralelas basadas en tareas                         │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasAvailableCount                 │     6 │ Número de réplicas disponibles para ejecutar una consulta con réplicas paralelas basadas en tareas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleRequestMicroseconds      │   698 │ Tiempo empleado en procesar solicitudes de marks de réplicas                                               │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   644 │ Tiempo empleado en procesar anuncios de réplicas                                                         │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadUnassignedMarks            │   190 │ Suma de todas las réplicas de cuántos marks sin asignar fueron programados                                  │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadAssignedForStealingMarks   │    54 │ Suma de todas las réplicas de cuántos marks programados fueron asignados para robo mediante consistent hash │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingByHashMicroseconds     │     8 │ Tiempo empleado en recopilar segmentos destinados al robo por hash                                            │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasProcessingPartsMicroseconds    │     4 │ Tiempo empleado en procesar partes de datos                                                                     │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     2 │ Tiempo empleado en recopilar segmentos huérfanos                                                              │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasUsedCount                      │     2 │ Número de réplicas utilizadas para ejecutar una consulta con réplicas paralelas basadas en tareas                         │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasAvailableCount                 │     6 │ Número de réplicas disponibles para ejecutar una consulta con réplicas paralelas basadas en tareas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleRequestMicroseconds      │   620 │ Tiempo empleado en procesar solicitudes de marks de réplicas                                               │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   656 │ Tiempo empleado en procesar anuncios de réplicas                                                         │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadUnassignedMarks            │     1 │ Suma de todas las réplicas de cuántos marks sin asignar fueron programados                                  │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadAssignedForStealingMarks   │     1 │ Suma de todas las réplicas de cuántos marks programados fueron asignados para robo mediante consistent hash │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingByHashMicroseconds     │     4 │ Tiempo empleado en recopilar segmentos destinados al robo por hash                                            │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasProcessingPartsMicroseconds    │     3 │ Tiempo empleado en procesar partes de datos                                                                     │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     1 │ Tiempo empleado en recopilar segmentos huérfanos                                                              │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasUsedCount                      │     2 │ Número de réplicas utilizadas para ejecutar una consulta con réplicas paralelas basadas en tareas                         │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasAvailableCount                 │    12 │ Número de réplicas disponibles para ejecutar una consulta con réplicas paralelas basadas en tareas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleRequestMicroseconds      │   696 │ Tiempo empleado en procesar solicitudes de marks de réplicas                                               │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   717 │ Tiempo empleado en procesar anuncios de réplicas                                                         │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadUnassignedMarks            │     2 │ Suma de todas las réplicas de cuántos marks sin asignar fueron programados                                  │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadAssignedForStealingMarks   │     2 │ Suma de todas las réplicas de cuántos marks programados fueron asignados para robo mediante consistent hash │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingByHashMicroseconds     │    10 │ Tiempo empleado en recopilar segmentos destinados al robo por hash                                            │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasProcessingPartsMicroseconds    │     6 │ Tiempo empleado en procesar partes de datos                                                                     │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     2 │ Tiempo empleado en recopilar segmentos huérfanos                                                              │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasUsedCount                      │     2 │ Número de réplicas utilizadas para ejecutar una consulta con réplicas paralelas basadas en tareas                         │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasAvailableCount                 │    12 │ Número de réplicas disponibles para ejecutar una consulta con réplicas paralelas basadas en tareas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
La tabla system.text_log también contiene información sobre la ejecución de consultas usando réplicas paralelas:
Query
SELECT message
FROM clusterAllReplicas('default', system.text_log)
WHERE query_id = 'ad40c712-d25d-45c4-b1a1-a28ba8d4019c'
ORDER BY event_time_microseconds ASC
Response
┌─message────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ (from 54.218.178.249:59198) SELECT * FROM session_events WHERE type='type2' LIMIT 10 SETTINGS allow_experimental_parallel_reading_from_replicas=2; (stage: Complete)                                                                                       │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage Complete │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage WithMergeableState to stage Complete │
│ The number of replicas requested (100) is bigger than the real number available in the cluster (6). Will use the latter number to execute the query.                                                                                                       │
│ Initial request from replica 4: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 4 replica

│ Reading state is fully initialized: part all_0_2_1 with ranges [(0, 182)] in replicas [4]; part all_3_3_0 with ranges [(0, 62)] in replicas [4]                                                                                                            │
│ Sent initial requests: 1 Replicas count: 6                                                                                                                                                                                                                 │
│ Initial request from replica 2: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 2 replica

│ Sent initial requests: 2 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 4, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 4 with 1 parts: [part all_0_2_1 with ranges [(128, 182)]]. Finish: false; mine_marks=0, stolen_by_hash=54, stolen_rest=0                                                                                                       │
│ Initial request from replica 1: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 1 replica

│ Sent initial requests: 3 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 4, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 4 with 2 parts: [part all_0_2_1 with ranges [(0, 128)], part all_3_3_0 with ranges [(0, 62)]]. Finish: false; mine_marks=0, stolen_by_hash=0, stolen_rest=190                                                                  │
│ Initial request from replica 0: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 0 replica

│ Sent initial requests: 4 Replicas count: 6                                                                                                                                                                                                                 │
│ Initial request from replica 5: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 5 replica

│ Sent initial requests: 5 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 2, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 2 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Initial request from replica 3: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 3 replica

│ Sent initial requests: 6 Replicas count: 6                                                                                                                                                                                                                 │
│ Total rows to read: 2000000                                                                                                                                                                                                                                │
│ Handling request from replica 5, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 5 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 0, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 0 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 1, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 1 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 3, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 3 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ (c-crimson-vd-86-server-rdhnsx3-0.c-crimson-vd-86-server-headless.ns-crimson-vd-86.svc.cluster.local:9000) Cancelling query because enough data has been read                                                                                              │
│ Read 81920 rows, 5.16 MiB in 0.013166 sec., 6222087.194288318 rows/sec., 391.63 MiB/sec.                                                                                                                                                                   │
│ Coordination done: Statistics: replica 0 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 1 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 2 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 3 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 4 - {requests: 3 marks: 244 assigned_to_me: 0 stolen_by_hash: 54 stolen_unassigned: 190}; replica 5 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0} │
│ Peak memory usage (for query): 1.81 MiB.                                                                                                                                                                                                                   │
│ Processed in 0.024095586 sec.                                                                                                                                                                                                                              │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Por último, también puede usar EXPLAIN PIPELINE. Muestra cómo ClickHouse va a ejecutar una consulta y qué recursos se van a utilizar durante la ejecución de la consulta. Tomemos la siguiente consulta como ejemplo:
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) LIMIT 10
Veamos el pipeline de consulta sin réplica paralela:
EXPLAIN PIPELINE (without parallel replica)
EXPLAIN PIPELINE graph = 1, compact = 0 
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) 
LIMIT 10 
SETTINGS allow_experimental_parallel_reading_from_replicas=0 
FORMAT TSV;
Y ahora, con réplica paralela:
EXPLAIN PIPELINE (with parallel replica)
EXPLAIN PIPELINE graph = 1, compact = 0 
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) 
LIMIT 10 
SETTINGS allow_experimental_parallel_reading_from_replicas=2 
FORMAT TSV;
Última modificación el 10 de junio de 2026