Pular para o conteúdo principal
Este guia apresenta padrões comuns para trabalhar com dados JSON replicados do MongoDB para o ClickHouse via ClickPipes. Suponha que tenhamos criado uma coleção t1 no MongoDB para rastrear pedidos de clientes:
db.t1.insertOne({
  "order_id": "ORD-001234",
  "customer_id": 98765,
  "status": "completed",
  "total_amount": 299.97,
  "order_date": new Date(),
  "shipping": {
    "method": "express",
    "city": "Seattle",
    "cost": 19.99
  },
  "items": [
    {
      "category": "electronics",
      "price": 149.99
    },
    {
      "category": "accessories",
      "price": 24.99
    }
  ]
})
O MongoDB CDC Connector replica documentos do MongoDB no ClickHouse usando o tipo de dados JSON nativo. A tabela replicada t1 no ClickHouse conterá a seguinte linha:
Row 1:
──────
_id:                "68a4df4b9fe6c73b541703b0"
doc:                {"_id":"68a4df4b9fe6c73b541703b0","customer_id":"98765","items":[{"category":"electronics","price":149.99},{"category":"accessories","price":24.99}],"order_date":"2025-08-19T20:32:11.705Z","order_id":"ORD-001234","shipping":{"city":"Seattle","cost":19.99,"method":"express"},"status":"completed","total_amount":299.97}
_peerdb_synced_at:  2025-08-19 20:50:42.005000000
_peerdb_is_deleted: 0
_peerdb_version:    0

Esquema da tabela

As tabelas replicadas usam este esquema padrão:
┌─name───────────────┬─type──────────┐
 _id String
 doc JSON
 _peerdb_synced_at DateTime64(9) 
 _peerdb_version Int64
 _peerdb_is_deleted Int8
└────────────────────┴───────────────┘
  • _id: Chave primária do MongoDB
  • doc: Documento do MongoDB replicado como tipo de dado JSON
  • _peerdb_synced_at: Registra quando a linha foi sincronizada pela última vez
  • _peerdb_version: Acompanha a versão da linha; é incrementada quando a linha é atualizada ou excluída
  • _peerdb_is_deleted: Indica se a linha foi excluída

Mecanismo de tabela ReplacingMergeTree

ClickPipes mapeia coleções do MongoDB para o ClickHouse usando a família de mecanismos de tabela ReplacingMergeTree. Com esse mecanismo, as atualizações são modeladas como inserções com uma versão mais recente (_peerdb_version) do documento para uma determinada chave primária (_id), permitindo tratar com eficiência atualizações, substituições e exclusões como inserções versionadas. O ReplacingMergeTree remove duplicatas de forma assíncrona em segundo plano. Para garantir a ausência de duplicatas na mesma linha, use o modificador FINAL. Por exemplo:
SELECT * FROM t1 FINAL;

Tratamento de exclusões

As exclusões do MongoDB são propagadas como novas linhas marcadas como excluídas usando a coluna _peerdb_is_deleted. Em geral, convém filtrá-las nas consultas:
SELECT * FROM t1 FINAL WHERE _peerdb_is_deleted = 0;
Você também pode criar uma política em nível de linha para filtrar automaticamente as linhas excluídas, em vez de especificar esse filtro em cada consulta:
CREATE ROW POLICY policy_name ON t1
FOR SELECT USING _peerdb_is_deleted = 0;

Consultando dados JSON

Você pode consultar diretamente campos do JSON usando a sintaxe de ponto:
Query
SELECT
    doc.order_id,
    doc.shipping.method
FROM t1;
Result
┌-─doc.order_id─┬─doc.shipping.method─┐
 ORD-001234 express
└───────────────┴─────────────────────┘
Ao consultar campos de objetos aninhados com a sintaxe de ponto, certifique-se de adicionar o operador ^:
Query
SELECT doc.^shipping as shipping_info FROM t1;
Result
┌─shipping_info──────────────────────────────────────┐
 {"city":"Seattle","cost":19.99,"method":"express"}
└────────────────────────────────────────────────────┘

Tipo Dynamic

No ClickHouse, cada campo em JSON é do tipo Dynamic. O tipo Dynamic permite que o ClickHouse armazene valores de qualquer tipo sem saber o tipo de antemão. Você pode verificar isso com a função toTypeName:
Query
SELECT toTypeName(doc.customer_id) AS type FROM t1;
Result
┌─type────┐
 Dynamic
└─────────┘
Para verificar os tipos de dados subjacentes de um campo, você pode usar a função dynamicType. Observe que é possível que o mesmo nome de campo tenha tipos de dados diferentes em linhas diferentes:
Query
SELECT dynamicType(doc.customer_id) AS type FROM t1;
Result
┌─type──┐
 Int64
└───────┘
Funções regulares funcionam com o tipo Dynamic da mesma forma que funcionam com colunas regulares: Exemplo 1: Parsing de datas
Query
SELECT parseDateTimeBestEffortOrNull(doc.order_date) AS order_date FROM t1;
Result
┌─order_date──────────┐
 2025-08-19 20:32:11
└─────────────────────┘
Exemplo 2: Lógica condicional
Query
SELECT multiIf(
    doc.total_amount < 100, 'less_than_100',
    doc.total_amount < 1000, 'less_than_1000',
    '1000+') AS spendings
FROM t1;
Result
┌─spendings──────┐
 less_than_1000
└────────────────┘
Exemplo 3: operações com Array
Query
SELECT length(doc.items) AS item_count FROM t1;
Result
┌─item_count─┐
          2
└────────────┘

