Pular para o conteúdo principal
Este motor permite integrar o ClickHouse ao NATS. NATS permite:
  • Publicar em ou assinar subjects de mensagens.
  • Processar novas mensagens à medida que se tornem disponíveis.

Criando uma tabela

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = NATS SETTINGS
    nats_url = 'host:port',
    nats_subjects = 'subject1,subject2,...',
    nats_format = 'data_format'[,]
    [nats_schema = '',]
    [nats_num_consumers = N,]
    [nats_queue_group = 'group_name',]
    [nats_secure = false,]
    [nats_max_reconnect = N,]
    [nats_reconnect_wait = N,]
    [nats_server_list = 'host1:port1,host2:port2,...',]
    [nats_skip_broken_messages = N,]
    [nats_max_block_size = N,]
    [nats_flush_interval_ms = N,]
    [nats_username = 'user',]
    [nats_password = 'password',]
    [nats_token = 'clickhouse',]
    [nats_credential_file = '/var/nats_credentials',]
    [nats_startup_connect_tries = '5']
    [nats_max_rows_per_message = 1,]
    [nats_handle_error_mode = 'default']
Parâmetros obrigatórios:
  • nats_url – host:port (por exemplo, localhost:5672).
  • nats_subjects – Lista de subjects da tabela NATS para assinar/publicar. Suporta subjects curinga como foo.*.bar ou baz.>
  • nats_format – Formato da mensagem. Usa a mesma notação da função SQL FORMAT, como JSONEachRow. Para mais informações, consulte a seção Formatos.
Parâmetros opcionais:
  • nats_schema – Parâmetro que deve ser usado se o formato exigir uma definição de schema. Por exemplo, Cap’n Proto exige o caminho para o arquivo de schema e o nome do objeto raiz schema.capnp:Message.
  • nats_stream – O nome de um stream existente no NATS JetStream.
  • nats_consumer – O nome de um pull consumer durável existente no NATS JetStream.
  • nats_num_consumers – O número de consumers por tabela. Padrão: 1. Especifique mais consumers se a taxa de transferência de um consumer for insuficiente, somente para NATS Core.
  • nats_queue_group – Nome do queue group dos assinantes NATS. O padrão é o nome da tabela.
  • nats_max_reconnect – Obsoleto e sem efeito; a reconexão é realizada permanentemente com o timeout nats_reconnect_wait.
  • nats_reconnect_wait – Tempo de espera, em milissegundos, entre cada tentativa de reconexão. Padrão: 5000.
  • nats_server_list - Lista de servidores para conexão. Pode ser especificada para conectar a um cluster NATS.
  • nats_skip_broken_messages - Tolerância do parser de mensagens NATS a mensagens incompatíveis com o schema por bloco. Padrão: 0. Se nats_skip_broken_messages = N, o engine ignora N mensagens NATS que não podem ser analisadas (uma mensagem equivale a uma linha de dados).
  • nats_max_block_size - Número de linhas coletadas por poll(s) para descarregar dados do NATS. Padrão: max_insert_block_size.
  • nats_flush_interval_ms - Timeout para descarregar os dados lidos do NATS. Padrão: stream_flush_interval_ms.
  • nats_username - Nome de usuário do NATS.
  • nats_password - Senha do NATS.
  • nats_token - Token de autenticação do NATS.
  • nats_credential_file - Caminho para um arquivo de credentials do NATS.
  • nats_startup_connect_tries - Número de tentativas de conexão na inicialização. Padrão: 5.
  • nats_max_rows_per_message — O número máximo de linhas gravadas em uma mensagem NATS para formatos baseados em linha. (padrão: 1).
  • nats_handle_error_mode — Como lidar com erros no engine NATS. Valores possíveis: default (a exceção será lançada se não conseguirmos analisar uma mensagem), stream (a mensagem de exceção e a mensagem bruta serão salvas nas colunas virtuais _error e _raw_message).
Conexão SSL: Para uma conexão segura, use nats_secure = 1. A verificação do certificado é controlada pela variável de ambiente CLICKHOUSE_NATS_TLS_SECURE; Se o certificado estiver expirado, for autoassinado, estiver ausente ou inválido por qualquer outro motivo, desative a verificação definindo CLICKHOUSE_NATS_TLS_SECURE=0. Gravação na tabela NATS: Se a tabela lê apenas de um subject, qualquer inserção será publicada nesse mesmo subject. No entanto, se a tabela lê de vários subjects, precisamos especificar em qual subject queremos publicar. É por isso que, sempre que for inserir em uma tabela com vários subjects, é necessário definir stream_like_engine_insert_queue. Você pode selecionar um dos subjects dos quais a tabela lê e publicar seus dados nele. Por exemplo:
  CREATE TABLE queue (
    key UInt64,
    value UInt64
  ) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1,subject2',
             nats_format = 'JSONEachRow';

  INSERT INTO queue
  SETTINGS stream_like_engine_insert_queue = 'subject2'
  VALUES (1, 1);
Também é possível adicionar configurações de formato junto com as configurações relacionadas ao NATS. Exemplo:
  CREATE TABLE queue (
    key UInt64,
    value UInt64,
    date DateTime
  ) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1',
             nats_format = 'JSONEachRow',
             date_time_input_format = 'best_effort';
A configuração do servidor NATS pode ser adicionada usando o arquivo de configuração do ClickHouse. Mais especificamente, você pode adicionar sua senha para o engine NATS:
<nats>
    <user>click</user>
    <password>house</password>
    <token>clickhouse</token>
