Pular para o conteúdo principal

Introdução

O ClickHouse processa consultas com extrema rapidez, mas como essas consultas são distribuídas e executadas em paralelo em vários servidores?
Neste guia, primeiro veremos como o ClickHouse distribui uma consulta entre múltiplos shards por meio de tabelas distribuídas e, em seguida, como uma consulta pode aproveitar múltiplas réplicas durante sua execução.

Arquitetura com shards

Em uma arquitetura shared-nothing, os clusters normalmente são divididos em múltiplos shards, com cada shard contendo um subconjunto do volume total de dados. Uma tabela distribuída fica sobre esses shards, fornecendo uma visão unificada dos dados completos. As leituras podem ser enviadas para a tabela local. A execução da consulta ocorrerá apenas no shard especificado, ou ela pode ser enviada para a tabela distribuída e, nesse caso, cada shard executará a consulta. O servidor no qual a tabela distribuída foi consultada agregará os dados e responderá ao cliente: A figura acima mostra o que acontece quando um cliente consulta uma tabela distribuída:
  1. A consulta SELECT é enviada arbitrariamente para uma tabela distribuída em um nó (por meio de uma estratégia round-robin ou após ser roteada para um servidor específico por um balanceador de carga). Esse nó passará a atuar como coordenador.
  2. O nó localizará cada shard que precisa executar a consulta por meio das informações especificadas pela tabela distribuída, e a consulta será enviada para cada shard.
  3. Cada shard lê, filtra e agrega os dados localmente e, em seguida, envia de volta ao coordenador um estado que pode ser mesclado.
  4. O nó coordenador mescla os dados e depois envia a resposta de volta ao cliente.
Quando adicionamos réplicas à equação, o processo é bastante semelhante, com a única diferença sendo que apenas uma réplica de cada shard executará a consulta. Isso significa que mais consultas podem então ser processadas em paralelo.

Arquitetura sem sharding

O ClickHouse Cloud tem uma arquitetura bem diferente da apresentada acima. (Consulte “Arquitetura do ClickHouse Cloud” para mais detalhes). Com a separação entre compute e storage e uma quantidade virtualmente infinita de armazenamento, a necessidade de shards se torna menos importante. A figura abaixo mostra a arquitetura do ClickHouse Cloud: Essa arquitetura nos permite adicionar e remover réplicas quase instantaneamente, garantindo uma escalabilidade muito alta do cluster. O cluster do ClickHouse Keeper (mostrado à direita) garante que tenhamos uma única fonte de verdade para os metadados. As réplicas podem buscar os metadados no cluster do ClickHouse Keeper e todas mantêm os mesmos dados. Os próprios dados são armazenados em armazenamento de objetos, e o cache em SSD nos permite acelerar as consultas. Mas como podemos agora distribuir a execução de consultas entre vários servidores? Em uma arquitetura com sharding, isso era bastante óbvio, já que cada shard podia de fato executar uma consulta sobre um subconjunto dos dados. Como isso funciona quando não há sharding?

Apresentando réplicas paralelas

Para paralelizar a execução de consultas em vários servidores, primeiro precisamos conseguir designar um dos nossos servidores como coordenador. O coordenador é quem cria a lista de tarefas que precisam ser executadas, garante que todas sejam executadas e agregadas, e que o resultado seja retornado ao cliente. Como na maioria dos sistemas distribuídos, esse será o papel do nó que recebe a consulta inicial. Também precisamos definir a unidade de trabalho. Em uma arquitetura com shards, a unidade de trabalho é o shard, um subconjunto dos dados. Com réplicas paralelas, usaremos uma pequena porção da tabela, chamada grânulos, como unidade de trabalho. Agora, vamos ver como isso funciona na prática com a ajuda da figura abaixo: Com réplicas paralelas:
  1. A consulta do cliente é enviada a um nó depois de passar por um balanceador de carga. Esse nó se torna o coordenador dessa consulta.
  2. O nó analisa o índice de cada parte e seleciona as partes e os grânulos corretos para processamento.
  3. O coordenador divide a carga de trabalho em um conjunto de grânulos que pode ser atribuído a diferentes réplicas.
  4. Cada conjunto de grânulos é processado pelas réplicas correspondentes, e um estado passível de merge é enviado ao coordenador quando elas terminam.
  5. Por fim, o coordenador faz o merge de todos os resultados das réplicas e então retorna a resposta ao cliente.
