ClickPipes can be created and managed using the clickhouse_clickpipe resource. This page covers provider setup and configuration examples for each supported ClickPipe type.
Provider setup
ClickPipes support is generally available as of provider version v3.14.0. Earlier versions require an alpha release. See the provider changelog for details.
terraform {
  required_providers {
    clickhouse = {
      source  = "ClickHouse/clickhouse"
      version = ">= 3.14.0"
    }
  }
}

provider "clickhouse" {
  organization_id = var.organization_id
  token_key       = var.token_key
  token_secret    = var.token_secret
}
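The provider block above reads three input variables that the page does not declare. A minimal sketch of matching declarations might look like the following; the descriptions are assumptions, and marking the API key values as sensitive is a suggestion rather than a provider requirement.

```hcl
# Hypothetical declarations for the variables referenced by the provider block.
variable "organization_id" {
  description = "ClickHouse Cloud organization ID."
  type        = string
}

variable "token_key" {
  description = "ClickHouse Cloud API key ID."
  type        = string
  sensitive   = true # Redacts the value in plan and apply output.
}

variable "token_secret" {
  description = "ClickHouse Cloud API key secret."
  type        = string
  sensitive   = true
}
```

Values can then be supplied outside version control, for example through the TF_VAR_organization_id, TF_VAR_token_key, and TF_VAR_token_secret environment variables.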
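Resource overview
The clickhouse_clickpipe resource has the following top-level arguments.
| Argument | Required | Description |
|---|---|---|
| name | Yes | Name of the ClickPipe. |
| service_id | Yes | ID of the ClickHouse Cloud service. |
| source | Yes | Source configuration (one source block per ClickPipe). |
| destination | Yes | Destination configuration. |
| scaling | No | Number and size of replicas. Defaults to 1 replica. |
| field_mappings | No | Custom field mappings between source and destination columns. |
| settings | No | Advanced ClickPipe settings. |
| stopped | No | Set to true to create the ClickPipe in a stopped state. Defaults to false. |
The id and state attributes are read-only and are populated automatically by ClickHouse Cloud after creation.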
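Because id and state are computed, they can be read back after an apply. The sketch below assumes the kafka_clickpipe resource from the Kafka example on this page; the output names are hypothetical.

```hcl
# Expose the read-only attributes of a ClickPipe defined elsewhere in the configuration.
output "kafka_clickpipe_id" {
  description = "ID assigned to the ClickPipe by ClickHouse Cloud."
  value       = clickhouse_clickpipe.kafka_clickpipe.id
}

output "kafka_clickpipe_state" {
  description = "Current state reported for the ClickPipe."
  value       = clickhouse_clickpipe.kafka_clickpipe.state
}
```

After terraform apply, running terraform output kafka_clickpipe_id prints the assigned ID.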
Destination
The destination block is common to all source types:
destination {
  database      = "default"  # Destination database. Defaults to "default".
  table         = "my_table" # Destination table name. Required for all sources except CDC.
  managed_table = true       # ClickPipes creates and manages the table. Defaults to true.

  table_definition {
    engine {
      type = "MergeTree" # MergeTree, ReplacingMergeTree, SummingMergeTree, or Null.
    }
    sorting_key  = ["id", "ts"]   # Optional.
    partition_by = "toYYYYMM(ts)" # Optional.
  }

  columns {
    name = "id"
    type = "UInt64"
  }

  columns {
    name = "message"
    type = "String"
  }
}
For CDC sources, only database needs to be specified.
Examples by ClickPipe type
Kafka
The type value must be one of kafka, confluent, msk, azureeventhub, redpanda, or warpstream.
resource "clickhouse_clickpipe" "kafka_clickpipe" {
  name       = "My Kafka ClickPipe"
  service_id = var.service_id

  scaling {
    replicas               = 2
    replica_cpu_millicores = 250
    replica_memory_gb      = 1.0
  }

  source {
    kafka {
      type           = "confluent"
      format         = "JSONEachRow"
      brokers        = "broker.example.com:9092"
      topics         = "my_topic"
      consumer_group = "clickpipes-consumer-group"
      authentication = "PLAIN"

      credentials {
        username = "my_user"
        password = var.kafka_password
      }

      offset {
        strategy = "from_latest"
      }
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }

    columns {
      name = "message"
      type = "String"
    }

    columns {
      name = "timestamp"
      type = "DateTime"
    }
  }
}
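If the Kafka type is supplied through a variable, the allowed values listed for this section can be checked at plan time. This is a sketch only: the variable name kafka_type and its default are assumptions, and the provider may perform its own validation as well.

```hcl
# Hypothetical input variable that restricts the Kafka source type
# to the values documented for the kafka block.
variable "kafka_type" {
  description = "Kafka source type for the ClickPipe."
  type        = string
  default     = "kafka"

  validation {
    condition = contains(
      ["kafka", "confluent", "msk", "azureeventhub", "redpanda", "warpstream"],
      var.kafka_type
    )
    error_message = "The kafka_type value must be one of: kafka, confluent, msk, azureeventhub, redpanda, warpstream."
  }
}
```

Inside the kafka block, type = var.kafka_type would then replace the hard-coded string.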
Kafka with a schema registry
resource "clickhouse_clickpipe" "kafka_avro_clickpipe" {
  name       = "My Kafka Avro ClickPipe"
  service_id = var.service_id

  source {
    kafka {
      type    = "confluent"
      format  = "AvroConfluent"
      brokers = "broker.example.com:9092"
      topics  = "my_avro_topic"

      credentials {
        username = "my_user"
        password = var.kafka_password
      }

      schema_registry {
        url            = "https://schema-registry.example.com"
        authentication = "PLAIN"

        credentials {
          username = "sr_user"
          password = var.schema_registry_password
        }
      }
    }
  }

  destination {
    table         = "my_avro_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }

    columns {
      name = "event"
      type = "String"
    }
  }
}
Amazon Kinesis
resource "clickhouse_clickpipe" "kinesis_clickpipe" {
  name       = "My Kinesis ClickPipe"
  service_id = var.service_id

  source {
    kinesis {
      format         = "JSONEachRow"
      stream_name    = "my-stream"
      region         = "us-east-1"
      iterator_type  = "TRIM_HORIZON"
      authentication = "IAM_USER"

      access_key {
        access_key_id = var.aws_access_key_id
        secret_key    = var.aws_secret_key
      }
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }

    columns {
      name = "message"
      type = "String"
    }
  }
}
Kinesis with an IAM role
resource "clickhouse_clickpipe" "kinesis_iam_role_clickpipe" {
  name       = "My Kinesis ClickPipe (IAM Role)"
  service_id = var.service_id

  source {
    kinesis {
      format         = "JSONEachRow"
      stream_name    = "my-stream"
      region         = "us-east-1"
      iterator_type  = "LATEST"
      authentication = "IAM_ROLE"
      iam_role       = "arn:aws:iam::123456789012:role/my-kinesis-role"
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }

    columns {
      name = "message"
      type = "String"
    }
  }
}
Amazon S3
resource "clickhouse_clickpipe" "s3_clickpipe" {
  name       = "My S3 ClickPipe"
  service_id = var.service_id

  source {
    object_storage {
      type           = "s3"
      url            = "https://my-bucket.s3.amazonaws.com/data/*.json"
      format         = "JSONEachRow"
      authentication = "IAM_USER"

      access_key {
        access_key_id = var.aws_access_key_id
        secret_key    = var.aws_secret_key
      }
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }

    columns {
      name = "message"
      type = "String"
    }
  }
}
Continuous S3 ingestion with SQS
resource "clickhouse_clickpipe" "s3_continuous_clickpipe" {
  name       = "My S3 Continuous ClickPipe"
  service_id = var.service_id

  source {
    object_storage {
      type           = "s3"
      url            = "https://my-bucket.s3.amazonaws.com/data/*.json"
      format         = "JSONEachRow"
      is_continuous  = true
      queue_url      = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
      authentication = "IAM_USER"

      access_key {
        access_key_id = var.aws_access_key_id
        secret_key    = var.aws_secret_key
      }
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }

    columns {
      name = "message"
      type = "String"
    }
  }
}
Google Cloud Storage
service_account_key must be the base64-encoded contents of a GCP service account JSON key file.
resource "clickhouse_clickpipe" "gcs_clickpipe" {
  name       = "My GCS ClickPipe"
  service_id = var.service_id

  source {
    object_storage {
      type                = "gcs"
      url                 = "gs://my-bucket/data/*.json"
      format              = "JSONEachRow"
      authentication      = "SERVICE_ACCOUNT"
      service_account_key = var.gcs_service_account_key
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }

    columns {
      name = "message"
      type = "String"
    }
  }
}
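One way to produce the base64-encoded key is Terraform's built-in filebase64 function, which reads a file and returns its contents base64-encoded. The sketch below assumes a key file named gcs-service-account.json next to the configuration; the file name and the local value name are hypothetical.

```hcl
# Hypothetical path: point this at your own service account key file.
locals {
  gcs_service_account_key = filebase64("${path.module}/gcs-service-account.json")
}
```

The object_storage block could then use service_account_key = local.gcs_service_account_key instead of an input variable. Either way the encoded key is likely to be recorded in the Terraform state, so the state backend should be protected accordingly.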
Azure Blob Storage
resource "clickhouse_clickpipe" "abs_clickpipe" {
  name       = "My Azure Blob ClickPipe"
  service_id = var.service_id

  source {
    object_storage {
      type                 = "azureblobstorage"
      azure_container_name = "my-container"
      path                 = "data/*.json"
      format               = "JSONEachRow"
      authentication       = "CONNECTION_STRING"
      connection_string    = var.azure_connection_string
    }
  }

  destination {
    table         = "my_table"
    managed_table = true

    table_definition {
      engine {
        type = "MergeTree"
      }
    }

    columns {
      name = "id"
      type = "UInt64"
    }

    columns {
      name = "message"
      type = "String"
    }
  }
}
Postgres CDC
resource "clickhouse_clickpipe" "postgres_cdc_clickpipe" {
  name       = "My Postgres CDC ClickPipe"
  service_id = var.service_id

  source {
    postgres {
      host     = "postgres.example.com"
      port     = 5432
      database = "mydb"

      credentials {
        username = "postgres_user"
        password = var.postgres_password
      }

      settings {
        replication_mode = "cdc"

        # Optional settings
        sync_interval_seconds              = 60
        pull_batch_size                    = 100000
        allow_nullable_columns             = true
        initial_load_parallelism           = 4
        snapshot_num_rows_per_partition    = 100000
        snapshot_number_of_parallel_tables = 1
      }

      table_mappings {
        source_schema_name = "public"
        source_table       = "users"
        target_table       = "public_users"
      }

      table_mappings {
        source_schema_name = "public"
        source_table       = "orders"
        target_table       = "public_orders"

        # Optional
        excluded_columns       = ["internal_notes"]
        use_custom_sorting_key = true
        sorting_keys           = ["id", "created_at"]
        table_engine           = "ReplacingMergeTree"
      }
    }
  }

  destination {
    database = "default"
  }
}
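When many tables are replicated, repeating table_mappings by hand gets verbose. The following sketch generates the mappings with a Terraform dynamic block. It assumes table_mappings is a nested block, as written in the example above; the variable postgres_tables, the resource name, and the schema_table naming of target tables are hypothetical. If the provider exposes table_mappings as a list attribute instead, a for expression would be needed in place of the dynamic block.

```hcl
# Hypothetical list of source tables to replicate.
variable "postgres_tables" {
  type = list(object({
    schema = string
    table  = string
  }))
  default = [
    { schema = "public", table = "users" },
    { schema = "public", table = "orders" },
  ]
}

resource "clickhouse_clickpipe" "postgres_cdc_dynamic" {
  name       = "My Postgres CDC ClickPipe (generated mappings)"
  service_id = var.service_id

  source {
    postgres {
      host     = "postgres.example.com"
      port     = 5432
      database = "mydb"

      credentials {
        username = "postgres_user"
        password = var.postgres_password
      }

      settings {
        replication_mode = "cdc"
      }

      # One table_mappings block is emitted per entry in var.postgres_tables.
      dynamic "table_mappings" {
        for_each = var.postgres_tables
        content {
          source_schema_name = table_mappings.value.schema
          source_table       = table_mappings.value.table
          target_table       = "${table_mappings.value.schema}_${table_mappings.value.table}"
        }
      }
    }
  }

  destination {
    database = "default"
  }
}
```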
Postgres with an IAM role
resource "clickhouse_clickpipe" "postgres_iam_role_clickpipe" {
  name       = "My Postgres CDC ClickPipe (IAM Role)"
  service_id = var.service_id

  source {
    postgres {
      host           = "mydb.cluster.us-east-1.rds.amazonaws.com"
      port           = 5432
      database       = "mydb"
      type           = "rdspostgres"
      authentication = "iam_role"
      iam_role       = "arn:aws:iam::123456789012:role/my-rds-role"

      credentials {
        username = "postgres_user"
      }

      settings {
        replication_mode = "cdc"
      }

      table_mappings {
        source_schema_name = "public"
        source_table       = "orders"
        target_table       = "public_orders"
      }
    }
  }

  destination {
    database = "default"
  }
}
MySQL CDC
resource "clickhouse_clickpipe" "mysql_cdc_clickpipe" {
  name       = "My MySQL CDC ClickPipe"
  service_id = var.service_id

  source {
    mysql {
      host = "mysql.example.com"
      port = 3306
      type = "mysql"

      credentials {
        username = "mysql_user"
        password = var.mysql_password
      }

      settings {
        replication_mode = "cdc"

        # Optional settings
        sync_interval_seconds              = 30
        pull_batch_size                    = 10000
        allow_nullable_columns             = true
        initial_load_parallelism           = 4
        snapshot_num_rows_per_partition    = 100000
        snapshot_number_of_parallel_tables = 2
      }

      table_mappings {
        source_schema_name = "mydb"
        source_table       = "orders"
        target_table       = "mydb_orders"
      }

      table_mappings {
        source_schema_name = "mydb"
        source_table       = "customers"
        target_table       = "mydb_customers"

        # Optional
        excluded_columns       = ["password_hash"]
        use_custom_sorting_key = true
        sorting_keys           = ["id"]
        table_engine           = "ReplacingMergeTree"
      }
    }
  }

  destination {
    database = "default"
  }
}
MongoDB CDC
resource "clickhouse_clickpipe" "mongodb_cdc_clickpipe" {
  name       = "My MongoDB CDC ClickPipe"
  service_id = var.service_id

  source {
    mongodb {
      uri             = "mongodb+srv://cluster0.example.mongodb.net"
      read_preference = "secondaryPreferred"

      credentials {
        username = "mongo_user"
        password = var.mongodb_password
      }

      settings {
        replication_mode = "cdc"

        # Optional settings
        sync_interval_seconds              = 30
        pull_batch_size                    = 500
        snapshot_num_rows_per_partition    = 100000
        snapshot_number_of_parallel_tables = 2
      }

      table_mappings {
        source_database_name = "mydb"
        source_collection    = "users"
        target_table         = "mydb_users"
      }

      table_mappings {
        source_database_name = "mydb"
        source_collection    = "orders"
        target_table         = "mydb_orders"
        table_engine         = "ReplacingMergeTree"
      }
    }
  }

  destination {
    database = "default"
  }
}
BigQuery
service_account_file must be the base64-encoded contents of a GCP service account JSON key file.
resource "clickhouse_clickpipe" "bigquery_snapshot_clickpipe" {
  name       = "My BigQuery ClickPipe"
  service_id = var.service_id

  source {
    bigquery {
      snapshot_staging_path = "gs://my-staging-bucket/staging/"

      credentials {
        service_account_file = var.gcp_service_account_key
      }

      settings {
        replication_mode = "snapshot"

        # Optional settings
        initial_load_parallelism           = 4
        snapshot_num_rows_per_partition    = 100000
        snapshot_number_of_parallel_tables = 2
        allow_nullable_columns             = true
      }

      table_mappings {
        source_dataset_name = "my_dataset"
        source_table        = "my_table"
        target_table        = "my_bigquery_table"
      }

      table_mappings {
        source_dataset_name    = "my_dataset"
        source_table           = "another_table"
        target_table           = "another_bigquery_table"
        table_engine           = "ReplacingMergeTree"
        use_custom_sorting_key = true
        sorting_keys           = ["id"]
        excluded_columns       = ["internal_col"]
      }
    }
  }

  destination {
    database = "default"
  }
}
Scaling
The scaling block sets the number and size of replicas for ClickPipes that support it.
scaling {
  replicas               = 2   # Default: 1. Maximum: 10.
  replica_cpu_millicores = 500 # Between 125 and 2000.
  replica_memory_gb      = 2.0 # Between 0.5 and 8.0.
}
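The limits noted in the comments above can be enforced before a plan reaches the API. This is a sketch under assumptions: the variable name clickpipe_scaling and its defaults are hypothetical, and the lower bound of 1 replica is assumed from the stated default rather than documented as a minimum.

```hcl
# Hypothetical object variable carrying the scaling settings, with range checks
# that mirror the limits listed for the scaling block.
variable "clickpipe_scaling" {
  type = object({
    replicas               = number
    replica_cpu_millicores = number
    replica_memory_gb      = number
  })
  default = {
    replicas               = 1
    replica_cpu_millicores = 250
    replica_memory_gb      = 1.0
  }

  validation {
    condition     = var.clickpipe_scaling.replicas >= 1 && var.clickpipe_scaling.replicas <= 10
    error_message = "The replicas value must be between 1 and 10."
  }

  validation {
    condition     = var.clickpipe_scaling.replica_cpu_millicores >= 125 && var.clickpipe_scaling.replica_cpu_millicores <= 2000
    error_message = "The replica_cpu_millicores value must be between 125 and 2000."
  }

  validation {
    condition     = var.clickpipe_scaling.replica_memory_gb >= 0.5 && var.clickpipe_scaling.replica_memory_gb <= 8.0
    error_message = "The replica_memory_gb value must be between 0.5 and 8.0."
  }
}
```

Each field of the scaling block would then read from the variable, for example replicas = var.clickpipe_scaling.replicas.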
Importing existing ClickPipes
terraform import clickhouse_clickpipe.example <service_id>:<clickpipe_id>
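On Terraform 1.5 or later, the same import can also be declared in configuration with an import block, which makes it part of the normal plan and apply workflow. The sketch below reuses the ID format from the command above; the placeholders must be replaced with real values, and a matching resource "clickhouse_clickpipe" "example" block is assumed to exist or to be generated.

```hcl
# Declarative equivalent of the terraform import command (Terraform 1.5+).
import {
  to = clickhouse_clickpipe.example
  id = "<service_id>:<clickpipe_id>"
}
```

If the resource block has not been written yet, terraform plan -generate-config-out=generated.tf can draft one from the imported object; the generated configuration should be reviewed before it is applied.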