Conversão de tipo de campos

As funções de agregação no ClickHouse não funcionam diretamente com o tipo Dynamic. Por exemplo, se você tentar usar a função sum diretamente em um tipo Dynamic, receberá o seguinte erro:
SELECT sum(doc.shipping.cost) AS shipping_cost FROM t1;
-- DB::Exception: Illegal type Dynamic of argument for aggregate function sum. (ILLEGAL_TYPE_OF_ARGUMENT)
Para usar funções de agregação, converta o campo para o tipo adequado usando a função CAST ou a sintaxe :::
Query
SELECT sum(doc.shipping.cost::Float32) AS shipping_cost FROM t1;
Result
┌─shipping_cost─┐
         19.99
└───────────────┘
A conversão do tipo Dynamic para o tipo de dado subjacente (determinado por dynamicType) é muito eficiente, pois o ClickHouse já armazena internamente o valor nesse tipo subjacente.

Achatamento de JSON

VIEW normal

Você pode criar VIEWs normais sobre a tabela JSON para encapsular a lógica de achatamento, conversão de tipos e transformação, de modo a consultar os dados como se estivessem em uma tabela relacional. As VIEWs normais são leves porque armazenam apenas a própria consulta, e não os dados subjacentes. Por exemplo:
CREATE VIEW v1 AS
SELECT
    CAST(doc._id, 'String') AS object_id,
    CAST(doc.order_id, 'String') AS order_id,
    CAST(doc.customer_id, 'Int64') AS customer_id,
    CAST(doc.status, 'String') AS status,
    CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
    CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
    doc.^shipping AS shipping_info,
    doc.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;
Esta VIEW terá o seguinte esquema:
┌─name────────────┬─type───────────┐
 object_id String
 order_id String
 customer_id Int64
 status String
 total_amount Decimal(18, 2) 
 order_date DateTime64(3)  
 shipping_info JSON
 items Dynamic
└─────────────────┴────────────────┘
Agora você pode consultar a VIEW de maneira semelhante a uma tabela achatada:
SELECT
    customer_id,
    sum(total_amount)
FROM v1
WHERE shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;

VIEW materializada atualizável

Você pode criar VIEWs materializadas atualizáveis, que permitem agendar a execução de consultas para desduplicar linhas e armazenar os resultados em uma tabela de destino desnormalizada. A cada atualização agendada, a tabela de destino é substituída pelos resultados mais recentes da consulta. A principal vantagem desse método é que a consulta com a palavra-chave FINAL é executada apenas uma vez durante a atualização, eliminando a necessidade de usar FINAL nas consultas subsequentes à tabela de destino. Uma desvantagem é que os dados na tabela de destino ficam atualizados apenas até a atualização mais recente. Para muitos casos de uso, intervalos de atualização que vão de vários minutos a algumas horas oferecem um bom equilíbrio entre atualidade dos dados e desempenho das consultas.
CREATE TABLE flattened_t1 (
    `_id` String,
    `order_id` String,
    `customer_id` Int64,
    `status` String,
    `total_amount` Decimal(18, 2),
    `order_date` DateTime64(3),
    `shipping_info` JSON,
    `items` Dynamic
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY _id
ORDER BY _id;

CREATE MATERIALIZED VIEW rmv REFRESH EVERY 1 HOUR TO flattened_t1 AS
SELECT 
    CAST(doc._id, 'String') AS _id,
    CAST(doc.order_id, 'String') AS order_id,
    CAST(doc.customer_id, 'Int64') AS customer_id,
    CAST(doc.status, 'String') AS status,
    CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
    CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
    doc.^shipping AS shipping_info,
    doc.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;
Agora você pode consultar diretamente a tabela flattened_t1 sem o modificador FINAL:
SELECT
    customer_id,
    sum(total_amount)
FROM flattened_t1
WHERE shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;

VIEW materializada incremental

Se você quiser acessar colunas achatadas em tempo real, pode criar VIEWs materializadas incrementais. Se a sua tabela tiver atualizações frequentes, não é recomendável usar o modificador FINAL na sua VIEW materializada, pois cada atualização acionará um merge. Em vez disso, você pode deduplicar os dados em tempo de consulta criando uma VIEW normal sobre a VIEW materializada.
CREATE TABLE flattened_t1 (
    `_id` String,
    `order_id` String,
    `customer_id` Int64,
    `status` String,
    `total_amount` Decimal(18, 2),
    `order_date` DateTime64(3),
    `shipping_info` JSON,
    `items` Dynamic,
    `_peerdb_version` Int64,
    `_peerdb_synced_at` DateTime64(9),
    `_peerdb_is_deleted` Int8
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY _id
ORDER BY _id;

CREATE MATERIALIZED VIEW imv TO flattened_t1 AS
SELECT 
    CAST(doc._id, 'String') AS _id,
    CAST(doc.order_id, 'String') AS order_id,
    CAST(doc.customer_id, 'Int64') AS customer_id,
    CAST(doc.status, 'String') AS status,
    CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
    CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
    doc.^shipping AS shipping_info,
    doc.items,
    _peerdb_version,
    _peerdb_synced_at,   
    _peerdb_is_deleted
FROM t1;

CREATE VIEW flattened_t1_final AS
SELECT * FROM flattened_t1 FINAL WHERE _peerdb_is_deleted = 0;
Agora você pode consultar a VIEW flattened_t1_final da seguinte forma:
SELECT
    customer_id,
    sum(total_amount)
FROM flattened_t1_final
AND shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;
Última modificação em 10 de junho de 2026