Pular para o conteúdo principal
Esta página explica como configurar CDC do DynamoDB para o ClickHouse usando ClickPipes. Há 2 componentes nesta integração:
  1. O snapshot inicial via S3 ClickPipes
  2. Atualizações em tempo real via Kinesis ClickPipes
Os dados serão inseridos em uma ReplacingMergeTree. Esse mecanismo de tabela é comumente usado em cenários de CDC para permitir a aplicação de operações de atualização. Saiba mais sobre esse padrão nos seguintes artigos do blog:

1. Configure o stream do Kinesis

Primeiro, habilite um stream do Kinesis na sua tabela do DynamoDB para capturar alterações em tempo real. É importante fazer isso antes de criar o snapshot para evitar a perda de dados. Consulte o guia da AWS aqui.

2. Crie o snapshot

Em seguida, vamos criar um snapshot da tabela do DynamoDB. Isso pode ser feito com uma exportação da AWS para o S3. Consulte o guia da AWS aqui. Faça uma “Exportação completa” no formato DynamoDB JSON.

3. Carregue o snapshot para o ClickHouse

Crie as tabelas necessárias

Os dados do snapshot do DynamoDB terão esta aparência:
{
  "age": {
    "N": "26"
  },
  "first_name": {
    "S": "sally"
  },
  "id": {
    "S": "0A556908-F72B-4BE6-9048-9E60715358D4"
  }
}
Observe que os dados estão em um formato aninhado. Precisaremos desaninhar esses dados antes de carregá-los no ClickHouse. Isso pode ser feito usando a função JSONExtract no ClickHouse em uma visão materializada. Vamos criar três tabelas:
  1. Uma tabela para armazenar os dados brutos do DynamoDB
  2. Uma tabela para armazenar os dados finais desaninhados (tabela de destino)
  3. Uma visão materializada para desaninhar os dados
Para os dados de exemplo do DynamoDB acima, as tabelas do ClickHouse ficariam assim:
/* Tabela de snapshot */
CREATE TABLE IF NOT EXISTS "default"."snapshot"
(
    `item` String
)
ORDER BY tuple();

/* Tabela para dados finais desaninhados */
CREATE MATERIALIZED VIEW IF NOT EXISTS "default"."snapshot_mv" TO "default"."destination" AS
SELECT
    JSONExtractString(item, 'id', 'S') AS id,
    JSONExtractInt(item, 'age', 'N') AS age,
    JSONExtractString(item, 'first_name', 'S') AS first_name
FROM "default"."snapshot";

/* Tabela para dados finais desaninhados */
CREATE TABLE IF NOT EXISTS "default"."destination" (
    "id" String,
    "first_name" String,
    "age" Int8,
    "version" Int64
)
ENGINE ReplacingMergeTree("version")
ORDER BY id;
Há alguns requisitos para a tabela de destino:
  • Essa tabela deve ser uma tabela ReplacingMergeTree
  • A tabela deve ter uma coluna version
    • Nas etapas seguintes, faremos o mapeamento do campo ApproximateCreationDateTime do Kinesis stream para a coluna version.
  • A tabela deve usar a chave de partição como chave de ordenação (especificada por ORDER BY)
    • Linhas com a mesma chave de ordenação serão deduplicadas com base na coluna version.

Crie o ClickPipe de snapshot

Agora você pode criar um ClickPipe para carregar os dados do snapshot do S3 para o ClickHouse. Siga o guia do S3 ClickPipe aqui, mas use as seguintes configurações:
  • Caminho de ingestão: Você precisará localizar o caminho dos arquivos JSON exportados no S3. O caminho será mais ou menos assim:
https://{bucket}.s3.amazonaws.com/{prefix}/AWSDynamoDB/{export-id}/data/*
  • Formato: JSONEachRow
  • Tabela: Sua tabela de snapshot (por exemplo, default.snapshot no exemplo acima)
Depois de criada, os dados começarão a ser preenchidos nas tabelas de snapshot e de destino. Você não precisa esperar o carregamento do snapshot terminar antes de passar para a próxima etapa.

4. Criar o ClickPipe do Kinesis

Agora podemos configurar o ClickPipe do Kinesis para capturar alterações em tempo real no stream do Kinesis. Siga o guia do ClickPipe do Kinesis aqui, mas use as seguintes configurações:
  • Stream: o stream do Kinesis usado na etapa 1
  • Table: sua tabela de destino (por exemplo, default.destination no exemplo acima)
  • Flatten object: true
  • Column mappings:
    • ApproximateCreationDateTime: version
    • Mapeie os outros campos para as colunas de destino apropriadas, como mostrado abaixo

5. Limpeza (opcional)

Quando o ClickPipe de snapshot for concluído, você poderá excluir a tabela de snapshot e a visão materializada.
DROP TABLE IF EXISTS "default"."snapshot";
DROP TABLE IF EXISTS "default"."snapshot_clickpipes_error";
DROP VIEW IF EXISTS "default"."snapshot_mv";
Última modificação em 10 de junho de 2026