As etapas acima mostram como réplicas paralelas funcionam em teoria. No entanto, na prática, há muitos fatores que podem impedir que essa lógica funcione perfeitamente:
  1. Algumas réplicas podem estar indisponíveis.
  2. A replicação no ClickHouse é assíncrona; algumas réplicas podem não ter as mesmas partes em um determinado momento.
  3. A latência de cauda entre réplicas precisa ser tratada de alguma forma.
  4. O cache do sistema de arquivos varia de réplica para réplica com base na atividade em cada réplica, o que significa que uma atribuição aleatória de tarefas pode levar a um desempenho menos eficiente, dada a localidade do cache.
Veremos como esses fatores são contornados nas seções a seguir.

Anúncios

Para abordar (1) e (2) da lista acima, introduzimos o conceito de anúncio. Vamos visualizar como isso funciona usando a figura abaixo:
  1. A consulta do cliente é enviada para um nó após passar por um balanceador de carga. Esse nó se torna o coordenador da consulta.
  2. O nó coordenador envia uma solicitação para obter anúncios de todas as réplicas do cluster. As réplicas podem ter visões ligeiramente diferentes do conjunto atual de partes de uma tabela. Por isso, precisamos coletar essas informações para evitar decisões incorretas de escalonamento.
  3. O nó coordenador então usa os anúncios para definir um conjunto de grânulos que podem ser atribuídos às diferentes réplicas. Aqui, por exemplo, podemos ver que nenhum grânulo da parte 3 foi atribuído à réplica 2 porque essa réplica não incluiu essa parte em seu anúncio. Observe também que nenhuma tarefa foi atribuída à réplica 3 porque a réplica não forneceu um anúncio.
  4. Depois que cada réplica tiver processado a consulta em seu subconjunto de grânulos e o estado mesclável tiver sido enviado de volta ao coordenador, o coordenador mescla os resultados e a resposta é enviada ao cliente.

Coordenação dinâmica

Para resolver o problema da latência de cauda, adicionamos a coordenação dinâmica. Isso significa que nem todos os grânulos são enviados para uma réplica em uma única requisição; em vez disso, cada réplica pode solicitar uma nova tarefa (um conjunto de grânulos a serem processados) ao coordenador. O coordenador fornecerá à réplica o conjunto de grânulos com base no anúncio recebido. Vamos supor que estamos na etapa do processo em que todas as réplicas já enviaram um anúncio com todas as partes. A figura abaixo mostra como a coordenação dinâmica funciona:
  1. As réplicas informam ao nó coordenador que conseguem processar tarefas; elas também podem especificar quanto trabalho conseguem processar.
  2. O coordenador atribui tarefas às réplicas.
  1. As réplicas 1 e 2 conseguem concluir sua tarefa muito rapidamente. Elas solicitarão outra tarefa ao nó coordenador.
  2. O coordenador atribui novas tarefas às réplicas 1 e 2.
  1. Todas as réplicas agora concluíram o processamento de sua tarefa. Elas solicitam mais tarefas.
  2. O coordenador, com base nos anúncios, verifica quais tarefas ainda precisam ser processadas, mas não há tarefas restantes.
  3. O coordenador informa às réplicas que tudo foi processado. Agora ele fará o merge de todos os estados mescláveis e responderá à consulta.

Gerenciando a localidade do cache

A última questão potencial que resta é como lidamos com a localidade do cache. Se a consulta for executada várias vezes, como podemos garantir que a mesma tarefa seja encaminhada para a mesma réplica? No exemplo anterior, tínhamos as seguintes tarefas atribuídas:
Réplica 1Réplica 2Réplica 3
Parte 1g1, g6, g7g2, g4, g5g3
Parte 2g1g2, g4, g5g3
Parte 3g1, g6g2, g4, g5g3
Para garantir que as mesmas tarefas sejam atribuídas às mesmas réplicas e possam se beneficiar do cache, duas coisas acontecem. É calculado um hash da parte + conjunto de grânulos (uma tarefa). Em seguida, aplica-se o módulo pelo número de réplicas para a atribuição da tarefa. No papel, isso parece bom, mas, na prática, uma sobrecarga repentina em uma réplica ou uma degradação da rede pode introduzir latência de cauda se a mesma réplica for usada de forma consistente para executar determinadas tarefas. Se max_parallel_replicas for menor que o número de réplicas, então réplicas aleatórias serão escolhidas para a execução da consulta.

Roubo de tarefas

