Pular para o conteúdo principal
Todos os tipos de ClickPipe podem ser criados e gerenciados com o recurso clickhouse_clickpipe no provedor Terraform do ClickHouse. Esta página apresenta a configuração do provedor e exemplos de configuração para cada tipo de ClickPipe compatível.

Configuração do provedor

O suporte ao ClickPipes está em disponibilidade geral a partir da versão v3.14.0 do provedor. Se você estiver usando uma versão anterior, será necessário um lançamento alpha — consulte o changelog do provedor para mais detalhes.
Adicione o provedor ClickHouse à sua configuração do Terraform:
terraform {
  required_providers {
    clickhouse = {
      source  = "ClickHouse/clickhouse"
      version = ">= 3.14.0"
    }
  }
}

provider "clickhouse" {
  organization_id = var.organization_id
  token_key       = var.token_key
  token_secret    = var.token_secret
}
Consulte Gerenciando chaves de API para ver instruções sobre como criar uma chave de API para usar com o provedor.

Visão geral do recurso

O recurso clickhouse_clickpipe tem os seguintes argumentos de nível superior:
ArgumentoObrigatórioDescrição
nameSimNome do ClickPipe.
service_idSimID do serviço ClickHouse Cloud.
sourceSimConfiguração da origem (um bloco source por ClickPipe).
destinationSimConfiguração de destino.
scalingNãoNúmero de réplicas e tamanho. O padrão é 1 réplica.
field_mappingsNãoMapeamentos de campos personalizados entre as colunas de origem e destino.
settingsNãoConfigurações avançadas do ClickPipe.
stoppedNãoDefina como true para criar o ClickPipe no estado interrompido. O padrão é false.
Os atributos id e state são somente leitura e são preenchidos pelo ClickHouse Cloud após a criação.

Destino

O bloco destination é comum a todos os tipos de origem:
destination {
  database      = "default"           # Banco de dados de destino. Padrão: "default".
  table         = "my_table"          # Nome da tabela de destino. Obrigatório para todas as fontes, exceto CDC.
  managed_table = true                # Permite que o ClickPipes crie e gerencie a tabela. Padrão: true.

  table_definition {
    engine {
      type      = "MergeTree"         # MergeTree, ReplacingMergeTree, SummingMergeTree ou Null.
    }
    sorting_key   = ["id", "ts"]      # Opcional.
    partition_by  = "toYYYYMM(ts)"    # Opcional.
  }

  columns {
    name = "id"
    type = "UInt64"
  }

  columns {
    name = "message"
    type = "String"
  }
}
Para fontes CDC (Postgres, MySQL, MongoDB, BigQuery), as tabelas de destino são criadas automaticamente com base no schema de origem — normalmente, só database é necessário.

Exemplos por tipo de ClickPipe

Kafka

Valores aceitos para type: kafka, confluent, msk, azureeventhub, redpanda, warpstream.
resource "clickhouse_clickpipe" "kafka_clickpipe" {
  name       = "My Kafka ClickPipe"
  service_id = var.service_id

  scaling {
    replicas               = 2
    replica_cpu_millicores = 250
    replica_memory_gb      = 1.0
  }

  source {
    kafka {
      type           = "confluent"
      format         = "JSONEachRow"
      brokers        = "broker.example.com:9092"
      topics         = "my_topic"
      consumer_group = "clickpipes-consumer-group"
      authentication = "PLAIN"

      credentials {
        username = "my_user"
        password = var.kafka_password
      }

      offset {
        strategy = "from_latest"
      }
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }

    columns {
      name = "message"
      type = "String"
    }

    columns {
      name = "timestamp"
      type = "DateTime"
    }
  }
}

Kafka com schema registry

resource "clickhouse_clickpipe" "kafka_avro_clickpipe" {
  name       = "My Kafka Avro ClickPipe"
  service_id = var.service_id

  source {
    kafka {
      type   = "confluent"
      format = "AvroConfluent"
      brokers = "broker.example.com:9092"
      topics  = "my_avro_topic"

      credentials {
        username = "my_user"
        password = var.kafka_password
      }

      schema_registry {
        url            = "https://schema-registry.example.com"
        authentication = "PLAIN"

        credentials {
          username = "sr_user"
          password = var.schema_registry_password
        }
      }
    }
  }

  destination {
    table         = "my_avro_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }

    columns {
      name = "event"
      type = "String"
    }
  }
}

Amazon Kinesis

resource "clickhouse_clickpipe" "kinesis_clickpipe" {
  name       = "My Kinesis ClickPipe"
  service_id = var.service_id

  source {
    kinesis {
      format         = "JSONEachRow"
      stream_name    = "my-stream"
      region         = "us-east-1"
      iterator_type  = "TRIM_HORIZON"
      authentication = "IAM_USER"

      access_key {
        access_key_id = var.aws_access_key_id
        secret_key    = var.aws_secret_key
      }
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }

    columns {
      name = "message"
      type = "String"
    }
  }
}

