메인 콘텐츠로 건너뛰기
Apache Beam은 개발자가 배치 및 스트림(연속) 데이터 처리 파이프라인을 모두 정의하고 실행할 수 있도록 지원하는 오픈 소스 통합 프로그래밍 모델입니다. Apache Beam의 유연성은 ETL(Extract, Transform, Load) 작업부터 복잡한 이벤트 처리와 실시간 분석에 이르기까지, 다양한 데이터 처리 시나리오를 지원할 수 있다는 점에 있습니다. 이 통합은 기본 삽입 계층으로 ClickHouse의 공식 JDBC 커넥터를 활용합니다.

통합 패키지

Apache Beam과 ClickHouse를 통합하는 데 필요한 패키지는 널리 사용되는 여러 데이터 저장 시스템 및 데이터베이스용 통합 번들인 Apache Beam I/O Connectors에서 유지 관리 및 개발됩니다. org.apache.beam.sdk.io.clickhouse.ClickHouseIO 구현은 Apache Beam repo에 있습니다.

Apache Beam용 ClickHouse 패키지 설정

패키지 설치

패키지 관리 프레임워크에 다음 종속 항목을 추가하십시오:
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-clickhouse</artifactId>
    <version>${beam.version}</version>
</dependency>
권장 Beam 버전ClickHouseIO 커넥터는 Apache Beam 2.59.0 버전부터 사용하는 것을 권장합니다. 그 이전 버전에서는 커넥터의 기능이 완전히 지원되지 않을 수 있습니다.
아티팩트는 공식 Maven 리포지토리에서 확인할 수 있습니다.

코드 예시

다음 예시는 input.csv라는 이름의 CSV 파일을 PCollection으로 읽고, 이를 행(Row) 객체로 변환한 다음(정의된 스키마 사용), ClickHouseIO를 사용하여 로컬 ClickHouse 인스턴스에 삽입합니다:

package org.example;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.clickhouse.ClickHouseIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;

public class Main {

    public static void main(String[] args) {
        // Pipeline 객체를 생성합니다.
        Pipeline p = Pipeline.create();

        Schema SCHEMA =
                Schema.builder()
                        .addField(Schema.Field.of("name", Schema.FieldType.STRING).withNullable(true))
                        .addField(Schema.Field.of("age", Schema.FieldType.INT16).withNullable(true))
                        .addField(Schema.Field.of("insertion_time", Schema.FieldType.DATETIME).withNullable(false))
                        .build();

        // 파이프라인에 변환을 적용합니다.
        PCollection<String> lines = p.apply("ReadLines", TextIO.read().from("src/main/resources/input.csv"));

        PCollection<Row> rows = lines.apply("ConvertToRow", ParDo.of(new DoFn<String, Row>() {
            @ProcessElement
            public void processElement(@Element String line, OutputReceiver<Row> out) {

                String[] values = line.split(",");
                Row row = Row.withSchema(SCHEMA)
                        .addValues(values[0], Short.parseShort(values[1]), DateTime.now())
                        .build();
                out.output(row);
            }
        })).setRowSchema(SCHEMA);

        rows.apply("Write to ClickHouse",
                        ClickHouseIO.write("jdbc:clickhouse://localhost:8123/default?user=default&password=******", "test_table"));

        // 파이프라인을 실행합니다.
        p.run().waitUntilFinish();
    }
}

지원되는 데이터 타입

ClickHouseApache Beam지원 여부참고 사항
TableSchema.TypeName.FLOAT32Schema.TypeName#FLOAT
TableSchema.TypeName.FLOAT64Schema.TypeName#DOUBLE
TableSchema.TypeName.INT8Schema.TypeName#BYTE
TableSchema.TypeName.INT16Schema.TypeName#INT16
TableSchema.TypeName.INT32Schema.TypeName#INT32
TableSchema.TypeName.INT64Schema.TypeName#INT64
TableSchema.TypeName.STRINGSchema.TypeName#STRING
TableSchema.TypeName.UINT8Schema.TypeName#INT16
TableSchema.TypeName.UINT16Schema.TypeName#INT32
TableSchema.TypeName.UINT32Schema.TypeName#INT64
TableSchema.TypeName.UINT64Schema.TypeName#INT64
TableSchema.TypeName.DATESchema.TypeName#DATETIME
TableSchema.TypeName.DATETIMESchema.TypeName#DATETIME
TableSchema.TypeName.ARRAYSchema.TypeName#ARRAY
TableSchema.TypeName.ENUM8Schema.TypeName#STRING
TableSchema.TypeName.ENUM16Schema.TypeName#STRING
TableSchema.TypeName.BOOLSchema.TypeName#BOOLEAN
TableSchema.TypeName.TUPLESchema.TypeName#ROW
TableSchema.TypeName.FIXEDSTRINGFixedBytesFixedBytes는 고정 길이
바이트 배열을 나타내는 LogicalType으로,
org.apache.beam.sdk.schemas.logicaltypes에 위치합니다
Schema.TypeName#DECIMAL
Schema.TypeName#MAP

ClickHouseIO.Write 매개변수

다음 세터 함수를 사용하여 ClickHouseIO.Write 구성을 조정할 수 있습니다.
매개변수 설정 함수인수 유형기본값설명
withMaxInsertBlockSize(long maxInsertBlockSize)1000000삽입할 행 블록의 최대 크기입니다.
withMaxRetries(int maxRetries)5실패한 삽입에 대한 최대 재시도 횟수입니다.
withMaxCumulativeBackoff(Duration maxBackoff)Duration.standardDays(1000)재시도에 대한 최대 누적 백오프 기간입니다.
withInitialBackoff(Duration initialBackoff)Duration.standardSeconds(5)첫 번째 재시도 전에 적용되는 초기 백오프 기간입니다.
withInsertDistributedSync(Boolean sync)truetrue이면 분산 테이블의 삽입 작업을 동기화합니다.
withInsertQuorum(Long quorum)null삽입 작업을 확인하는 데 필요한 레플리카 수입니다.
withInsertDeduplicate(Boolean deduplicate)truetrue이면 삽입 작업에 대해 중복 제거가 활성화됩니다.
withTableSchema(TableSchema schema)null대상 ClickHouse 테이블의 스키마입니다.

제한 사항

커넥터를 사용할 때는 다음 제한 사항을 고려하십시오.
  • 현재는 싱크 작업만 지원합니다. 이 커넥터는 소스 작업을 지원하지 않습니다.
  • ClickHouse는 ReplicatedMergeTree 또는 그 위에 구축된 Distributed 테이블에 삽입할 때 중복 제거를 수행합니다. 복제가 없는 경우 일반 MergeTree에 삽입할 때 삽입이 실패한 후 재시도가 성공하면 중복이 발생할 수 있습니다. 하지만 각 블록은 원자적으로 삽입되며, 블록 크기는 ClickHouseIO.Write.withMaxInsertBlockSize(long)를 사용해 설정할 수 있습니다. 중복 제거는 삽입된 블록의 체크섬을 사용해 수행됩니다. 중복 제거에 대한 자세한 내용은 DeduplicationDeduplicate insertion config를 참조하십시오.
  • 이 커넥터는 DDL SQL 문을 수행하지 않으므로, 대상 테이블은 삽입 전에 미리 존재해야 합니다.
마지막 수정일 2026년 6월 10일