跳转到主要内容
本页介绍如何使用 ClickPipes 配置从 DynamoDB 到 ClickHouse 的 CDC (变更数据捕获) (变更数据捕获) 。此集成包含 2 个组件:
  1. 通过 S3 ClickPipes 执行初始快照
  2. 通过 Kinesis ClickPipes 进行实时更新
数据将被摄取到 ReplacingMergeTree 中。该表引擎常用于 CDC (变更数据捕获) 场景,以便应用更新操作。有关此模式的更多信息,请参阅以下博客文章:

1. 设置 Kinesis 数据流

首先,你需要为 DynamoDB 表启用 Kinesis 数据流,以实时捕获变更。我们希望在创建快照之前先完成这一步,以避免遗漏任何数据。 可在此处查看 AWS 指南。

2. 创建快照

接下来,我们将创建该 DynamoDB 表的快照。这可以通过 AWS 导出到 S3 来实现。请参阅此处的 AWS 指南。 你需要选择以 DynamoDB JSON 格式进行“完整导出”。

3. 将快照导入 ClickHouse

创建所需的表

来自 DynamoDB 的快照数据大致如下:
{
  "age": {
    "N": "26"
  },
  "first_name": {
    "S": "sally"
  },
  "id": {
    "S": "0A556908-F72B-4BE6-9048-9E60715358D4"
  }
}
请注意,这些数据采用嵌套格式。在加载到 ClickHouse 之前,需要先将其展平。可以通过在 ClickHouse 的 materialized view 中使用 JSONExtract 函数来实现。 我们需要创建三个表:
  1. 一个用于存储来自 DynamoDB 的原始数据的表
  2. 一个用于存储最终展平后数据的表 (目标表)
  3. 一个用于展平数据的 materialized view
对于上面的 DynamoDB 示例数据,对应的 ClickHouse 表如下所示:
/* 快照表 */
CREATE TABLE IF NOT EXISTS "default"."snapshot"
(
    `item` String
)
ORDER BY tuple();

/* 存储最终扁平化数据的表 */
CREATE MATERIALIZED VIEW IF NOT EXISTS "default"."snapshot_mv" TO "default"."destination" AS
SELECT
    JSONExtractString(item, 'id', 'S') AS id,
    JSONExtractInt(item, 'age', 'N') AS age,
    JSONExtractString(item, 'first_name', 'S') AS first_name
FROM "default"."snapshot";

/* 存储最终扁平化数据的表 */
CREATE TABLE IF NOT EXISTS "default"."destination" (
    "id" String,
    "first_name" String,
    "age" Int8,
    "version" Int64
)
ENGINE ReplacingMergeTree("version")
ORDER BY id;
目标端表需要满足以下要求:
  • 该表必须是 ReplacingMergeTree
  • 该表必须包含一个 version
    • 在后续步骤中,我们会将 Kinesis 数据流 中的 ApproximateCreationDateTime 字段映射到 version 列。
  • 该表应将分区键用作排序键 (由 ORDER BY 指定)
    • 具有相同排序键的行会根据 version 列去重。

创建快照 ClickPipe

现在,您可以创建一个 ClickPipe,将快照数据从 S3 加载到 ClickHouse。请参考这里的 S3 ClickPipe 指南,但请使用以下设置:
  • 摄取路径:您需要找到 S3 中导出的 JSON 文件路径。该路径大致如下:
https://{bucket}.s3.amazonaws.com/{prefix}/AWSDynamoDB/{export-id}/data/*
  • 格式: JSONEachRow
  • : 你的快照表 (例如上述示例中的 default.snapshot)
创建后,数据将开始写入快照表和目标表。你无需等待快照加载完成即可继续下一步。

4. 创建 Kinesis ClickPipe

现在我们可以设置 Kinesis ClickPipe,以捕获 Kinesis 数据流中的实时变更。请参考此处的 Kinesis ClickPipe 指南,并使用以下设置:
  • Stream:第 1 步中使用的 Kinesis 数据流
  • Table:你的目标表 (例如上述示例中的 default.destination)
  • Flatten object:true
  • Column mappings
    • ApproximateCreationDateTimeversion
    • 按下图所示,将其他字段映射到相应的目标列

5. 清理 (可选)

快照 ClickPipe 完成后,您可以删除快照表和 materialized view。
DROP TABLE IF EXISTS "default"."snapshot";
DROP TABLE IF EXISTS "default"."snapshot_clickpipes_error";
DROP VIEW IF EXISTS "default"."snapshot_mv";
最后修改于 2026年6月10日