메인 콘텐츠로 건너뛰기
이 커넥터는 ClickHouse가 지원하는 공식 Apache Flink Sink Connector입니다. Flink의 AsyncSinkBase와 공식 ClickHouse Java 클라이언트를 기반으로 구축되었습니다. 이 커넥터는 Apache Flink의 DataStream API를 지원합니다. Table API 지원은 향후 릴리스에서 추가될 예정입니다.

요구 사항

  • Java 11 이상(Flink 1.17+) 또는 17 이상(Flink 2.0+)
  • Apache Flink 1.17+
이 커넥터는 Flink 1.17+와 Flink 2.0+를 모두 지원할 수 있도록 두 개의 아티팩트로 나뉘어 있습니다. 사용할 Flink 버전에 맞는 아티팩트를 선택하십시오:
Flink 버전아티팩트ClickHouse Java 클라이언트 버전필수 Java
latestflink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.1flink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.0flink-connector-clickhouse-2.0.00.9.5Java 17+
1.20.2flink-connector-clickhouse-1.170.9.5Java 11+
1.19.3flink-connector-clickhouse-1.170.9.5Java 11+
1.18.1flink-connector-clickhouse-1.170.9.5Java 11+
1.17.2flink-connector-clickhouse-1.170.9.5Java 11+
이 커넥터는 Flink 1.17.2보다 이전 버전에서는 테스트되지 않았습니다.

설치 및 설정

의존성으로 추가하기

<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-2.0.0</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>
<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-1.17</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>

바이너리 다운로드

바이너리 JAR의 명명 패턴은 다음과 같습니다:
flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar
여기서: 사용 가능한 모든 JAR 파일은 Maven Central Repository에서 확인할 수 있습니다.

DataStream API 사용

스니펫

원시 CSV 데이터를 ClickHouse에 삽입한다고 가정해 보겠습니다:
public static void main(String[] args) {
    // ClickHouseClient 구성
    ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);

    // ElementConverter 생성
    ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);

    // 싱크를 생성하고 `setClickHouseFormat`으로 포맷을 설정
    ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
            convertorString,
            MAX_BATCH_SIZE,
            MAX_IN_FLIGHT_REQUESTS,
            MAX_BUFFERED_REQUESTS,
            MAX_BATCH_SIZE_IN_BYTES,
            MAX_TIME_IN_BUFFER_MS,
            MAX_RECORD_SIZE_IN_BYTES,
            clickHouseClientConfig
    );

    csvSink.setClickHouseFormat(ClickHouseFormat.CSV);

    // 마지막으로 DataStream을 싱크에 연결
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Path csvFilePath = new Path(fileFullName);
    FileSource<String> csvSource = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), csvFilePath)
            .build();

    env.fromSource(
            csvSource,
            WatermarkStrategy.noWatermarks(),
            "GzipCsvSource"
    ).sinkTo(csvSink);
}
추가 예시와 스니펫은 테스트 코드에서 확인할 수 있습니다:

빠른 시작 예시

ClickHouse Sink를 간편하게 시작할 수 있도록 Maven 기반 예시를 제공했습니다: 자세한 지침은 예시 가이드를 참조하십시오.

DataStream API 연결 옵션

ClickHouse 클라이언트 옵션

매개변수설명기본값필수
url정규화된 ClickHouse URL해당 없음
usernameClickHouse 데이터베이스 사용자 이름해당 없음
passwordClickHouse 데이터베이스 비밀번호해당 없음
databaseClickHouse 데이터베이스 이름해당 없음
tableClickHouse 테이블 이름해당 없음
optionsJava 클라이언트 구성 옵션 맵빈 맵아니요
serverSettingsClickHouse 서버 세션 설정 맵빈 맵아니요
enableJsonSupportAsStringJSON 데이터 타입에 대해 JSON 포맷의 String을 받도록 하는 ClickHouse 서버 설정true아니요
optionsserverSettingsMap<String, String> 형태로 클라이언트에 전달해야 합니다. 둘 중 어느 하나에 빈 맵을 지정하면 각각 클라이언트 또는 서버의 기본값이 사용됩니다.
사용 가능한 모든 Java 클라이언트 옵션은 ClientConfigProperties.java이 문서 페이지에 나와 있습니다.사용 가능한 모든 서버 세션 설정은 이 문서 페이지에 나와 있습니다.
예를 들면 다음과 같습니다.
Map<String, String> javaClientOptions = Map.of(
    ClientConfigProperties.CA_CERTIFICATE.getKey(), "<my_CA_cert>",
    ClientConfigProperties.SSL_CERTIFICATE.getKey(), "<my_SSL_cert>",
    ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "30000",
    ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getKey(), "5"
);