Kinesis com IAM role

A autenticação com IAM role exige um serviço ClickHouse em execução na AWS.
resource "clickhouse_clickpipe" "kinesis_iam_role_clickpipe" {
  name       = "My Kinesis ClickPipe (IAM Role)"
  service_id = var.service_id

  source {
    kinesis {
      format         = "JSONEachRow"
      stream_name    = "my-stream"
      region         = "us-east-1"
      iterator_type  = "LATEST"
      authentication = "IAM_ROLE"
      iam_role       = "arn:aws:iam::123456789012:role/my-kinesis-role"
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }

    columns {
      name = "message"
      type = "String"
    }
  }
}

Amazon S3

resource "clickhouse_clickpipe" "s3_clickpipe" {
  name       = "My S3 ClickPipe"
  service_id = var.service_id

  source {
    object_storage {
      type           = "s3"
      url            = "https://my-bucket.s3.amazonaws.com/data/*.json"
      format         = "JSONEachRow"
      authentication = "IAM_USER"

      access_key {
        access_key_id = var.aws_access_key_id
        secret_key    = var.aws_secret_key
      }
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }

    columns {
      name = "message"
      type = "String"
    }
  }
}

Ingestão contínua do S3 com SQS

Para fazer a ingestão contínua usando uma fila SQS (modo não ordenado), consulte Configuração do modo não ordenado para obter instruções de configuração.
resource "clickhouse_clickpipe" "s3_continuous_clickpipe" {
  name       = "My S3 Continuous ClickPipe"
  service_id = var.service_id

  source {
    object_storage {
      type           = "s3"
      url            = "https://my-bucket.s3.amazonaws.com/data/*.json"
      format         = "JSONEachRow"
      is_continuous  = true
      queue_url      = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
      authentication = "IAM_USER"

      access_key {
        access_key_id = var.aws_access_key_id
        secret_key    = var.aws_secret_key
      }
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }

    columns {
      name = "message"
      type = "String"
    }
  }
}

Google Cloud Storage

service_account_key deve ser o conteúdo codificado em base64 de um arquivo JSON de chave de conta de serviço do GCP.
resource "clickhouse_clickpipe" "gcs_clickpipe" {
  name       = "My GCS ClickPipe"
  service_id = var.service_id

  source {
    object_storage {
      type                = "gcs"
      url                 = "gs://my-bucket/data/*.json"
      format              = "JSONEachRow"
      authentication      = "SERVICE_ACCOUNT"
      service_account_key = var.gcs_service_account_key
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }

    columns {
      name = "message"
      type = "String"
    }
  }
}

Armazenamento de Blobs do Azure

resource "clickhouse_clickpipe" "abs_clickpipe" {
  name       = "My Azure Blob ClickPipe"
  service_id = var.service_id

  source {
    object_storage {
      type                 = "azureblobstorage"
      azure_container_name = "my-container"
      path                 = "data/*.json"
      format               = "JSONEachRow"
      authentication       = "CONNECTION_STRING"
      connection_string    = var.azure_connection_string
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }

    columns {
      name = "message"
      type = "String"
    }
  }
}

Postgres CDC

resource "clickhouse_clickpipe" "postgres_cdc_clickpipe" {
  name       = "My Postgres CDC ClickPipe"
  service_id = var.service_id

  source {
    postgres {
      host     = "postgres.example.com"
      port     = 5432
      database = "mydb"

      credentials {
        username = "postgres_user"
        password = var.postgres_password
      }

      settings {
        replication_mode = "cdc"

        # Configurações opcionais
        sync_interval_seconds              = 60
        pull_batch_size                    = 100000
        allow_nullable_columns             = true
        initial_load_parallelism           = 4
        snapshot_num_rows_per_partition    = 100000
        snapshot_number_of_parallel_tables = 1
      }

      table_mappings {
        source_schema_name = "public"
        source_table       = "users"
        target_table       = "public_users"
      }

      table_mappings {
        source_schema_name = "public"
        source_table       = "orders"
        target_table       = "public_orders"

        # Opcional
        excluded_columns       = ["internal_notes"]
        use_custom_sorting_key = true
        sorting_keys           = ["id", "created_at"]
        table_engine           = "ReplacingMergeTree"
      }
    }
  }

  destination {
    database = "default"
  }
}

Postgres com IAM role