se alguma réplica processar tarefas mais lentamente que as outras, as demais réplicas tentarão ‘roubar’ tarefas que, em princípio, pertencem a essa réplica com base no hash, para reduzir a latência de cauda.

Limitações

Este recurso tem limitações conhecidas, e as principais estão documentadas nesta seção.
Se você encontrar um problema que não seja uma das limitações listadas abaixo e suspeitar que as réplicas paralelas sejam a causa, abra uma issue no GitHub usando o rótulo comp-parallel-replicas.
LimitationDescription
Consultas complexasAtualmente, as réplicas paralelas funcionam muito bem para consultas simples. Camadas de complexidade, como CTEs, subconsultas, junções, consultas não planas etc., podem ter um impacto negativo no desempenho da consulta.
Consultas pequenasSe você estiver executando uma consulta que não processa muitas linhas, executá-la em várias réplicas pode não resultar em melhor desempenho, já que o tempo de rede para a coordenação entre as réplicas pode adicionar ciclos à execução da consulta. Você pode reduzir esses problemas usando a configuração: parallel_replicas_min_number_of_rows_per_replica.
Réplicas paralelas são desabilitadas com FINAL
Projeções não são usadas junto com réplicas paralelas
Dados de alta cardinalidade e agregação complexaAgregações de alta cardinalidade que exigem o envio de muitos dados podem deixar suas consultas significativamente mais lentas.
Compatibilidade com o novo analisadorO novo analisador pode tornar a execução da consulta significativamente mais lenta ou mais rápida em cenários específicos.
ConfiguraçãoDescrição
enable_parallel_replicas0: desabilitado
1: habilitado
2: força o uso de réplica paralela; gerará uma exceção se ela não for usada.
cluster_for_parallel_replicasO nome do cluster a ser usado para réplicas paralelas; se você estiver usando ClickHouse Cloud, use default.
max_parallel_replicasNúmero máximo de réplicas a serem usadas na execução da consulta em várias réplicas; se for especificado um número menor que o número de réplicas no cluster, os nós serão selecionados aleatoriamente. Esse valor também pode sofrer overcommit para acomodar o escalonamento horizontal.
parallel_replicas_min_number_of_rows_per_replicaAjuda a limitar o número de réplicas usadas com base no número de linhas que precisam ser processadas; o número de réplicas usadas é definido por:
estimated rows to read / min_number_of_rows_per_replica.
enable_analyzerA execução de consultas com réplicas paralelas é compatível apenas com o analisador habilitado

Investigando problemas com réplicas paralelas

Você pode verificar quais configurações estão sendo usadas para cada consulta na tabela system.query_log. Você também pode consultar a tabela system.events para ver todos os eventos que ocorreram no servidor e pode usar a função de tabela clusterAllReplicas para ver as tabelas em todas as réplicas (se você for usuário do Cloud, use default).
Query
SELECT
   hostname(),
   *
FROM clusterAllReplicas('default', system.events)
WHERE event ILIKE '%ParallelReplicas%'
Response
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleRequestMicroseconds      │   438 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   558 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadUnassignedMarks            │   240 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadAssignedForStealingMarks   │     4 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingByHashMicroseconds     │     5 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasProcessingPartsMicroseconds    │     5 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     3 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasAvailableCount                 │     6 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleRequestMicroseconds      │   698 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   644 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadUnassignedMarks            │   190 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadAssignedForStealingMarks   │    54 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingByHashMicroseconds     │     8 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasProcessingPartsMicroseconds    │     4 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     2 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasAvailableCount                 │     6 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleRequestMicroseconds      │   620 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   656 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadUnassignedMarks            │     1 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadAssignedForStealingMarks   │     1 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingByHashMicroseconds     │     4 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasProcessingPartsMicroseconds    │     3 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     1 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasAvailableCount                 │    12 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleRequestMicroseconds      │   696 │ Time spent processing requests for marks from replicas                                               │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleAnnouncementMicroseconds │   717 │ Time spent processing replicas announcements                                                         │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadUnassignedMarks            │     2 │ Sum across all replicas of how many unassigned marks were scheduled                                  │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadAssignedForStealingMarks   │     2 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingByHashMicroseconds     │    10 │ Time spent collecting segments meant for stealing by hash                                            │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasProcessingPartsMicroseconds    │     6 │ Time spent processing data parts                                                                     │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingLeftoversMicroseconds  │     2 │ Time spent collecting orphaned segments                                                              │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasUsedCount                      │     2 │ Number of replicas used to execute a query with task-based parallel replicas                         │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasAvailableCount                 │    12 │ Number of replicas available to execute a query with task-based parallel replicas                    │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
A tabela system.text_log também contém informações sobre a execução de consultas com réplicas paralelas:
Query
SELECT message
FROM clusterAllReplicas('default', system.text_log)
WHERE query_id = 'ad40c712-d25d-45c4-b1a1-a28ba8d4019c'
ORDER BY event_time_microseconds ASC
Response
┌─message────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ (from 54.218.178.249:59198) SELECT * FROM session_events WHERE type='type2' LIMIT 10 SETTINGS allow_experimental_parallel_reading_from_replicas=2; (stage: Complete)                                                                                       │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage Complete │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events                                                                                                                                                             │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage WithMergeableState to stage Complete │
│ The number of replicas requested (100) is bigger than the real number available in the cluster (6). Will use the latter number to execute the query.                                                                                                       │
│ Initial request from replica 4: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 4 replica