Map<String, String> serverSettings = Map.of(
    "insert_deduplicate", "1"
);

ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(
    url,
    username,
    password,
    database,
    tableName,
    javaClientOptions,
    serverSettings,
    false // enableJsonSupportAsString
);

싱크 옵션

다음 옵션은 Flink의 AsyncSinkBase에서 직접 제공됩니다:
매개변수설명기본값필수
maxBatchSize단일 배치에서 삽입할 수 있는 레코드의 최대 개수N/A
maxInFlightRequests싱크가 백프레셔를 적용하기 전에 허용되는 진행 중 요청의 최대 개수N/A
maxBufferedRequests백프레셔가 적용되기 전에 싱크에 버퍼링할 수 있는 레코드의 최대 개수N/A
maxBatchSizeInBytes배치가 가질 수 있는 최대 크기(바이트)입니다. 전송되는 모든 배치는 이 크기 이하입니다N/A
maxTimeInBufferMS플러시되기 전까지 레코드가 싱크에 머무를 수 있는 최대 시간N/A
maxRecordSizeInBytes싱크가 허용하는 최대 레코드 크기이며, 이를 초과하는 레코드는 자동으로 거부됩니다N/A

지원되는 데이터 타입

아래 표는 Flink에서 ClickHouse로 데이터를 삽입할 때 데이터 타입 변환을 빠르게 참고할 수 있도록 정리한 것입니다.
Java 유형ClickHouse 유형지원 여부직렬화 방식
byte/ByteInt8DataWriter.writeInt8
short/ShortInt16DataWriter.writeInt16
int/IntegerInt32DataWriter.writeInt32
long/LongInt64DataWriter.writeInt64
BigIntegerInt128DataWriter.writeInt128
BigIntegerInt256DataWriter.writeInt256
short/ShortUInt8DataWriter.writeUInt8
int/IntegerUInt8DataWriter.writeUInt8
int/IntegerUInt16DataWriter.writeUInt16
long/LongUInt32DataWriter.writeUInt32
long/LongUInt64DataWriter.writeUInt64
BigIntegerUInt64DataWriter.writeUInt64
BigIntegerUInt128DataWriter.writeUInt128
BigIntegerUInt256DataWriter.writeUInt256
BigDecimalDecimalDataWriter.writeDecimal
BigDecimalDecimal32DataWriter.writeDecimal
BigDecimalDecimal64DataWriter.writeDecimal
BigDecimalDecimal128DataWriter.writeDecimal
BigDecimalDecimal256DataWriter.writeDecimal
float/FloatFloatDataWriter.writeFloat32
double/DoubleDoubleDataWriter.writeFloat64
boolean/BooleanBooleanDataWriter.writeBoolean
StringStringDataWriter.writeString
StringFixedStringDataWriter.writeFixedString
LocalDateDateDataWriter.writeDate
LocalDateDate32DataWriter.writeDate32
LocalDateTimeDateTimeDataWriter.writeDateTime
ZonedDateTimeDateTimeDataWriter.writeDateTime
LocalDateTimeDateTime64DataWriter.writeDateTime64
ZonedDateTimeDateTime64DataWriter.writeDateTime64
int/IntegerTimeN/A
long/LongTime64N/A
byte/ByteEnum8DataWriter.writeInt8
int/IntegerEnum16DataWriter.writeInt16
java.util.UUIDUUIDDataWriter.writeIntUUID
StringJSONDataWriter.writeJSON
Array<Type>Array<Type>DataWriter.writeArray
Map<K,V>Map<K,V>DataWriter.writeMap
Tuple<Type,..>Tuple<T1,T2,..>DataWriter.writeTuple
ObjectVariantN/A
참고:
  • 날짜 연산을 수행할 때는 ZoneId를 반드시 지정해야 합니다.
  • Decimal 연산을 수행할 때는 precision 및 scale을 반드시 지정해야 합니다.
  • ClickHouse가 Java String을 JSON으로 파싱할 수 있도록 하려면 ClickHouseClientConfig에서 enableJsonSupportAsString을 활성화해야 합니다.
  • 커넥터는 입력 DataStream의 요소를 ClickHouse 페이로드에 매핑하기 위해 ElementConvertor가 필요합니다. 이를 위해 커넥터는 ClickHouseConvertorPOJOConvertor를 제공하며, 위의 DataWriter 직렬화 메서드를 사용해 이 매핑을 구현할 수 있습니다.

