Pular para o conteúdo principal
Se você usa o ClickHouse Cloud, recomendamos usar ClickPipes. O ClickPipes oferece suporte nativo a conexões de rede privadas, ao escalonamento independente da ingestão e dos recursos do cluster, além de monitoramento abrangente para streaming de dados do Kafka para o ClickHouse.
  • Publique ou inscreva-se em fluxos de dados.
  • Organize o armazenamento tolerante a falhas.
  • Processe streams à medida que estiverem disponíveis.

Criar uma tabela

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [ALIAS expr1],
    name2 [type2] [ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_security_protocol = '',]
    [kafka_sasl_mechanism = '',]
    [kafka_sasl_username = '',]
    [kafka_sasl_password = '',]
    [kafka_autodetect_client_rack = '',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_client_id = '',]
    [kafka_poll_timeout_ms = 0,]
    [kafka_poll_max_batch_size = 0,]
    [kafka_flush_interval_ms = 0,]
    [kafka_consumer_reschedule_ms = 0,]
    [kafka_thread_per_consumer = 0,]
    [kafka_handle_error_mode = 'default',]
    [kafka_commit_on_select = false,]
    [kafka_consumer_acquire_timeout_ms = 30000,]
    [kafka_max_rows_per_message = 1,]
    [kafka_compression_codec = '',]
    [kafka_compression_level = -1];
Parâmetros obrigatórios:
  • kafka_broker_list — Uma lista de brokers separada por vírgulas (por exemplo, localhost:9092).
  • kafka_topic_list — Uma lista de tópicos do Kafka.
  • kafka_group_name — Um grupo de consumidores do Kafka. Os offsets de leitura são acompanhados separadamente para cada grupo. Se você não quiser que as mensagens sejam duplicadas no cluster, use o mesmo nome de grupo em todos os lugares.
  • kafka_format — Formato da mensagem. Usa a mesma notação da função SQL FORMAT, como JSONEachRow. Para mais informações, consulte a seção Formatos.
