这是由 ClickHouse 官方支持的 Apache Flink Sink Connector。它基于 Flink 的 AsyncSinkBase 和官方 ClickHouse Java 客户端 构建。
该 连接器 支持 Apache Flink 的 DataStream API。对 Table API 的支持计划在后续 release 中提供。
- Java 11+ (用于 Flink 1.17+) 或 17+ (用于 Flink 2.0+)
- Apache Flink 1.17+
该连接器分为两个制品,以同时支持 Flink 1.17+ 和 Flink 2.0+。请选择与所用 Flink 版本对应的制品:
| Flink 版本 | 制品 | ClickHouse Java 客户端 版本 | 所需 Java |
|---|
| latest | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.1 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.0 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 1.20.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.19.3 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.18.1 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.17.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
该连接器尚未针对早于 Flink 1.17.2 的版本进行测试。
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-2.0.0</artifactId>
<version>{{ stable_version }}</version>
<classifier>all</classifier>
</dependency>
dependencies {
implementation("com.clickhouse.flink:flink-connector-clickhouse-2.0.0:{{ stable_version }}")
}
libraryDependencies += "com.clickhouse.flink" % "flink-connector-clickhouse-2.0.0" % {{ stable_version }} classifier "all"
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-1.17</artifactId>
<version>{{ stable_version }}</version>
<classifier>all</classifier>
</dependency>
dependencies {
implementation("com.clickhouse.flink:flink-connector-clickhouse-1.17:{{ stable_version }}")
}
libraryDependencies += "com.clickhouse.flink" % "flink-connector-clickhouse-1.17" % {{ stable_version }} classifier "all"
二进制 JAR 的命名规则如下:
flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar
其中:
flink_version 为 2.0.0 或 1.17
stable_version 是稳定版本制品的发布版本
你可以在 Maven Central 仓库 中找到所有已发布的 JAR 文件。
假设你想将原始 CSV 数据插入到 ClickHouse:
public static void main(String[] args) {
// 配置 ClickHouseClient
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);
// 创建一个 ElementConverter
ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);
// 创建 sink,并使用 `setClickHouseFormat` 设置格式
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
convertorString,
MAX_BATCH_SIZE,
MAX_IN_FLIGHT_REQUESTS,
MAX_BUFFERED_REQUESTS,
MAX_BATCH_SIZE_IN_BYTES,
MAX_TIME_IN_BUFFER_MS,
MAX_RECORD_SIZE_IN_BYTES,
clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
// 最后,将 DataStream 连接到 sink。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Path csvFilePath = new Path(fileFullName);
FileSource<String> csvSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), csvFilePath)
.build();
env.fromSource(
csvSource,
WatermarkStrategy.noWatermarks(),
"GzipCsvSource"
).sinkTo(csvSink);
}
更多示例和代码片段可在我们的测试中找到:
我们提供了基于 Maven 的示例,方便您快速开始使用 ClickHouse Sink:
如需更详细的说明,请参阅 示例指南
| Parameters | Description | Default Value | Required |
|---|
url | 完整的 ClickHouse URL | 不适用 | 是 |
username | ClickHouse 数据库用户名 | 不适用 | 是 |
password | ClickHouse 数据库密码 | 不适用 | 是 |
database | ClickHouse 数据库名称 | 不适用 | 是 |
table | ClickHouse 表名 | 不适用 | 是 |
options | Java 客户端配置选项的映射 | 空映射 | 否 |
serverSettings | ClickHouse 服务端会话设置的映射 | 空映射 | 否 |
enableJsonSupportAsString | 用于让 ClickHouse 服务端期望 JSON 数据类型 采用 JSON 格式 String 的服务端设置 | true | 否 |
options 和 serverSettings 应以 Map<String, String> 的形式传递给客户端。如果其中任一项为空映射,则分别使用客户端或服务端的默认值。
例如:
Map<String, String> javaClientOptions = Map.of(
ClientConfigProperties.CA_CERTIFICATE.getKey(), "<my_CA_cert>",
ClientConfigProperties.SSL_CERTIFICATE.getKey(), "<my_SSL_cert>",
ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "30000",
ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getKey(), "5"
);
Map<String, String> serverSettings = Map.of(
"insert_deduplicate", "1"
);
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(
url,
username,
password,
database,
tableName,
javaClientOptions,
serverSettings,
false // enableJsonSupportAsString
);
以下选项直接来自 Flink 的 AsyncSinkBase:
| 参数 | 描述 | 默认值 | 必填 |
|---|
maxBatchSize | 单个批次中可插入的最大记录数 | N/A | 是 |
maxInFlightRequests | 在 sink 开始施加背压之前,允许的最大进行中请求数 | N/A | 是 |
maxBufferedRequests | 在 sink 开始施加背压之前,可在其中缓冲的最大记录数 | N/A | 是 |
maxBatchSizeInBytes | 一个批次允许达到的最大大小 (以字节为单位) 。所有发送的批次都将小于或等于该大小 | N/A | 是 |
maxTimeInBufferMS | 记录在被刷新前可在 sink 中停留的最长时间 | N/A | 是 |
maxRecordSizeInBytes | sink 可接受的最大记录大小,超过该大小的记录会被自动拒绝 | N/A | 是 |
下表快速列出了将数据从 Flink 插入 ClickHouse 时的数据类型转换对应关系。
将 Flink 中的数据插入 ClickHouse
| Java 类型 | ClickHouse 类型 | 是否支持 | 序列化方法 |
|---|
byte/Byte | Int8 | ✅ | DataWriter.writeInt8 |
short/Short | Int16 | ✅ | DataWriter.writeInt16 |
int/Integer | Int32 | ✅ | DataWriter.writeInt32 |
long/Long | Int64 | ✅ | DataWriter.writeInt64 |
BigInteger | Int128 | ✅ | DataWriter.writeInt128 |
BigInteger | Int256 | ✅ | DataWriter.writeInt256 |
short/Short | UInt8 | ✅ | DataWriter.writeUInt8 |
int/Integer | UInt8 | ✅ | DataWriter.writeUInt8 |
int/Integer | UInt16 | ✅ | DataWriter.writeUInt16 |
long/Long | UInt32 | ✅ | DataWriter.writeUInt32 |
long/Long | UInt64 | ✅ | DataWriter.writeUInt64 |
BigInteger | UInt64 | ✅ | DataWriter.writeUInt64 |
BigInteger | UInt128 | ✅ | DataWriter.writeUInt128 |
BigInteger | UInt256 | ✅ | DataWriter.writeUInt256 |
BigDecimal | Decimal | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal32 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal64 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal128 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal256 | ✅ | DataWriter.writeDecimal |
float/Float | Float | ✅ | DataWriter.writeFloat32 |
double/Double | Double | ✅ | DataWriter.writeFloat64 |
boolean/Boolean | Boolean | ✅ | DataWriter.writeBoolean |
String | String | ✅ | DataWriter.writeString |
String | FixedString | ✅ | DataWriter.writeFixedString |
LocalDate | Date | ✅ | DataWriter.writeDate |
LocalDate | Date32 | ✅ | DataWriter.writeDate32 |
LocalDateTime | DateTime | ✅ | DataWriter.writeDateTime |
ZonedDateTime | DateTime | ✅ | DataWriter.writeDateTime |
LocalDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
ZonedDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
int/Integer | Time | ❌ | N/A |
long/Long | Time64 | ❌ | N/A |
byte/Byte | Enum8 | ✅ | DataWriter.writeInt8 |
int/Integer | Enum16 | ✅ | DataWriter.writeInt16 |
java.util.UUID | UUID | ✅ | DataWriter.writeIntUUID |
String | JSON | ✅ | DataWriter.writeJSON |
Array<Type> | Array<Type> | ✅ | DataWriter.writeArray |
Map<K,V> | Map<K,V> | ✅ | DataWriter.writeMap |
Tuple<Type,..> | Tuple<T1,T2,..> | ✅ | DataWriter.writeTuple |
Object | Variant | ❌ | N/A |
注意:
- 执行日期操作时,必须提供
ZoneId。
- 执行 decimal 操作时,必须提供精度和标度。
- 要让 ClickHouse 将 Java String 解析为 JSON,需要在
ClickHouseClientConfig 中启用 enableJsonSupportAsString。
- 该 连接器 需要一个
ElementConvertor,用于将输入 DataStream 中的元素映射为 ClickHouse 载荷。为此,连接器 提供了 ClickHouseConvertor 和 POJOConvertor,你可以结合上述 DataWriter 序列化方法使用它们来实现这种映射。
你可以在此文档页面和 ClickHouseFormat.java 中查看可用的 ClickHouse 输入格式列表。
要指定连接器用于将 DataStream 序列化为 ClickHouse 载荷的格式,请使用 setClickHouseFormat 函数。例如:
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
convertorString,
MAX_BATCH_SIZE,
MAX_IN_FLIGHT_REQUESTS,
MAX_BUFFERED_REQUESTS,
MAX_BATCH_SIZE_IN_BYTES,
MAX_TIME_IN_BUFFER_MS,
MAX_RECORD_SIZE_IN_BYTES,
clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
该连接器在 Flink 现有指标的基础上,还额外暴露了以下指标:
| 指标 | 描述 | 类型 | 状态 |
|---|
numBytesSend | 请求载荷中发送到 ClickHouse 的总字节数。注意:该指标衡量的是通过网络发送的序列化数据大小,可能与 ClickHouse 的 system.query_log 中的 written_bytes 不同;后者反映的是数据经过处理后实际写入存储的字节数 | 计数器 | ✅ |
numRecordSend | 发送到 ClickHouse 的记录总数 | 计数器 | ✅ |
numRequestSubmitted | 已发送的请求总数 (即实际执行的 flush 次数) | 计数器 | ✅ |
numOfDroppedBatches | 因不可重试失败而丢弃的批次总数 | 计数器 | ✅ |
numOfDroppedRecords | 因不可重试失败而丢弃的记录总数 | 计数器 | ✅ |
totalBatchRetries | 因可重试失败而进行的批次重试总数 | 计数器 | ✅ |
writeLatencyHistogram | 成功写入延迟分布的直方图 (毫秒) | 直方图 | ✅ |
writeFailureLatencyHistogram | 写入失败延迟分布的直方图 (毫秒) | 直方图 | ✅ |
triggeredByMaxBatchSizeCounter | 因达到 maxBatchSize 而触发的 flush 总数 | 计数器 | ✅ |
triggeredByMaxBatchSizeInBytesCounter | 因达到 maxBatchSizeInBytes 而触发的 flush 总数 | 计数器 | ✅ |
triggeredByMaxTimeInBufferMSCounter | 因达到 maxTimeInBufferMS 而触发的 flush 总数 | 计数器 | ✅ |
actualRecordsPerBatch | 实际批次大小分布的直方图 | 直方图 | ✅ |
actualBytesPerBatch | 每个批次实际字节数分布的直方图 | 直方图 | ✅ |
- 该 sink 当前提供至少一次交付保证。实现精确一次语义的相关工作可在此处跟踪。
- 该 sink 目前尚不支持用于缓冲无法处理记录的死信队列 (DLQ) 。在此之前,连接器 会尝试重新 insert 失败的记录;如果仍然失败,则会将其丢弃。此功能可在此处跟踪。
- 该 sink 目前尚不支持通过 Flink 的 Table API 或 Flink SQL 创建。此功能可在此处跟踪。
- 该连接器通过每日 CI 工作流针对一系列较新的 ClickHouse 版本进行测试,包括 latest 和 head。随着新的 ClickHouse 发行版进入活跃状态,测试版本也会定期更新。有关该连接器每日测试的版本,请参见此处。
- 有关已知安全漏洞以及如何报告漏洞,请参见 ClickHouse 安全策略。
- 我们建议持续升级该连接器,以免错过安全修复和新改进。
- 如果你在迁移过程中遇到问题,请创建 GitHub issue,我们会回复!
- 为获得最佳性能,请确保你的 DataStream 元素类型不是 Generic 类型——请参阅这篇关于 Flink 类型区分的说明。非 Generic 元素可避免 Kryo 引入的序列化开销,并提升写入 ClickHouse 的吞吐量。
- 我们建议将
maxBatchSize 设置为至少 1000,理想范围是 10,000 到 100,000。更多信息请参阅这篇关于批量 insert 的指南。
- 如需在 ClickHouse 中执行 OLTP 风格的去重或 upsert,请参阅此文档页面。注意:不要将其与发生在 retries 时的批次去重混淆。
可能会发生以下错误:
com.clickhouse.client.api.ServerException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 9205. Bytes expected: 1100022.: (at row 9) : While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)
原因:最常见的情况是,CANNOT_READ_ALL_DATA 错误表示你的 ClickHouse 表 schema 与 Flink 记录 schema 不一致。这通常发生在其中一方以不向后兼容的方式发生变更时。
解决方案:更新 ClickHouse 表 schema 或 连接器 输入数据类型中的 schema (或同时更新两者) ,使其相互兼容。如有需要,请参阅type mapping,了解如何将 Java 类型映射为 ClickHouse 类型。注意:如果仍有记录在传输过程中,则在重启 连接器 时需要重置 Flink 状态。
将数据写入 ClickHouse 时,你可能会发现连接器的吞吐量不会随着作业并行度 (Flink task 数量) 提升而线性扩展。
原因:ClickHouse 的后台分片合并过程可能会拖慢 insert。在配置的批次大小过小、连接器 flush 过于频繁,或两者同时存在时,都可能出现这种情况。
解决方案:监控 numRequestSubmitted 和 actualRecordsPerBatch 指标,以帮助判断如何调整批次大小 (maxBatchSize) 以及 flush 频率。另请参阅高级和推荐用法中的批次大小建议。
原因:这些批次被丢弃了,原因可能是发生了不可重试的故障,或者在配置的重试次数内仍无法插入 (可通过 ClickHouseClientConfig.setNumberOfRetries() 设置) 。注意:默认情况下,连接器会在丢弃某个批次之前最多尝试重新插入 3 次。
解决方案:检查 TaskManager 日志和/或堆栈跟踪,以定位根本原因。
如果您想为该项目贡献力量或报告任何问题,欢迎向我们反馈!
请访问我们的 GitHub 仓库 提交 issue、提出
改进建议,或提交拉取请求。
欢迎贡献!开始之前,请先查阅仓库中的贡献指南。
感谢您帮助改进 ClickHouse Flink 连接器!