지원되는 입력 형식

사용 가능한 ClickHouse 입력 형식 목록은 이 문서 페이지ClickHouseFormat.java에서 확인할 수 있습니다. 커넥터가 DataStream을 ClickHouse 페이로드로 직렬화할 때 사용할 포맷을 지정하려면 setClickHouseFormat 함수를 사용하세요. 예를 들어 다음과 같습니다:
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
        convertorString,
        MAX_BATCH_SIZE,
        MAX_IN_FLIGHT_REQUESTS,
        MAX_BUFFERED_REQUESTS,
        MAX_BATCH_SIZE_IN_BYTES,
        MAX_TIME_IN_BUFFER_MS,
        MAX_RECORD_SIZE_IN_BYTES,
        clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
기본적으로 커넥터는 ClickHouseClientConfig에서 setSupportDefault를 각각 true 또는 false로 명시적으로 설정하면 RowBinaryWithDefaults 또는 RowBinary를 사용합니다.

메트릭

이 커넥터는 Flink의 기존 메트릭에 더해 다음과 같은 추가 메트릭을 노출합니다:
메트릭설명유형상태
numBytesSend요청 페이로드에서 ClickHouse로 전송된 총 바이트 수입니다. 참고: 이 메트릭은 네트워크를 통해 전송된 직렬화 데이터의 크기를 측정하므로, 처리 후 스토리지에 실제로 기록된 바이트 수를 반영하는 system.query_log의 ClickHouse written_bytes와 다를 수 있습니다Counter
numRecordSendClickHouse로 전송된 총 레코드 수입니다Counter
numRequestSubmitted전송된 총 요청 수입니다(실제로 수행된 플러시 횟수)Counter
numOfDroppedBatches재시도할 수 없는 실패로 인해 폐기된 총 배치 수입니다Counter
numOfDroppedRecords재시도할 수 없는 실패로 인해 폐기된 총 레코드 수입니다Counter
totalBatchRetries재시도 가능한 실패로 인해 발생한 총 배치 재시도 횟수입니다Counter
writeLatencyHistogram성공한 쓰기의 지연 시간 분포(ms)를 나타내는 히스토그램입니다Histogram
writeFailureLatencyHistogram실패한 쓰기의 지연 시간 분포(ms)를 나타내는 히스토그램입니다Histogram
triggeredByMaxBatchSizeCountermaxBatchSize에 도달해 발생한 총 플러시 횟수입니다Counter
triggeredByMaxBatchSizeInBytesCountermaxBatchSizeInBytes에 도달해 발생한 총 플러시 횟수입니다Counter
triggeredByMaxTimeInBufferMSCountermaxTimeInBufferMS에 도달해 발생한 총 플러시 횟수입니다Counter
actualRecordsPerBatch실제 배치 크기 분포를 나타내는 히스토그램입니다Histogram
actualBytesPerBatch배치당 실제 바이트 분포를 나타내는 히스토그램입니다Histogram

제한 사항

  • 현재 이 싱크는 최소 1회 전달 보장을 제공합니다. 정확히 한 번 처리 의미 체계 지원 작업은 여기에서 추적되고 있습니다.
  • 이 싱크는 아직 처리할 수 없는 레코드를 버퍼링하기 위한 데드 레터 큐(DLQ)를 지원하지 않습니다. 그때까지는 커넥터가 실패한 레코드를 다시 삽입하려고 시도하며, 끝내 성공하지 못하면 해당 레코드를 버립니다. 이 기능은 여기에서 추적되고 있습니다.
  • 이 싱크는 아직 Flink의 Table API 또는 Flink SQL을 통해 생성하는 기능을 지원하지 않습니다. 이 기능은 여기에서 추적되고 있습니다.