│ Reading state is fully initialized: part all_0_2_1 with ranges [(0, 182)] in replicas [4]; part all_3_3_0 with ranges [(0, 62)] in replicas [4]                                                                                                            │
│ Sent initial requests: 1 Replicas count: 6                                                                                                                                                                                                                 │
│ Initial request from replica 2: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 2 replica

│ Sent initial requests: 2 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 4, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 4 with 1 parts: [part all_0_2_1 with ranges [(128, 182)]]. Finish: false; mine_marks=0, stolen_by_hash=54, stolen_rest=0                                                                                                       │
│ Initial request from replica 1: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 1 replica

│ Sent initial requests: 3 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 4, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 4 with 2 parts: [part all_0_2_1 with ranges [(0, 128)], part all_3_3_0 with ranges [(0, 62)]]. Finish: false; mine_marks=0, stolen_by_hash=0, stolen_rest=190                                                                  │
│ Initial request from replica 0: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 0 replica

│ Sent initial requests: 4 Replicas count: 6                                                                                                                                                                                                                 │
│ Initial request from replica 5: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 5 replica

│ Sent initial requests: 5 Replicas count: 6                                                                                                                                                                                                                 │
│ Handling request from replica 2, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 2 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Initial request from replica 3: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 3 replica

│ Sent initial requests: 6 Replicas count: 6                                                                                                                                                                                                                 │
│ Total rows to read: 2000000                                                                                                                                                                                                                                │
│ Handling request from replica 5, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 5 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 0, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 0 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 1, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 1 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ Handling request from replica 3, minimal marks size is 240                                                                                                                                                                                                 │
│ Going to respond to replica 3 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0                                                                                                                                                │
│ (c-crimson-vd-86-server-rdhnsx3-0.c-crimson-vd-86-server-headless.ns-crimson-vd-86.svc.cluster.local:9000) Cancelling query because enough data has been read                                                                                              │
│ Read 81920 rows, 5.16 MiB in 0.013166 sec., 6222087.194288318 rows/sec., 391.63 MiB/sec.                                                                                                                                                                   │
│ Coordination done: Statistics: replica 0 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 1 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 2 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 3 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 4 - {requests: 3 marks: 244 assigned_to_me: 0 stolen_by_hash: 54 stolen_unassigned: 190}; replica 5 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0} │
│ Peak memory usage (for query): 1.81 MiB.                                                                                                                                                                                                                   │
│ Processed in 0.024095586 sec.                                                                                                                                                                                                                              │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Por fim, você também pode usar o EXPLAIN PIPELINE. Ele mostra como o ClickHouse vai executar uma consulta e quais recursos serão usados na execução da consulta. Veja a consulta a seguir como exemplo:
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) LIMIT 10
Vejamos o pipeline da consulta sem réplica paralela:
EXPLAIN PIPELINE (without parallel replica)
EXPLAIN PIPELINE graph = 1, compact = 0 
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) 
LIMIT 10 
SETTINGS allow_experimental_parallel_reading_from_replicas=0 
FORMAT TSV;
E agora, com réplica paralela:
EXPLAIN PIPELINE (with parallel replica)
EXPLAIN PIPELINE graph = 1, compact = 0 
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp) 
FROM session_events 
WHERE type='type3' 
GROUP BY toYear(timestamp) 
LIMIT 10 
SETTINGS allow_experimental_parallel_reading_from_replicas=2 
FORMAT TSV;
Última modificação em 10 de junho de 2026