跳转到主要内容
默认情况下,S3 ClickPipe 会假定文件按词典序添加到存储桶中。也可以通过设置一个与存储桶关联的 Amazon SQS 队列 (可选使用 Amazon EventBridge 作为事件路由) ,将 S3 ClickPipe 配置为摄取没有固有顺序的文件。这样,ClickPipes 就可以监听 ObjectCreated:* 事件,并摄取任何新文件,而不受文件命名约定的影响。
无序模式支持 Amazon S3,支持公共存储桶或兼容 S3 的服务。它要求设置一个与存储桶关联的 Amazon SQS 队列,并可选使用 Amazon EventBridge 作为事件路由。

工作原理

在此模式下,S3 ClickPipe 会先对所选路径中的所有文件执行初始加载,随后监听队列中与指定路径匹配的 ObjectCreated:* 事件。对于已处理过的文件、不匹配该路径的文件,或其他类型事件对应的任何消息,都会被忽略。当达到 max insert bytesmax file count 中配置的阈值时,或经过一个可配置的时间间隔后 (默认 30 秒) ,文件就会被摄取。无法从某个特定文件或时间点开始摄取——ClickPipes 始终会加载所选路径中的所有文件。 摄取数据时可能会发生各种故障,从而导致部分插入或数据重复。对象存储 ClickPipes 对插入失败具备容错能力,并通过临时暂存表提供精确一次语义。数据会先插入暂存表;如果出现问题,暂存表会被清空,并在干净状态下重试插入。只有在插入成功完成后,分区才会被移动到目标表。
1

创建亚马逊 SQS 队列

1. 在 AWS Console 中,导航至 Simple Queue Service > Create queue。使用默认设置创建一个新的标准队列。
我们强烈建议为 SQS 队列配置 死信队列 (DLQ) ,以便更轻松地调试并重试失败的消息。如果配置了 DLQ,失败的消息会被重新放回队列并再次处理,最多重试次数由 DLQ maxReceiveCount 参数中配置的值决定。
2. 使用以下两种方式之一将您的 S3 存储桶连接到 SQS 队列。大多数场景下推荐使用 EventBridge,因为它支持扇出 (fan-out) 、更灵活的事件过滤,且不受 S3 每种事件类型、每个前缀只能配置一条通知规则的限制。
a. 在 S3 存储桶属性中,转到 Event notifications > Amazon EventBridge,启用向 EventBridge 发送通知,然后点击 Save changesb. 在 AWS Console 中,转到 Amazon EventBridge > Rules > Create rule。为规则命名 (例如 S3ObjectCreated) ,选择 default 事件总线,然后点击 Next。在 Build event pattern 步骤中,选择 AWS events or EventBridge partner events 作为事件源,然后手动输入以下事件模式,并将 <bucket-name> 替换为你的存储桶名称:
{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["<bucket-name>"]
    }
  }
}
你也可以在模式中添加 object.key 条件,按前缀或后缀进行过滤。如果这样做,请确保它与为 ClickPipe 设置的路径一致。c.Select target(s) 步骤中,选择 AWS service 作为目标类型,并选择 SQS queue。选取上一步创建的队列。保持 Use execution role (recommended) 为选中状态,让 EventBridge 自动创建所需的 IAM 角色,然后点击 Next 并完成向导。d. 编辑 SQS 队列访问策略,允许 EventBridge 向其发送消息。将 <sqs-queue-arn><eventbridge-rule-arn> 替换为相应的值:
{
  "Version": "2012-10-17",
  "Id": "example-ID",
  "Statement": [
    {
      "Sid": "AllowEventBridgeToSendMessage",
      "Effect": "Allow",
      "Principal": {
        "Service": "events.amazonaws.com"
      },
      "Action": "SQS:SendMessage",
      "Resource": "<sqs-queue-arn>",
      "Condition": {
        "ArnLike": {
          "aws:SourceArn": "<eventbridge-rule-arn>"
        }
      }
    }
  ]
}
2

配置 IAM role

1. 在 ClickHouse Cloud 控制台中,前往 Settings > Network security 信息,复制你的服务的 IAM role ARN2. 在 AWS Console 中,前往 IAM > Roles > Create role。选择 Custom trust policy,粘贴以下内容,并将 <ch-cloud-arn> 替换为上一步复制的 IAM role ARN:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowAssumeRole",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<ch-cloud-arn>"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
3. 为 IAM 角色创建一条内联策略,授予其从 S3 读取对象以及管理 SQS 队列消息所需的权限。请将 <bucket-arn><sqs-queue-arn> 替换为相应的值:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "S3BucketMetadataAccess",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketLocation",
        "s3:ListBucket"
      ],
      "Resource": "<bucket-arn>"
    },
    {
      "Sid": "AllowGetListObjects",
      "Effect": "Allow",
      "Action": [
        "s3:Get*",
        "s3:List*"
      ],
      "Resource": "<bucket-arn>/*"
    },
    {
      "Sid": "SQSNotificationsAccess",
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:ListQueues",
        "sqs:ReceiveMessage",
        "sqs:GetQueueAttributes"
      ],
      "Resource": "<sqs-queue-arn>"
    }
  ]
}
3

使用无序模式创建 ClickPipe

1. 在 ClickHouse Cloud 控制台中,前往 数据源 > 创建 ClickPipe,然后选择 亚马逊 S3。填写连接 S3 存储桶所需的信息。在 身份验证方法 下,选择 IAM role,并填写你在上一步创建的角色 ARN。2.传入数据 下,启用 持续摄取。选择 任意顺序 作为摄取模式,并填写与你的存储桶关联的队列的 SQS queue URL3.解析信息 下,为目标表定义 排序键。按需调整映射后的 schema,然后为 ClickPipes 数据库用户配置角色。4. 检查配置并点击 创建 ClickPipe。ClickPipes 会先扫描你的存储桶,加载所有与指定路径匹配的现有文件,随后在队列中收到新的 ObjectCreated:* 事件时开始处理文件。
最后修改于 2026年6月10日