ClickHouse 버전 호환성 및 보안

  • 커넥터는 일일 CI 워크플로를 통해 latest와 head를 포함한 여러 최신 ClickHouse 버전을 대상으로 테스트됩니다. 테스트 대상 버전은 새로운 ClickHouse 릴리스가 활성화되면 주기적으로 업데이트됩니다. 커넥터가 매일 어떤 버전을 대상으로 테스트하는지는 여기에서 확인할 수 있습니다.
  • 알려진 보안 취약점과 취약점 보고 방법은 ClickHouse 보안 정책을 참조하십시오.
  • 보안 수정과 새로운 개선 사항을 놓치지 않도록 커넥터를 계속 업그레이드할 것을 권장합니다.
  • 마이그레이션에 문제가 있으면 GitHub issue를 등록해 주십시오. 확인 후 응답하겠습니다.
  • 최적의 성능을 위해 DataStream 요소 타입이 Generic 타입이 아니도록 하십시오. 자세한 내용은 Flink의 타입 구분을 참조하십시오. Generic이 아닌 요소를 사용하면 Kryo로 인한 serialization 오버헤드를 줄일 수 있어 ClickHouse로의 처리량이 향상됩니다.
  • maxBatchSize는 최소 1000, 이상적으로는 10,000~100,000으로 설정하는 것을 권장합니다. 자세한 내용은 대량 삽입 가이드를 참조하십시오.
  • OLTP 스타일의 중복 제거 또는 ClickHouse 업서트를 수행하려면 이 문서 페이지를 참조하십시오. 참고: 이는 재시도 시 발생하는 배치 중복 제거와 혼동해서는 안 됩니다.

문제 해결

CANNOT_READ_ALL_DATA

다음과 같은 오류가 발생할 수 있습니다:
com.clickhouse.client.api.ServerException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 9205. Bytes expected: 1100022.: (at row 9) : While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)
원인: 가장 흔한 원인은 CANNOT_READ_ALL_DATA 오류가 ClickHouse 테이블 스키마와 Flink 레코드 스키마가 서로 불일치함을 의미한다는 점입니다. 이는 둘 중 하나가 이전 버전과 호환되지 않도록 변경될 때 발생할 수 있습니다. 해결 방법: ClickHouse 테이블 또는 커넥터 입력 데이터 타입의 스키마, 또는 둘 모두를 업데이트하여 서로 호환되도록 하십시오. 필요한 경우 Java 타입을 ClickHouse 타입으로 매핑하는 방법은 type mapping을 참조하십시오. 참고: 아직 전송 중인 레코드가 남아 있다면 커넥터를 다시 시작할 때 Flink state를 재설정해야 합니다.

낮은 처리량

ClickHouse에 쓰는 동안 커넥터의 처리량이 작업의 병렬성(Flink 작업 수)에 비례해 확장되지 않을 수 있습니다. 원인: ClickHouse의 백그라운드 파트 병합 프로세스로 인해 삽입 속도가 느려질 수 있습니다. 이는 구성된 배치 크기가 너무 작거나, 커넥터가 너무 자주 플러시하거나, 또는 두 가지가 함께 작용할 때 발생할 수 있습니다. 해결 방법: numRequestSubmittedactualRecordsPerBatch 메트릭을 모니터링하면 배치 크기(maxBatchSize)와 플러시 빈도를 어떻게 조정할지 판단하는 데 도움이 됩니다. 또한 배치 크기 권장 사항은 고급 및 권장 사용법을 참조하십시오.

ClickHouse 테이블에서 행이 누락됩니다

원인: 배치가 재시도할 수 없는 실패로 인해 폐기되었거나, 설정된 재시도 횟수 내에 삽입되지 못했습니다(ClickHouseClientConfig.setNumberOfRetries()로 설정 가능). 참고: 기본적으로 커넥터는 배치를 폐기하기 전에 최대 3번까지 다시 삽입을 시도합니다. 해결 방법: 근본 원인을 파악하려면 TaskManager 로그 및/또는 stack trace를 확인하십시오.

기여 및 지원

프로젝트에 기여하거나 문제를 보고하려는 경우, 의견을 보내주시면 감사하겠습니다! 이슈를 등록하거나 개선 사항을 제안하거나 pull request를 제출하려면 GitHub 리포지토리를 방문하세요. 기여는 언제나 환영합니다! 시작하기 전에 리포지토리의 기여 가이드를 확인해 주세요. ClickHouse Flink 커넥터 개선에 도움을 주셔서 감사합니다!
마지막 수정일 2026년 6월 10일