Parâmetros opcionais:
  • kafka_security_protocol - Protocolo usado para se comunicar com os brokers. Valores possíveis: plaintext, ssl, sasl_plaintext, sasl_ssl.
  • kafka_sasl_mechanism - Mecanismo SASL a ser usado na autenticação. Valores possíveis: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER.
  • kafka_sasl_username - nome de usuário SASL para uso com os mecanismos PLAIN e SASL-SCRAM-...
  • kafka_sasl_password - senha SASL para uso com os mecanismos PLAIN e SASL-SCRAM-...
  • kafka_schema — Parâmetro que deve ser usado se o formato exigir uma definição de esquema. Por exemplo, Cap’n Proto exige o caminho para o arquivo de esquema e o nome do objeto raiz schema.capnp:Message.
  • kafka_schema_registry_skip_bytes — O número de bytes a ignorar no início de cada mensagem ao usar schema registry com cabeçalhos de envelope (por exemplo, o AWS Glue Schema Registry, que inclui um envelope de 19 bytes). Intervalo: [0, 255]. Padrão: 0.
  • kafka_num_consumers — O número de consumidores por tabela. Especifique mais consumidores se a taxa de transferência de um consumidor for insuficiente. O número total de consumidores não deve exceder o número de partições no tópico, já que apenas um consumidor pode ser atribuído por partição, e não deve ser maior que o número de núcleos físicos no servidor onde o ClickHouse está implantado. Padrão: 1.
  • kafka_max_block_size — O tamanho máximo do lote (em mensagens) na operação de poll. Padrão: max_insert_block_size.
  • kafka_skip_broken_messages — tolerância do parser de mensagens do Kafka a mensagens incompatíveis com o schema por bloco. Se kafka_skip_broken_messages = N, então o engine ignora N mensagens do Kafka que não podem ser analisadas (uma mensagem equivale a uma linha de dados). Padrão: 0.
  • kafka_commit_every_batch — Faz commit de cada lote consumido e processado, em vez de fazer um único commit após gravar um bloco inteiro. Padrão: 0.
  • kafka_client_id — Identificador do cliente. Em branco por padrão.
  • kafka_poll_timeout_ms — Tempo limite para uma única operação de poll do Kafka. Padrão: stream_poll_timeout_ms.
  • kafka_poll_max_batch_size — Número máximo de mensagens obtidas em uma única operação de poll do Kafka. Padrão: max_block_size.
  • kafka_flush_interval_ms — Tempo limite para fazer o flush dos dados do Kafka. Padrão: stream_flush_interval_ms.
  • kafka_consumer_reschedule_ms — Intervalo de reagendamento quando o processamento de streams do Kafka é interrompido (por exemplo, quando não há mensagens disponíveis para consumo). Essa configuração controla o tempo de espera até o consumidor tentar fazer polling novamente. Não deve exceder kafka_consumers_pool_ttl_ms. Padrão: 500 milissegundos.
  • kafka_thread_per_consumer — Fornece uma thread independente para cada consumidor. Quando habilitado, cada consumidor faz o flush dos dados de forma independente, em paralelo (caso contrário, as linhas de vários consumidores são combinadas para formar um bloco). Padrão: 0.
  • kafka_handle_error_mode — Como lidar com erros no motor Kafka. Valores possíveis: default (uma exceção será gerada se não conseguirmos interpretar uma mensagem), stream (a mensagem da exceção e a mensagem bruta serão salvas nas colunas virtuais _error e _raw_message), dead_letter_queue (os dados relacionados ao erro serão salvos em system.dead_letter_queue).
  • kafka_commit_on_select — Faz commit das mensagens quando uma consulta SELECT é executada. Padrão: false.
  • kafka_consumer_acquire_timeout_ms — Tempo limite, em milissegundos, para obter um consumidor do Kafka durante consultas SELECT diretas em uma tabela Kafka2 (com armazenamento de offsets baseado em Keeper). Quando várias consultas SELECT diretas e concorrentes são executadas na mesma tabela, cada uma precisa esperar até que os consumidores fiquem disponíveis. Esse tempo limite evita deadlocks quando as consultas retêm subconjuntos diferentes de consumidores. Padrão: 30000.
  • kafka_max_rows_per_message — O número máximo de linhas gravadas em uma mensagem Kafka para formatos baseados em linhas. Padrão: 1.
  • kafka_autodetect_client_rack — Configura automaticamente o parâmetro client.rack do librdkafka para dar preferência às réplicas do Kafka mais próximas. Fontes compatíveis: AWS_ZONE_ID para o ID da zona de disponibilidade do AWS IMDSv2, por exemplo euc1-az1; AWS_ZONE_NAME para o nome da zona de disponibilidade do AWS IMDSv2, por exemplo eu-central-1a; GCP_ZONE para a zona do serviço de metadados do GCP, por exemplo europe-central2-a; CLICKHOUSE para usar a detecção interna do ClickHouse, que pode depender de metadados da nuvem ou da configuração; AWS_ZONE_NAME_THEN_GCP_ZONE para tentar AWS_ZONE_NAME e depois GCP_ZONE. Padrão: string vazia, desativado. Dica: ambientes diferentes usam formatos diferentes de zona de disponibilidade. O Amazon MSK normalmente usa IDs de zona, portanto prefira AWS_ZONE_ID. O Confluent Cloud normalmente usa nomes de zona, portanto prefira AWS_ZONE_NAME. Em caso de dúvida, use AWS_ZONE_NAME_THEN_GCP_ZONE ou verifique o valor de broker.rack no seu cluster. Observação: os brokers do Kafka precisam estar configurados com broker.rack e replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector.
  • kafka_compression_codec — codec de compressão usado para produzir mensagens. Suportados: string vazia, none, gzip, snappy, lz4, zstd. No caso de string vazia, o codec de compressão não é definido pela tabela; portanto, serão usados os valores dos arquivos de configuração ou o valor padrão de librdkafka. Padrão: string vazia.
  • kafka_compression_level — Parâmetro do nível de compressão para o algoritmo selecionado por kafka_compression_codec. Valores mais altos resultarão em melhor compressão, ao custo de maior uso de CPU. O intervalo utilizável depende do algoritmo: [0-9] para gzip; [0-12] para lz4; apenas 0 para snappy; [0-12] para zstd; -1 = nível de compressão padrão dependente do codec. Padrão: -1.
  • kafka_map_virtual_columns_on_write — Se ativado, colunas com nomes especiais _key, _timestamp, _headers.name e _headers.value no esquema da tabela são mapeadas para os metadados correspondentes da mensagem Kafka em INSERT e excluídas do payload da mensagem. Consulte Mapeamento de colunas para metadados de mensagens Kafka. Padrão: false.
