Apache Beam은 개발자가 배치 및 스트림(연속) 데이터 처리 파이프라인을 모두 정의하고 실행할 수 있도록 지원하는 오픈 소스 통합 프로그래밍 모델입니다. Apache Beam의 유연성은 ETL(Extract, Transform, Load) 작업부터 복잡한 이벤트 처리와 실시간 분석에 이르기까지, 다양한 데이터 처리 시나리오를 지원할 수 있다는 점에 있습니다.
이 통합은 기본 삽입 계층으로 ClickHouse의 공식 JDBC 커넥터를 활용합니다.
Apache Beam과 ClickHouse를 통합하는 데 필요한 패키지는 널리 사용되는 여러 데이터 저장 시스템 및 데이터베이스용 통합 번들인 Apache Beam I/O Connectors에서 유지 관리 및 개발됩니다.
org.apache.beam.sdk.io.clickhouse.ClickHouseIO 구현은 Apache Beam repo에 있습니다.
Apache Beam용 ClickHouse 패키지 설정
패키지 관리 프레임워크에 다음 종속 항목을 추가하십시오:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-clickhouse</artifactId>
<version>${beam.version}</version>
</dependency>
권장 Beam 버전ClickHouseIO 커넥터는 Apache Beam 2.59.0 버전부터 사용하는 것을 권장합니다.
그 이전 버전에서는 커넥터의 기능이 완전히 지원되지 않을 수 있습니다.
아티팩트는 공식 Maven 리포지토리에서 확인할 수 있습니다.
다음 예시는 input.csv라는 이름의 CSV 파일을 PCollection으로 읽고, 이를 행(Row) 객체로 변환한 다음(정의된 스키마 사용), ClickHouseIO를 사용하여 로컬 ClickHouse 인스턴스에 삽입합니다:
package org.example;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.clickhouse.ClickHouseIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
public class Main {
public static void main(String[] args) {
// Pipeline 객체를 생성합니다.
Pipeline p = Pipeline.create();
Schema SCHEMA =
Schema.builder()
.addField(Schema.Field.of("name", Schema.FieldType.STRING).withNullable(true))
.addField(Schema.Field.of("age", Schema.FieldType.INT16).withNullable(true))
.addField(Schema.Field.of("insertion_time", Schema.FieldType.DATETIME).withNullable(false))
.build();
// 파이프라인에 변환을 적용합니다.
PCollection<String> lines = p.apply("ReadLines", TextIO.read().from("src/main/resources/input.csv"));
PCollection<Row> rows = lines.apply("ConvertToRow", ParDo.of(new DoFn<String, Row>() {
@ProcessElement
public void processElement(@Element String line, OutputReceiver<Row> out) {
String[] values = line.split(",");
Row row = Row.withSchema(SCHEMA)
.addValues(values[0], Short.parseShort(values[1]), DateTime.now())
.build();
out.output(row);
}
})).setRowSchema(SCHEMA);
rows.apply("Write to ClickHouse",
ClickHouseIO.write("jdbc:clickhouse://localhost:8123/default?user=default&password=******", "test_table"));
// 파이프라인을 실행합니다.
p.run().waitUntilFinish();
}
}
| ClickHouse | Apache Beam | 지원 여부 | 참고 사항 |
|---|
TableSchema.TypeName.FLOAT32 | Schema.TypeName#FLOAT | ✅ | |
TableSchema.TypeName.FLOAT64 | Schema.TypeName#DOUBLE | ✅ | |
TableSchema.TypeName.INT8 | Schema.TypeName#BYTE | ✅ | |
TableSchema.TypeName.INT16 | Schema.TypeName#INT16 | ✅ | |
TableSchema.TypeName.INT32 | Schema.TypeName#INT32 | ✅ | |
TableSchema.TypeName.INT64 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.STRING | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.UINT8 | Schema.TypeName#INT16 | ✅ | |
TableSchema.TypeName.UINT16 | Schema.TypeName#INT32 | ✅ | |
TableSchema.TypeName.UINT32 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.UINT64 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.DATE | Schema.TypeName#DATETIME | ✅ | |
TableSchema.TypeName.DATETIME | Schema.TypeName#DATETIME | ✅ | |
TableSchema.TypeName.ARRAY | Schema.TypeName#ARRAY | ✅ | |
TableSchema.TypeName.ENUM8 | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.ENUM16 | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.BOOL | Schema.TypeName#BOOLEAN | ✅ | |
TableSchema.TypeName.TUPLE | Schema.TypeName#ROW | ✅ | |
TableSchema.TypeName.FIXEDSTRING | FixedBytes | ✅ | FixedBytes는 고정 길이 바이트 배열을 나타내는 LogicalType으로, org.apache.beam.sdk.schemas.logicaltypes에 위치합니다 |
| Schema.TypeName#DECIMAL | ❌ | |
| Schema.TypeName#MAP | ❌ | |
다음 세터 함수를 사용하여 ClickHouseIO.Write 구성을 조정할 수 있습니다.
| 매개변수 설정 함수 | 인수 유형 | 기본값 | 설명 |
|---|
withMaxInsertBlockSize | (long maxInsertBlockSize) | 1000000 | 삽입할 행 블록의 최대 크기입니다. |
withMaxRetries | (int maxRetries) | 5 | 실패한 삽입에 대한 최대 재시도 횟수입니다. |
withMaxCumulativeBackoff | (Duration maxBackoff) | Duration.standardDays(1000) | 재시도에 대한 최대 누적 백오프 기간입니다. |
withInitialBackoff | (Duration initialBackoff) | Duration.standardSeconds(5) | 첫 번째 재시도 전에 적용되는 초기 백오프 기간입니다. |
withInsertDistributedSync | (Boolean sync) | true | true이면 분산 테이블의 삽입 작업을 동기화합니다. |
withInsertQuorum | (Long quorum) | null | 삽입 작업을 확인하는 데 필요한 레플리카 수입니다. |
withInsertDeduplicate | (Boolean deduplicate) | true | true이면 삽입 작업에 대해 중복 제거가 활성화됩니다. |
withTableSchema | (TableSchema schema) | null | 대상 ClickHouse 테이블의 스키마입니다. |
커넥터를 사용할 때는 다음 제한 사항을 고려하십시오.
- 현재는 싱크 작업만 지원합니다. 이 커넥터는 소스 작업을 지원하지 않습니다.
- ClickHouse는
ReplicatedMergeTree 또는 그 위에 구축된 Distributed 테이블에 삽입할 때 중복 제거를 수행합니다. 복제가 없는 경우 일반 MergeTree에 삽입할 때 삽입이 실패한 후 재시도가 성공하면 중복이 발생할 수 있습니다. 하지만 각 블록은 원자적으로 삽입되며, 블록 크기는 ClickHouseIO.Write.withMaxInsertBlockSize(long)를 사용해 설정할 수 있습니다. 중복 제거는 삽입된 블록의 체크섬을 사용해 수행됩니다. 중복 제거에 대한 자세한 내용은 Deduplication 및 Deduplicate insertion config를 참조하십시오.
- 이 커넥터는 DDL SQL 문을 수행하지 않으므로, 대상 테이블은 삽입 전에 미리 존재해야 합니다.