이 커넥터는 ClickHouse가 지원하는 공식 Apache Flink Sink Connector입니다. Flink의 AsyncSinkBase와 공식 ClickHouse Java 클라이언트를 기반으로 구축되었습니다.
이 커넥터는 Apache Flink의 DataStream API를 지원합니다. Table API 지원은 향후 릴리스에서 추가될 예정입니다.
- Java 11 이상(Flink 1.17+) 또는 17 이상(Flink 2.0+)
- Apache Flink 1.17+
이 커넥터는 Flink 1.17+와 Flink 2.0+를 모두 지원할 수 있도록 두 개의 아티팩트로 나뉘어 있습니다. 사용할 Flink 버전에 맞는 아티팩트를 선택하십시오:
| Flink 버전 | 아티팩트 | ClickHouse Java 클라이언트 버전 | 필수 Java |
|---|
| latest | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.1 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.0 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 1.20.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.19.3 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.18.1 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.17.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
이 커넥터는 Flink 1.17.2보다 이전 버전에서는 테스트되지 않았습니다.
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-2.0.0</artifactId>
<version>{{ stable_version }}</version>
<classifier>all</classifier>
</dependency>
dependencies {
implementation("com.clickhouse.flink:flink-connector-clickhouse-2.0.0:{{ stable_version }}")
}
libraryDependencies += "com.clickhouse.flink" % "flink-connector-clickhouse-2.0.0" % {{ stable_version }} classifier "all"
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-1.17</artifactId>
<version>{{ stable_version }}</version>
<classifier>all</classifier>
</dependency>
dependencies {
implementation("com.clickhouse.flink:flink-connector-clickhouse-1.17:{{ stable_version }}")
}
libraryDependencies += "com.clickhouse.flink" % "flink-connector-clickhouse-1.17" % {{ stable_version }} classifier "all"
바이너리 JAR의 명명 패턴은 다음과 같습니다:
flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar
여기서:
사용 가능한 모든 JAR 파일은 Maven Central Repository에서 확인할 수 있습니다.
원시 CSV 데이터를 ClickHouse에 삽입한다고 가정해 보겠습니다:
public static void main(String[] args) {
// ClickHouseClient 구성
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);
// ElementConverter 생성
ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);
// 싱크를 생성하고 `setClickHouseFormat`으로 포맷을 설정
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
convertorString,
MAX_BATCH_SIZE,
MAX_IN_FLIGHT_REQUESTS,
MAX_BUFFERED_REQUESTS,
MAX_BATCH_SIZE_IN_BYTES,
MAX_TIME_IN_BUFFER_MS,
MAX_RECORD_SIZE_IN_BYTES,
clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
// 마지막으로 DataStream을 싱크에 연결
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Path csvFilePath = new Path(fileFullName);
FileSource<String> csvSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), csvFilePath)
.build();
env.fromSource(
csvSource,
WatermarkStrategy.noWatermarks(),
"GzipCsvSource"
).sinkTo(csvSink);
}
추가 예시와 스니펫은 테스트 코드에서 확인할 수 있습니다:
ClickHouse Sink를 간편하게 시작할 수 있도록 Maven 기반 예시를 제공했습니다:
자세한 지침은 예시 가이드를 참조하십시오.
| 매개변수 | 설명 | 기본값 | 필수 |
|---|
url | 정규화된 ClickHouse URL | 해당 없음 | 예 |
username | ClickHouse 데이터베이스 사용자 이름 | 해당 없음 | 예 |
password | ClickHouse 데이터베이스 비밀번호 | 해당 없음 | 예 |
database | ClickHouse 데이터베이스 이름 | 해당 없음 | 예 |
table | ClickHouse 테이블 이름 | 해당 없음 | 예 |
options | Java 클라이언트 구성 옵션 맵 | 빈 맵 | 아니요 |
serverSettings | ClickHouse 서버 세션 설정 맵 | 빈 맵 | 아니요 |
enableJsonSupportAsString | JSON 데이터 타입에 대해 JSON 포맷의 String을 받도록 하는 ClickHouse 서버 설정 | true | 아니요 |
options와 serverSettings는 Map<String, String> 형태로 클라이언트에 전달해야 합니다. 둘 중 어느 하나에 빈 맵을 지정하면 각각 클라이언트 또는 서버의 기본값이 사용됩니다.
예를 들면 다음과 같습니다.
Map<String, String> javaClientOptions = Map.of(
ClientConfigProperties.CA_CERTIFICATE.getKey(), "<my_CA_cert>",
ClientConfigProperties.SSL_CERTIFICATE.getKey(), "<my_SSL_cert>",
ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "30000",
ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getKey(), "5"
);
Map<String, String> serverSettings = Map.of(
"insert_deduplicate", "1"
);
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(
url,
username,
password,
database,
tableName,
javaClientOptions,
serverSettings,
false // enableJsonSupportAsString
);
다음 옵션은 Flink의 AsyncSinkBase에서 직접 제공됩니다:
| 매개변수 | 설명 | 기본값 | 필수 |
|---|
maxBatchSize | 단일 배치에서 삽입할 수 있는 레코드의 최대 개수 | N/A | 예 |
maxInFlightRequests | 싱크가 백프레셔를 적용하기 전에 허용되는 진행 중 요청의 최대 개수 | N/A | 예 |
maxBufferedRequests | 백프레셔가 적용되기 전에 싱크에 버퍼링할 수 있는 레코드의 최대 개수 | N/A | 예 |
maxBatchSizeInBytes | 배치가 가질 수 있는 최대 크기(바이트)입니다. 전송되는 모든 배치는 이 크기 이하입니다 | N/A | 예 |
maxTimeInBufferMS | 플러시되기 전까지 레코드가 싱크에 머무를 수 있는 최대 시간 | N/A | 예 |
maxRecordSizeInBytes | 싱크가 허용하는 최대 레코드 크기이며, 이를 초과하는 레코드는 자동으로 거부됩니다 | N/A | 예 |
아래 표는 Flink에서 ClickHouse로 데이터를 삽입할 때 데이터 타입 변환을 빠르게 참고할 수 있도록 정리한 것입니다.
Flink에서 ClickHouse로 데이터 삽입하기
| Java 유형 | ClickHouse 유형 | 지원 여부 | 직렬화 방식 |
|---|
byte/Byte | Int8 | ✅ | DataWriter.writeInt8 |
short/Short | Int16 | ✅ | DataWriter.writeInt16 |
int/Integer | Int32 | ✅ | DataWriter.writeInt32 |
long/Long | Int64 | ✅ | DataWriter.writeInt64 |
BigInteger | Int128 | ✅ | DataWriter.writeInt128 |
BigInteger | Int256 | ✅ | DataWriter.writeInt256 |
short/Short | UInt8 | ✅ | DataWriter.writeUInt8 |
int/Integer | UInt8 | ✅ | DataWriter.writeUInt8 |
int/Integer | UInt16 | ✅ | DataWriter.writeUInt16 |
long/Long | UInt32 | ✅ | DataWriter.writeUInt32 |
long/Long | UInt64 | ✅ | DataWriter.writeUInt64 |
BigInteger | UInt64 | ✅ | DataWriter.writeUInt64 |
BigInteger | UInt128 | ✅ | DataWriter.writeUInt128 |
BigInteger | UInt256 | ✅ | DataWriter.writeUInt256 |
BigDecimal | Decimal | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal32 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal64 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal128 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal256 | ✅ | DataWriter.writeDecimal |
float/Float | Float | ✅ | DataWriter.writeFloat32 |
double/Double | Double | ✅ | DataWriter.writeFloat64 |
boolean/Boolean | Boolean | ✅ | DataWriter.writeBoolean |
String | String | ✅ | DataWriter.writeString |
String | FixedString | ✅ | DataWriter.writeFixedString |
LocalDate | Date | ✅ | DataWriter.writeDate |
LocalDate | Date32 | ✅ | DataWriter.writeDate32 |
LocalDateTime | DateTime | ✅ | DataWriter.writeDateTime |
ZonedDateTime | DateTime | ✅ | DataWriter.writeDateTime |
LocalDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
ZonedDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
int/Integer | Time | ❌ | N/A |
long/Long | Time64 | ❌ | N/A |
byte/Byte | Enum8 | ✅ | DataWriter.writeInt8 |
int/Integer | Enum16 | ✅ | DataWriter.writeInt16 |
java.util.UUID | UUID | ✅ | DataWriter.writeIntUUID |
String | JSON | ✅ | DataWriter.writeJSON |
Array<Type> | Array<Type> | ✅ | DataWriter.writeArray |
Map<K,V> | Map<K,V> | ✅ | DataWriter.writeMap |
Tuple<Type,..> | Tuple<T1,T2,..> | ✅ | DataWriter.writeTuple |
Object | Variant | ❌ | N/A |
참고:
- 날짜 연산을 수행할 때는
ZoneId를 반드시 지정해야 합니다.
- Decimal 연산을 수행할 때는 precision 및 scale을 반드시 지정해야 합니다.
- ClickHouse가 Java
String을 JSON으로 파싱할 수 있도록 하려면 ClickHouseClientConfig에서 enableJsonSupportAsString을 활성화해야 합니다.
- 커넥터는 입력 DataStream의 요소를 ClickHouse 페이로드에 매핑하기 위해
ElementConvertor가 필요합니다. 이를 위해 커넥터는 ClickHouseConvertor와 POJOConvertor를 제공하며, 위의 DataWriter 직렬화 메서드를 사용해 이 매핑을 구현할 수 있습니다.
사용 가능한 ClickHouse 입력 형식 목록은 이 문서 페이지와 ClickHouseFormat.java에서 확인할 수 있습니다.
커넥터가 DataStream을 ClickHouse 페이로드로 직렬화할 때 사용할 포맷을 지정하려면 setClickHouseFormat 함수를 사용하세요. 예를 들어 다음과 같습니다:
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
convertorString,
MAX_BATCH_SIZE,
MAX_IN_FLIGHT_REQUESTS,
MAX_BUFFERED_REQUESTS,
MAX_BATCH_SIZE_IN_BYTES,
MAX_TIME_IN_BUFFER_MS,
MAX_RECORD_SIZE_IN_BYTES,
clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
이 커넥터는 Flink의 기존 메트릭에 더해 다음과 같은 추가 메트릭을 노출합니다:
| 메트릭 | 설명 | 유형 | 상태 |
|---|
numBytesSend | 요청 페이로드에서 ClickHouse로 전송된 총 바이트 수입니다. 참고: 이 메트릭은 네트워크를 통해 전송된 직렬화 데이터의 크기를 측정하므로, 처리 후 스토리지에 실제로 기록된 바이트 수를 반영하는 system.query_log의 ClickHouse written_bytes와 다를 수 있습니다 | Counter | ✅ |
numRecordSend | ClickHouse로 전송된 총 레코드 수입니다 | Counter | ✅ |
numRequestSubmitted | 전송된 총 요청 수입니다(실제로 수행된 플러시 횟수) | Counter | ✅ |
numOfDroppedBatches | 재시도할 수 없는 실패로 인해 폐기된 총 배치 수입니다 | Counter | ✅ |
numOfDroppedRecords | 재시도할 수 없는 실패로 인해 폐기된 총 레코드 수입니다 | Counter | ✅ |
totalBatchRetries | 재시도 가능한 실패로 인해 발생한 총 배치 재시도 횟수입니다 | Counter | ✅ |
writeLatencyHistogram | 성공한 쓰기의 지연 시간 분포(ms)를 나타내는 히스토그램입니다 | Histogram | ✅ |
writeFailureLatencyHistogram | 실패한 쓰기의 지연 시간 분포(ms)를 나타내는 히스토그램입니다 | Histogram | ✅ |
triggeredByMaxBatchSizeCounter | maxBatchSize에 도달해 발생한 총 플러시 횟수입니다 | Counter | ✅ |
triggeredByMaxBatchSizeInBytesCounter | maxBatchSizeInBytes에 도달해 발생한 총 플러시 횟수입니다 | Counter | ✅ |
triggeredByMaxTimeInBufferMSCounter | maxTimeInBufferMS에 도달해 발생한 총 플러시 횟수입니다 | Counter | ✅ |
actualRecordsPerBatch | 실제 배치 크기 분포를 나타내는 히스토그램입니다 | Histogram | ✅ |
actualBytesPerBatch | 배치당 실제 바이트 분포를 나타내는 히스토그램입니다 | Histogram | ✅ |
- 현재 이 싱크는 최소 1회 전달 보장을 제공합니다. 정확히 한 번 처리 의미 체계 지원 작업은 여기에서 추적되고 있습니다.
- 이 싱크는 아직 처리할 수 없는 레코드를 버퍼링하기 위한 데드 레터 큐(DLQ)를 지원하지 않습니다. 그때까지는 커넥터가 실패한 레코드를 다시 삽입하려고 시도하며, 끝내 성공하지 못하면 해당 레코드를 버립니다. 이 기능은 여기에서 추적되고 있습니다.
- 이 싱크는 아직 Flink의 Table API 또는 Flink SQL을 통해 생성하는 기능을 지원하지 않습니다. 이 기능은 여기에서 추적되고 있습니다.
- 커넥터는 일일 CI 워크플로를 통해 latest와 head를 포함한 여러 최신 ClickHouse 버전을 대상으로 테스트됩니다. 테스트 대상 버전은 새로운 ClickHouse 릴리스가 활성화되면 주기적으로 업데이트됩니다. 커넥터가 매일 어떤 버전을 대상으로 테스트하는지는 여기에서 확인할 수 있습니다.
- 알려진 보안 취약점과 취약점 보고 방법은 ClickHouse 보안 정책을 참조하십시오.
- 보안 수정과 새로운 개선 사항을 놓치지 않도록 커넥터를 계속 업그레이드할 것을 권장합니다.
- 마이그레이션에 문제가 있으면 GitHub issue를 등록해 주십시오. 확인 후 응답하겠습니다.
- 최적의 성능을 위해 DataStream 요소 타입이 Generic 타입이 아니도록 하십시오. 자세한 내용은 Flink의 타입 구분을 참조하십시오. Generic이 아닌 요소를 사용하면 Kryo로 인한 serialization 오버헤드를 줄일 수 있어 ClickHouse로의 처리량이 향상됩니다.
maxBatchSize는 최소 1000, 이상적으로는 10,000~100,000으로 설정하는 것을 권장합니다. 자세한 내용은 대량 삽입 가이드를 참조하십시오.
- OLTP 스타일의 중복 제거 또는 ClickHouse 업서트를 수행하려면 이 문서 페이지를 참조하십시오. 참고: 이는 재시도 시 발생하는 배치 중복 제거와 혼동해서는 안 됩니다.
다음과 같은 오류가 발생할 수 있습니다:
com.clickhouse.client.api.ServerException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 9205. Bytes expected: 1100022.: (at row 9) : While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)
원인: 가장 흔한 원인은 CANNOT_READ_ALL_DATA 오류가 ClickHouse 테이블 스키마와 Flink 레코드 스키마가 서로 불일치함을 의미한다는 점입니다. 이는 둘 중 하나가 이전 버전과 호환되지 않도록 변경될 때 발생할 수 있습니다.
해결 방법: ClickHouse 테이블 또는 커넥터 입력 데이터 타입의 스키마, 또는 둘 모두를 업데이트하여 서로 호환되도록 하십시오. 필요한 경우 Java 타입을 ClickHouse 타입으로 매핑하는 방법은 type mapping을 참조하십시오. 참고: 아직 전송 중인 레코드가 남아 있다면 커넥터를 다시 시작할 때 Flink state를 재설정해야 합니다.
ClickHouse에 쓰는 동안 커넥터의 처리량이 작업의 병렬성(Flink 작업 수)에 비례해 확장되지 않을 수 있습니다.
원인: ClickHouse의 백그라운드 파트 병합 프로세스로 인해 삽입 속도가 느려질 수 있습니다. 이는 구성된 배치 크기가 너무 작거나, 커넥터가 너무 자주 플러시하거나, 또는 두 가지가 함께 작용할 때 발생할 수 있습니다.
해결 방법: numRequestSubmitted 및 actualRecordsPerBatch 메트릭을 모니터링하면 배치 크기(maxBatchSize)와 플러시 빈도를 어떻게 조정할지 판단하는 데 도움이 됩니다. 또한 배치 크기 권장 사항은 고급 및 권장 사용법을 참조하십시오.
ClickHouse 테이블에서 행이 누락됩니다
원인: 배치가 재시도할 수 없는 실패로 인해 폐기되었거나, 설정된 재시도 횟수 내에 삽입되지 못했습니다(ClickHouseClientConfig.setNumberOfRetries()로 설정 가능). 참고: 기본적으로 커넥터는 배치를 폐기하기 전에 최대 3번까지 다시 삽입을 시도합니다.
해결 방법: 근본 원인을 파악하려면 TaskManager 로그 및/또는 stack trace를 확인하십시오.
프로젝트에 기여하거나 문제를 보고하려는 경우, 의견을 보내주시면 감사하겠습니다!
이슈를 등록하거나 개선 사항을 제안하거나
pull request를 제출하려면 GitHub 리포지토리를 방문하세요.
기여는 언제나 환영합니다! 시작하기 전에 리포지토리의 기여 가이드를 확인해 주세요.
ClickHouse Flink 커넥터 개선에 도움을 주셔서 감사합니다!