Exemplos:
  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  SELECT * FROM queue LIMIT 5;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group1',
                            kafka_format = 'JSONEachRow',
                            kafka_num_consumers = 4;

  CREATE TABLE queue3 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
              SETTINGS kafka_format = 'JSONEachRow',
                       kafka_num_consumers = 4;
O motor de tabela Kafka não oferece suporte a colunas com valor padrão. Se você precisar de colunas com valor padrão, poderá adicioná-las na visão materializada (veja abaixo).

Descrição

As mensagens entregues são rastreadas automaticamente, portanto cada mensagem em um grupo é contabilizada apenas uma vez. Se você quiser obter os dados duas vezes, crie uma cópia da tabela com outro nome de grupo. Os grupos são flexíveis e sincronizados no cluster. Por exemplo, se você tiver 10 tópicos e 5 cópias de uma tabela em um cluster, cada cópia receberá 2 tópicos. Se o número de cópias mudar, os tópicos serão redistribuídos automaticamente entre elas. Leia mais sobre isso em http://kafka.apache.org/intro. Recomenda-se que cada tópico Kafka tenha seu próprio grupo de consumidores dedicado, garantindo o pareamento exclusivo entre o tópico e o grupo, especialmente em ambientes em que os tópicos podem ser criados e excluídos dinamicamente (por exemplo, em testes ou staging). SELECT não é particularmente útil para ler mensagens (exceto para depuração), porque cada mensagem só pode ser lida uma vez. É mais prático criar fluxos em tempo real usando visões materializadas. Para fazer isso:
  1. Use o mecanismo para criar um consumidor Kafka e trate-o como um fluxo de dados.
  2. Crie uma tabela com a estrutura desejada.
  3. Crie uma visão materializada que converta os dados do mecanismo e os insira em uma tabela criada anteriormente.
Quando a MATERIALIZED VIEW é associada ao mecanismo, ela começa a coletar dados em segundo plano. Isso permite que você receba continuamente mensagens do Kafka e as converta para o formato necessário usando SELECT. Uma tabela Kafka pode ter quantas visões materializadas você quiser; elas não leem dados da tabela Kafka diretamente, mas recebem novos registros (em blocos). Dessa forma, você pode gravar em várias tabelas com diferentes níveis de detalhamento (com agrupamento - agregação e sem). Exemplo:
  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
  ) ENGINE = SummingMergeTree(day, (day, level), 8192);

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() AS total
    FROM queue GROUP BY day, level;

  SELECT level, sum(total) FROM daily GROUP BY level;
Para melhorar o desempenho, as mensagens recebidas são agrupadas em blocos com o tamanho de max_insert_block_size. Se o bloco não for formado dentro de stream_flush_interval_ms milissegundos, os dados serão gravados na tabela independentemente de o bloco estar completo. Para parar de receber dados do tópico ou alterar a lógica de conversão, desanexe a visão materializada:
  DETACH TABLE consumer;
  ATTACH TABLE consumer;
Se você quiser alterar a tabela de destino usando ALTER, recomendamos desativar a visão materializada para evitar discrepâncias entre a tabela de destino e os dados da visão materializada.

Configuração