</nats>

Descrição

SELECT não é particularmente útil para ler mensagens (exceto para depuração), porque cada mensagem pode ser lida apenas uma vez. É mais prático criar fluxos em tempo real usando visões materializadas. Para fazer isso:
  1. Use o engine para criar um consumidor do NATS e tratá-lo como um fluxo de dados.
  2. Crie uma tabela com a estrutura desejada.
  3. Crie uma visão materializada que converta os dados do engine e os insira em uma tabela criada anteriormente.
Quando a MATERIALIZED VIEW é vinculada ao engine, ela começa a coletar dados em segundo plano. Isso permite que você receba continuamente mensagens do NATS e as converta para o formato necessário usando SELECT. Uma tabela do NATS pode ter quantas visões materializadas você quiser; elas não leem dados diretamente da tabela, mas recebem novos registros (em blocos). Assim, você pode gravar em várias tabelas com diferentes níveis de detalhamento (com agrupamento - agregação e sem). Exemplo:
  CREATE TABLE queue (
    key UInt64,
    value UInt64
  ) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1',
             nats_format = 'JSONEachRow',
             date_time_input_format = 'best_effort';

  CREATE TABLE daily (key UInt64, value UInt64)
    ENGINE = MergeTree() ORDER BY key;

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT key, value FROM queue;

  SELECT key, value FROM daily ORDER BY key;
Para parar de receber dados dos streams ou alterar a lógica de conversão, desanexe a visão materializada:
  DETACH TABLE consumer;
  ATTACH TABLE consumer;
Se quiser alterar a tabela de destino com ALTER, recomendamos desabilitar a visão materializada para evitar discrepâncias entre a tabela de destino e os dados da view.

Colunas virtuais

  • _subject - subject da mensagem no NATS. Tipo de dado: String.
Colunas virtuais adicionais quando nats_handle_error_mode='stream':
  • _raw_message - Mensagem bruta que não pôde ser processada com sucesso. Tipo de dado: Nullable(String).
  • _error - Mensagem de exceção ocorrida durante uma falha no processamento. Tipo de dado: Nullable(String).
Observação: as colunas virtuais _raw_message e _error são preenchidas apenas em caso de exceção durante o processamento; elas são sempre NULL quando a mensagem é processada com sucesso.

Suporte a formatos de dados

O engine NATS oferece suporte a todos os formatos compatíveis com o ClickHouse. O número de linhas em uma mensagem NATS depende de o formato ser baseado em linhas ou em blocos:
  • Para formatos baseados em linhas, o número de linhas em uma mensagem NATS pode ser controlado pela configuração nats_max_rows_per_message.
  • Para formatos baseados em blocos, não é possível dividir um bloco em partes menores, mas o número de linhas em um bloco pode ser controlado pela configuração geral max_block_size.

Usando o JetStream

Antes de usar o engine NATS com o NATS JetStream, você deve criar um stream NATS e um consumer pull durável. Para isso, você pode usar, por exemplo, o utilitário nats do pacote NATS CLI:
$ nats stream add
? Stream Name stream_name
? Subjects stream_subject
? Storage file
? Replication 1
? Retention Policy Limits
? Discard Policy Old
? Stream Messages Limit -1
? Per Subject Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 2m0s
? Allow message Roll-ups No
? Allow message deletion Yes
? Allow purging subjects or the entire stream Yes
Stream stream_name was created

Information for Stream stream_name created 2025-10-03 14:12:51

                Subjects: stream_subject
                Replicas: 1
                 Storage: File

Options:

               Retention: Limits
         Acknowledgments: true
          Discard Policy: Old
        Duplicate Window: 2m0s
              Direct Get: true
       Allows Msg Delete: true
            Allows Purge: true
  Allows Per-Message TTL: false
          Allows Rollups: false

Limits:

        Maximum Messages: unlimited
     Maximum Per Subject: unlimited
           Maximum Bytes: unlimited
             Maximum Age: unlimited
    Maximum Message Size: unlimited
       Maximum Consumers: unlimited

State:

                Messages: 0
                   Bytes: 0 B
          First Sequence: 0
           Last Sequence: 0
        Active Consumers: 0
$ nats consumer add
? Select a Stream stream_name
? Consumer name consumer_name
? Delivery target (empty for Pull Consumers) 
? Start policy (all, new, last, subject, 1h, msg sequence) all
? Acknowledgment policy explicit
? Replay policy instant
? Filter Stream by subjects (blank for all) 
? Maximum Allowed Deliveries -1
? Maximum Acknowledgments Pending 0
? Deliver headers only without bodies No
? Add a Retry Backoff Policy No
Information for Consumer stream_name > consumer_name created 2025-10-03T14:13:51+03:00

Configuration:

                    Name: consumer_name
               Pull Mode: true
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

State:

  Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
    Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
           Waiting Pulls: 0 of maximum 512
Após criar o stream e o consumer pull durável, podemos criar uma tabela com o engine NATS. Para isso, você precisa definir: nats_stream, nats_consumer_name e nats_subjects:
CREATE TABLE nats_jet_stream (
    key UInt64,
    value UInt64
  ) ENGINE NATS 
    SETTINGS  nats_url = 'localhost:4222',
              nats_stream = 'stream_name',
              nats_consumer_name = 'consumer_name',
              nats_subjects = 'stream_subject',
              nats_format = 'JSONEachRow';
Última modificação em 10 de junho de 2026