A autenticação com IAM role exige um serviço ClickHouse em execução na AWS.
resource "clickhouse_clickpipe" "postgres_iam_role_clickpipe" {
  name       = "My Postgres CDC ClickPipe (IAM Role)"
  service_id = var.service_id

  source {
    postgres {
      host           = "mydb.cluster.us-east-1.rds.amazonaws.com"
      port           = 5432
      database       = "mydb"
      type           = "rdspostgres"
      authentication = "iam_role"
      iam_role       = "arn:aws:iam::123456789012:role/my-rds-role"

      credentials {
        username = "postgres_user"
      }

      settings {
        replication_mode = "cdc"
      }

      table_mappings {
        source_schema_name = "public"
        source_table       = "orders"
        target_table       = "public_orders"
      }
    }
  }

  destination {
    database = "default"
  }
}

MySQL CDC

resource "clickhouse_clickpipe" "mysql_cdc_clickpipe" {
  name       = "My MySQL CDC ClickPipe"
  service_id = var.service_id

  source {
    mysql {
      host = "mysql.example.com"
      port = 3306
      type = "mysql"

      credentials {
        username = "mysql_user"
        password = var.mysql_password
      }

      settings {
        replication_mode = "cdc"

        # Configurações opcionais
        sync_interval_seconds              = 30
        pull_batch_size                    = 10000
        allow_nullable_columns             = true
        initial_load_parallelism           = 4
        snapshot_num_rows_per_partition    = 100000
        snapshot_number_of_parallel_tables = 2
      }

      table_mappings {
        source_schema_name = "mydb"
        source_table       = "orders"
        target_table       = "mydb_orders"
      }

      table_mappings {
        source_schema_name = "mydb"
        source_table       = "customers"
        target_table       = "mydb_customers"

        # Opcional
        excluded_columns       = ["password_hash"]
        use_custom_sorting_key = true
        sorting_keys           = ["id"]
        table_engine           = "ReplacingMergeTree"
      }
    }
  }

  destination {
    database = "default"
  }
}

CDC do MongoDB

resource "clickhouse_clickpipe" "mongodb_cdc_clickpipe" {
  name       = "My MongoDB CDC ClickPipe"
  service_id = var.service_id

  source {
    mongodb {
      uri             = "mongodb+srv://cluster0.example.mongodb.net"
      read_preference = "secondaryPreferred"

      credentials {
        username = "mongo_user"
        password = var.mongodb_password
      }

      settings {
        replication_mode = "cdc"

        # Configurações opcionais
        sync_interval_seconds              = 30
        pull_batch_size                    = 500
        snapshot_num_rows_per_partition    = 100000
        snapshot_number_of_parallel_tables = 2
      }

      table_mappings {
        source_database_name = "mydb"
        source_collection    = "users"
        target_table         = "mydb_users"
      }

      table_mappings {
        source_database_name = "mydb"
        source_collection    = "orders"
        target_table         = "mydb_orders"
        table_engine         = "ReplacingMergeTree"
      }
    }
  }

  destination {
    database = "default"
  }
}

BigQuery

service_account_file deve ser o conteúdo codificado em base64 do arquivo de chave JSON de uma conta de serviço do GCP.
resource "clickhouse_clickpipe" "bigquery_snapshot_clickpipe" {
  name       = "My BigQuery ClickPipe"
  service_id = var.service_id

  source {
    bigquery {
      snapshot_staging_path = "gs://my-staging-bucket/staging/"

      credentials {
        service_account_file = var.gcp_service_account_key
      }

      settings {
        replication_mode = "snapshot"

        # Configurações opcionais
        initial_load_parallelism           = 4
        snapshot_num_rows_per_partition    = 100000
        snapshot_number_of_parallel_tables = 2
        allow_nullable_columns             = true
      }

      table_mappings {
        source_dataset_name = "my_dataset"
        source_table        = "my_table"
        target_table        = "my_bigquery_table"
      }

      table_mappings {
        source_dataset_name    = "my_dataset"
        source_table           = "another_table"
        target_table           = "another_bigquery_table"
        table_engine           = "ReplacingMergeTree"
        use_custom_sorting_key = true
        sorting_keys           = ["id"]
        excluded_columns       = ["internal_col"]
      }
    }
  }

  destination {
    database = "default"
  }
}

Escalonamento

Todos os tipos de ClickPipe oferecem suporte ao bloco scaling para configurar o número de réplicas e a alocação de recursos:
scaling {
  replicas               = 2     # Padrão: 1. Máximo: 10.
  replica_cpu_millicores = 500   # Entre 125 e 2000.
  replica_memory_gb      = 2.0   # Entre 0.5 e 8.0.
}

Importando ClickPipes existentes

ClickPipes existentes podem ser importados para o estado do Terraform usando o ID do serviço e o ID do ClickPipe:
terraform import clickhouse_clickpipe.example <service_id>:<clickpipe_id>
Última modificação em 10 de junho de 2026