Saltar al contenido principal
Esta página explica cómo configurar CDC de DynamoDB a ClickHouse con ClickPipes. Esta integración consta de 2 componentes:
  1. La instantánea inicial mediante S3 ClickPipes
  2. Las actualizaciones en tiempo real mediante Kinesis ClickPipes
Los datos se ingestarán en un ReplacingMergeTree. Este motor de tabla se usa habitualmente en escenarios de CDC para permitir aplicar operaciones de actualización. Puede encontrar más información sobre este patrón en los siguientes artículos del blog:

1. Configurar un stream de Kinesis

Primero, deberá habilitar un stream de Kinesis en su tabla de DynamoDB para capturar los cambios en tiempo real. Conviene hacerlo antes de crear la instantánea para evitar perder datos. Consulte la guía de AWS disponible aquí.

2. Crear la instantánea

A continuación, crearemos una instantánea de la tabla de DynamoDB. Esto puede hacerse mediante una exportación de AWS a S3. Consulte la guía de AWS aquí. Deberá realizar una “exportación completa” en el formato JSON de DynamoDB.

3. Cargar la instantánea en ClickHouse

Crear las tablas necesarias

Los datos de la instantánea de DynamoDB tendrán este aspecto:
{
  "age": {
    "N": "26"
  },
  "first_name": {
    "S": "sally"
  },
  "id": {
    "S": "0A556908-F72B-4BE6-9048-9E60715358D4"
  }
}
Observe que los datos están en un formato anidado. Tendremos que aplanarlos antes de cargarlos en ClickHouse. Esto se puede hacer con la función JSONExtract de ClickHouse en una vista materializada. Querremos crear tres tablas:
  1. Una tabla para almacenar los datos sin procesar de DynamoDB
  2. Una tabla para almacenar los datos finales aplanados (tabla de destino)
  3. Una vista materializada para aplanar los datos
Para los datos de ejemplo de DynamoDB mostrados arriba, las tablas de ClickHouse se verían así:
/* Tabla de instantánea */
CREATE TABLE IF NOT EXISTS "default"."snapshot"
(
    `item` String
)
ORDER BY tuple();

/* Tabla para los datos finales aplanados */
CREATE MATERIALIZED VIEW IF NOT EXISTS "default"."snapshot_mv" TO "default"."destination" AS
SELECT
    JSONExtractString(item, 'id', 'S') AS id,
    JSONExtractInt(item, 'age', 'N') AS age,
    JSONExtractString(item, 'first_name', 'S') AS first_name
FROM "default"."snapshot";

/* Tabla para los datos finales aplanados */
CREATE TABLE IF NOT EXISTS "default"."destination" (
    "id" String,
    "first_name" String,
    "age" Int8,
    "version" Int64
)
ENGINE ReplacingMergeTree("version")
ORDER BY id;
Hay algunos requisitos para la tabla de destino:
  • Esta tabla debe ser de tipo ReplacingMergeTree
  • La tabla debe tener una columna version
    • En pasos posteriores, asignaremos el campo ApproximateCreationDateTime del flujo de Kinesis a la columna version.
  • La tabla debe usar la clave de partición como clave de ordenación (especificada por ORDER BY)
    • Las filas con la misma clave de ordenación se deduplicarán según la columna version.

Crear el ClickPipe de instantánea

Ahora puede crear un ClickPipe para cargar los datos de la instantánea desde S3 en ClickHouse. Siga la guía de S3 ClickPipe aquí, pero use esta configuración:
  • Ruta de ingesta: Deberá localizar la ruta de los archivos JSON exportados en S3. La ruta tendrá un aspecto similar a este:
https://{bucket}.s3.amazonaws.com/{prefix}/AWSDynamoDB/{export-id}/data/*
  • Formato: JSONEachRow
  • Tabla: Su tabla de instantánea (p. ej., default.snapshot en el ejemplo anterior)
Una vez creada, los datos comenzarán a cargarse en las tablas de instantánea y de destino. No es necesario esperar a que termine la carga de la instantánea antes de continuar con el siguiente paso.

4. Crear el ClickPipe de Kinesis

Ahora podemos configurar el ClickPipe de Kinesis para capturar cambios en tiempo real del stream de Kinesis. Sigue la guía del ClickPipe de Kinesis aquí, pero usa esta configuración:
  • Stream: El stream de Kinesis utilizado en el paso 1
  • Table: La tabla de destino (p. ej., default.destination en el ejemplo anterior)
  • Flatten object: true
  • Mapeo de columnas:
    • ApproximateCreationDateTime: version
    • Asigna los demás campos a las columnas de destino correspondientes, como se muestra a continuación

5. Limpieza (opcional)

Una vez que el ClickPipe de instantánea haya finalizado, puedes eliminar la tabla de instantánea y la vista materializada.
DROP TABLE IF EXISTS "default"."snapshot";
DROP TABLE IF EXISTS "default"."snapshot_clickpipes_error";
DROP VIEW IF EXISTS "default"."snapshot_mv";
Última modificación el 10 de junio de 2026