Assim como o GraphiteMergeTree, o motor Kafka oferece suporte a uma configuração estendida por meio do arquivo de configuração do ClickHouse. Há duas chaves de configuração que você pode usar: global (em <kafka>) e no nível do tópico (em <kafka><kafka_topic>). A configuração global é aplicada primeiro e, em seguida, a configuração no nível do tópico é aplicada (se existir).
  <kafka>
    <!-- Opções de configuração global para todas as tabelas do tipo motor Kafka -->
    <debug>cgrp</debug>
    <statistics_interval_ms>3000</statistics_interval_ms>

    <kafka_topic>
        <name>logs</name>
        <statistics_interval_ms>4000</statistics_interval_ms>
    </kafka_topic>

    <!-- Configurações para o consumer -->
    <consumer>
        <auto_offset_reset>smallest</auto_offset_reset>
        <kafka_topic>
            <name>logs</name>
            <fetch_min_bytes>100000</fetch_min_bytes>
        </kafka_topic>

        <kafka_topic>
            <name>stats</name>
            <fetch_min_bytes>50000</fetch_min_bytes>
        </kafka_topic>
    </consumer>

    <!-- Configurações para o producer -->
    <producer>
        <kafka_topic>
            <name>logs</name>
            <retry_backoff_ms>250</retry_backoff_ms>
        </kafka_topic>

        <kafka_topic>
            <name>stats</name>
            <retry_backoff_ms>400</retry_backoff_ms>
        </kafka_topic>
    </producer>
  </kafka>
Para ver uma lista das opções de configuração disponíveis, consulte a referência de configuração do librdkafka. Use o sublinhado (_) em vez do ponto na configuração do ClickHouse. Por exemplo, check.crcs=true será <check_crcs>true</check_crcs>.

Suporte a Kerberos

Para lidar com o Kafka com suporte a Kerberos, adicione o elemento filho security_protocol com o valor sasl_plaintext. Isso é suficiente se o ticket de concessão de tickets do Kerberos for obtido e armazenado em cache pelo sistema operacional. O ClickHouse consegue manter credenciais do Kerberos usando um arquivo keytab. Considere os elementos filhos sasl_kerberos_service_name, sasl_kerberos_keytab e sasl_kerberos_principal. Exemplo:
<!-- Kafka com suporte a Kerberos -->
<kafka>
  <security_protocol>SASL_PLAINTEXT</security_protocol>
  <sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
  <sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
</kafka>

Colunas virtuais

  • _topic — Tópico do Kafka. Tipo de dado: LowCardinality(String).
  • _key — Chave da mensagem. Tipo de dado: String.
  • _offset — Offset da mensagem. Tipo de dado: UInt64.
  • _timestamp — Timestamp da mensagem. Tipo de dado: Nullable(DateTime).
  • _timestamp_ms — Timestamp da mensagem em milissegundos. Tipo de dado: Nullable(DateTime64(3)).
  • _partition — Partição do tópico do Kafka. Tipo de dado: UInt64.
  • _headers.name — Array das chaves dos cabeçalhos da mensagem. Tipo de dado: Array(String).
  • _headers.value — Array dos valores dos cabeçalhos da mensagem. Tipo de dado: Array(String).
Colunas virtuais adicionais quando kafka_handle_error_mode='stream':
  • _raw_message - Mensagem bruta que não pôde ser analisada com sucesso. Tipo de dado: String.
  • _error - Mensagem de exceção ocorrida durante uma falha na análise. Tipo de dado: String.
Observação: as colunas virtuais _raw_message e _error são preenchidas apenas em caso de exceção durante a análise; elas ficam sempre vazias quando a mensagem é analisada com sucesso.

Mapeamento de colunas para metadados de mensagens do Kafka

Ao produzir mensagens com INSERT INTO, o motor Kafka sempre usa uma coluna chamada _key (do tipo String) como a chave da mensagem do Kafka e uma coluna chamada _timestamp (do tipo DateTime) como o timestamp da mensagem do Kafka — se essas colunas existirem na tabela. Por padrão, essas colunas também aparecem no payload da mensagem junto com as outras colunas. Com kafka_map_virtual_columns_on_write = 1, o comportamento muda:
  • _key (tipo String) — mapeada para a chave da mensagem do Kafka.
  • _timestamp (tipo DateTime) — mapeada para o timestamp da mensagem do Kafka.
  • _headers.name (tipo Array(String)) e _headers.value (tipo Array(String)) — mapeados para os cabeçalhos da mensagem do Kafka. Cada par (_headers.name[i], _headers.value[i]) se torna um cabeçalho do Kafka. Como _headers.name e _headers.value compartilham o prefixo Nested _headers, o ClickHouse exige que ambos os arrays tenham o mesmo tamanho em cada linha.
