Перейти к основному содержанию
По умолчанию S3 ClickPipe предполагает, что файлы добавляются в бакет в лексикографическом порядке. S3 ClickPipe можно настроить на приём файлов, у которых нет подразумеваемого порядка, указав очередь Amazon SQS, подключённую к бакету, и при необходимости используя Amazon EventBridge в качестве маршрутизатора событий. Это позволяет ClickPipes отслеживать события ObjectCreated:* и принимать любые новые файлы независимо от схемы их именования.
Неупорядоченный режим поддерживается только для Amazon S3 и не поддерживается для публичных бакетов или S3-совместимых сервисов. Для него требуется настроить очередь Amazon SQS, подключённую к бакету, и при необходимости использовать Amazon EventBridge в качестве маршрутизатора событий.

Как это работает

В этом режиме S3 ClickPipe выполняет первоначальную загрузку всех файлов по выбранному пути, а затем отслеживает в очереди события ObjectCreated:*, соответствующие указанному пути. Любое сообщение об уже встречавшемся файле, файле, не соответствующем пути, или событии другого типа будет игнорироваться. Ингестия файлов начинается при достижении порога, заданного в max insert bytes или max file count, либо по истечении настраиваемого интервала (по умолчанию 30 секунд). Невозможно начать ингестию с определённого файла или момента времени — ClickPipes всегда загружает все файлы по выбранному пути. При ингестии данных возможны различные сбои, которые могут приводить к частичной вставке или дублированию данных. ClickPipes для объектного хранилища устойчивы к сбоям вставки и обеспечивают семантику «ровно один раз» с помощью временных staging-таблиц. Сначала данные вставляются в staging-таблицу; если что-то пойдёт не так, staging-таблица очищается, и вставка повторяется из чистого состояния. Только после успешного завершения вставки партиции перемещаются в целевую таблицу.
1

Создание очереди Amazon SQS

1. В консоли AWS перейдите в раздел Simple Queue Service > Create queue. Используйте настройки по умолчанию для создания новой стандартной очереди.
Мы настоятельно рекомендуем настроить Dead-Letter-Queue (DLQ) для очереди SQS, чтобы упростить отладку и повторную обработку сообщений с ошибками. Если DLQ настроена, такие сообщения будут снова помещаться в очередь и обрабатываться повторно до числа попыток, заданного в параметре DLQ maxReceiveCount.
2. Подключите S3 бакет к очереди SQS одним из двух способов, описанных ниже. Для большинства сценариев рекомендуется использовать EventBridge: он поддерживает fan-out, обеспечивает более гибкую фильтрацию событий и не имеет ограничения S3, допускающего только одно правило уведомления на тип события и prefix.
a. В свойствах S3 бакета перейдите в Event notifications > Amazon EventBridge и включите отправку уведомлений в EventBridge. Нажмите Save changes.b. В консоли AWS перейдите в Amazon EventBridge > Rules > Create rule. Укажите имя правила (например, S3ObjectCreated), выберите шину событий default и нажмите Next. На шаге Build event pattern выберите AWS events or EventBridge partner events в качестве источника событий, затем вручную введите следующий шаблон события, заменив <bucket-name> именем вашего бакета:
{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["<bucket-name>"]
    }
  }
}
При необходимости добавьте в шаблон условие object.key, чтобы фильтровать объекты по префиксу или суффиксу. Если добавите его, убедитесь, что оно соответствует пути, заданному для ClickPipe.c. На шаге Select target(s) выберите AWS service в качестве типа цели и укажите SQS queue. Выберите очередь, созданную на предыдущем шаге. Оставьте флажок Use execution role (recommended) установленным, чтобы EventBridge автоматически создал необходимую роль IAM, затем нажмите Next и завершите настройку.d. Измените политику доступа очереди SQS, чтобы разрешить EventBridge отправлять в неё сообщения. Замените <sqs-queue-arn> и <eventbridge-rule-arn> соответствующими значениями:
{
  "Version": "2012-10-17",
  "Id": "example-ID",
  "Statement": [
    {
      "Sid": "AllowEventBridgeToSendMessage",
      "Effect": "Allow",
      "Principal": {
        "Service": "events.amazonaws.com"
      },
      "Action": "SQS:SendMessage",
      "Resource": "<sqs-queue-arn>",
      "Condition": {
        "ArnLike": {
          "aws:SourceArn": "<eventbridge-rule-arn>"
        }
      }
    }
  ]
}
2

Настройте роль IAM

1. В консоли ClickHouse Cloud перейдите в Settings > Network security information и скопируйте IAM role ARN вашего сервиса.2. В консоли AWS перейдите в IAM > Roles > Create role. Выберите Custom trust policy и вставьте следующий текст, заменив <ch-cloud-arn> на IAM role ARN, скопированный на предыдущем шаге:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowAssumeRole",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<ch-cloud-arn>"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
3. Создайте встроенную политику для роли IAM с необходимыми разрешениями, чтобы читать объекты в S3 и управлять сообщениями в очереди SQS. Замените <bucket-arn> и <sqs-queue-arn> на соответствующие значения:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "S3BucketMetadataAccess",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketLocation",
        "s3:ListBucket"
      ],
      "Resource": "<bucket-arn>"
    },
    {
      "Sid": "AllowGetListObjects",
      "Effect": "Allow",
      "Action": [
        "s3:Get*",
        "s3:List*"
      ],
      "Resource": "<bucket-arn>/*"
    },
    {
      "Sid": "SQSNotificationsAccess",
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:ListQueues",
        "sqs:ReceiveMessage",
        "sqs:GetQueueAttributes"
      ],
      "Resource": "<sqs-queue-arn>"
    }
  ]
}
3

Создайте ClickPipe в неупорядоченном режиме

1. В консоли ClickHouse Cloud перейдите в раздел Data Sources > Create ClickPipe и выберите Amazon S3. Введите данные для подключения к вашему S3 бакету. В разделе Authentication method выберите роль IAM и укажите ARN роли, которую вы создали на предыдущем шаге.2. В разделе Incoming data включите Continuous ingestion. В качестве режима ингестии выберите Any order и укажите URL очереди SQS для очереди, связанной с вашим бакетом.3. В разделе Parse information задайте ключ сортировки для целевой таблицы. При необходимости скорректируйте сопоставленную схему, затем настройте роль для пользователя базы данных ClickPipes.4. Проверьте конфигурацию и нажмите Create ClickPipe. ClickPipes выполнит первоначальное сканирование вашего бакета, чтобы загрузить все существующие файлы, соответствующие указанному пути, а затем начнёт обрабатывать файлы по мере поступления в очередь новых событий ObjectCreated:*.
Последнее изменение 10 июня 2026 г.