As colunas com esses nomes são excluídas do payload da mensagem somente se seus tipos corresponderem aos listados acima; caso contrário, elas permanecem no payload, de modo que schemas que por acaso reutilizem esses nomes para dados não relacionados continuem funcionando. Exemplo:
CREATE TABLE kafka_out
(
    event_json String,
    `_key` String,
    `_timestamp` DateTime,
    `_headers.name` Array(String),
    `_headers.value` Array(String)
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'broker:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'events-producer',
    kafka_format = 'JSONEachRow',
    kafka_map_virtual_columns_on_write = 1;

INSERT INTO kafka_out VALUES
    ('{"a":1}', 'session-42', now(), ['source', 'trace_id'], ['api', 'abc-123']);
A mensagem produzida no Kafka tem payload {"event_json":"{\"a\":1}"}, chave session-42, o timestamp atual e dois cabeçalhos source=api e trace_id=abc-123.

Suporte a formatos de dados

O motor Kafka oferece suporte a todos os formatos compatíveis com o ClickHouse. O número de linhas em uma mensagem do Kafka depende de o formato ser baseado em linhas ou em blocos:
  • Para formatos baseados em linhas, o número de linhas em uma mensagem do Kafka pode ser controlado pela configuração kafka_max_rows_per_message.
  • Para formatos baseados em blocos, não é possível dividir um bloco em partes menores, mas o número de linhas em um bloco pode ser controlado pela configuração geral max_block_size.

Engine para armazenar offsets confirmados no ClickHouse Keeper

Se allow_experimental_kafka_offsets_storage_in_keeper estiver habilitada, mais duas configurações poderão ser especificadas para o motor de tabela Kafka:
  • kafka_keeper_path especifica o caminho da tabela no ClickHouse Keeper
  • kafka_replica_name especifica o nome da réplica no ClickHouse Keeper
As duas configurações devem ser especificadas juntas, ou nenhuma delas. Quando ambas são especificadas, um novo engine Kafka experimental passa a ser usado. Esse novo engine não depende do armazenamento dos offsets confirmados no Kafka, mas os armazena no ClickHouse Keeper. Ele ainda tenta fazer commit dos offsets no Kafka, mas só depende desses offsets quando a tabela é criada. Em qualquer outra situação (se a tabela for reiniciada ou recuperada após algum erro), os offsets armazenados no ClickHouse Keeper serão usados para continuar consumindo mensagens. Além do offset confirmado, ele também armazena quantas mensagens foram consumidas no último lote, portanto, se o insert falhar, a mesma quantidade de mensagens será consumida, o que permite a desduplicação, se necessário. Exemplo:
CREATE TABLE experimental_kafka (key UInt64, value UInt64)
ENGINE = Kafka('localhost:19092', 'my-topic', 'my-consumer', 'JSONEachRow')
SETTINGS
  kafka_keeper_path = '/clickhouse/{database}/{uuid}',
  kafka_replica_name = '{replica}'
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1;

Limitações conhecidas

Como o novo motor é experimental, ele ainda não está pronto para produção. Existem algumas limitações conhecidas na implementação:
  • Excluir e recriar a tabela rapidamente, ou especificar o mesmo caminho do ClickHouse Keeper para motores diferentes, pode causar problemas. Como prática recomendada, você pode usar {uuid} em kafka_keeper_path para evitar conflitos de caminho.
  • Para garantir leituras repetíveis, as mensagens não podem ser consumidas de várias partições em uma única thread. Por outro lado, os consumidores do Kafka precisam ser consultados regularmente para continuarem ativos. Como resultado dessas duas exigências, decidimos permitir a criação de vários consumidores apenas se kafka_thread_per_consumer estiver ativado; caso contrário, é muito complicado evitar problemas relacionados à consulta regular dos consumidores.
Veja também
Última modificação em 10 de junho de 2026