跳转到主要内容
这是由 ClickHouse 官方支持的 Apache Flink Sink Connector。它基于 Flink 的 AsyncSinkBase 和官方 ClickHouse Java 客户端 构建。 该 连接器 支持 Apache Flink 的 DataStream API。对 Table API 的支持计划在后续 release 中提供

要求

  • Java 11+ (用于 Flink 1.17+) 或 17+ (用于 Flink 2.0+)
  • Apache Flink 1.17+
该连接器分为两个制品,以同时支持 Flink 1.17+ 和 Flink 2.0+。请选择与所用 Flink 版本对应的制品:
Flink 版本制品ClickHouse Java 客户端 版本所需 Java
latestflink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.1flink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.0flink-connector-clickhouse-2.0.00.9.5Java 17+
1.20.2flink-connector-clickhouse-1.170.9.5Java 11+
1.19.3flink-connector-clickhouse-1.170.9.5Java 11+
1.18.1flink-connector-clickhouse-1.170.9.5Java 11+
1.17.2flink-connector-clickhouse-1.170.9.5Java 11+
该连接器尚未针对早于 Flink 1.17.2 的版本进行测试。

安装与设置

作为依赖引入

<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-2.0.0</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>
<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-1.17</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>

下载二进制包

二进制 JAR 的命名规则如下:
flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar
其中: 你可以在 Maven Central 仓库 中找到所有已发布的 JAR 文件。

使用 DataStream API

代码示例

假设你想将原始 CSV 数据插入到 ClickHouse:
public static void main(String[] args) {
    // 配置 ClickHouseClient
    ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);

    // 创建一个 ElementConverter
    ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);

    // 创建 sink,并使用 `setClickHouseFormat` 设置格式
    ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
            convertorString,
            MAX_BATCH_SIZE,
            MAX_IN_FLIGHT_REQUESTS,
            MAX_BUFFERED_REQUESTS,
            MAX_BATCH_SIZE_IN_BYTES,
            MAX_TIME_IN_BUFFER_MS,
            MAX_RECORD_SIZE_IN_BYTES,
            clickHouseClientConfig
    );

    csvSink.setClickHouseFormat(ClickHouseFormat.CSV);

    // 最后,将 DataStream 连接到 sink。
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Path csvFilePath = new Path(fileFullName);
    FileSource<String> csvSource = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), csvFilePath)
            .build();

    env.fromSource(
            csvSource,
            WatermarkStrategy.noWatermarks(),
            "GzipCsvSource"
    ).sinkTo(csvSink);
}
更多示例和代码片段可在我们的测试中找到:

快速入门示例

我们提供了基于 Maven 的示例,方便您快速开始使用 ClickHouse Sink: 如需更详细的说明,请参阅 示例指南

DataStream API 连接选项

ClickHouse 客户端选项

ParametersDescriptionDefault ValueRequired
url完整的 ClickHouse URL不适用
usernameClickHouse 数据库用户名不适用
passwordClickHouse 数据库密码不适用
databaseClickHouse 数据库名称不适用
tableClickHouse 表名不适用
optionsJava 客户端配置选项的映射空映射
serverSettingsClickHouse 服务端会话设置的映射空映射
enableJsonSupportAsString用于让 ClickHouse 服务端期望 JSON 数据类型 采用 JSON 格式 String 的服务端设置true
optionsserverSettings 应以 Map<String, String> 的形式传递给客户端。如果其中任一项为空映射,则分别使用客户端或服务端的默认值。
所有可用的 Java 客户端选项均列在 ClientConfigProperties.java此文档页面中。所有可用的服务端会话设置均列在此文档页面中。
例如:
Map<String, String> javaClientOptions = Map.of(
    ClientConfigProperties.CA_CERTIFICATE.getKey(), "<my_CA_cert>",
    ClientConfigProperties.SSL_CERTIFICATE.getKey(), "<my_SSL_cert>",
    ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "30000",
    ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getKey(), "5"
);

Map<String, String> serverSettings = Map.of(
    "insert_deduplicate", "1"
);

ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(
    url,
    username,
    password,
    database,
    tableName,
    javaClientOptions,
    serverSettings,
    false // enableJsonSupportAsString
);

Sink 选项

以下选项直接来自 Flink 的 AsyncSinkBase
参数描述默认值必填
maxBatchSize单个批次中可插入的最大记录数N/A
maxInFlightRequests在 sink 开始施加背压之前,允许的最大进行中请求数N/A
maxBufferedRequests在 sink 开始施加背压之前,可在其中缓冲的最大记录数N/A
maxBatchSizeInBytes一个批次允许达到的最大大小 (以字节为单位) 。所有发送的批次都将小于或等于该大小N/A
maxTimeInBufferMS记录在被刷新前可在 sink 中停留的最长时间N/A
maxRecordSizeInBytessink 可接受的最大记录大小,超过该大小的记录会被自动拒绝N/A

支持的数据类型

下表快速列出了将数据从 Flink 插入 ClickHouse 时的数据类型转换对应关系。
Java 类型ClickHouse 类型是否支持序列化方法
byte/ByteInt8DataWriter.writeInt8
short/ShortInt16DataWriter.writeInt16
int/IntegerInt32DataWriter.writeInt32
long/LongInt64DataWriter.writeInt64
BigIntegerInt128DataWriter.writeInt128
BigIntegerInt256DataWriter.writeInt256
short/ShortUInt8DataWriter.writeUInt8
int/IntegerUInt8DataWriter.writeUInt8
int/IntegerUInt16DataWriter.writeUInt16
long/LongUInt32DataWriter.writeUInt32
long/LongUInt64DataWriter.writeUInt64
BigIntegerUInt64DataWriter.writeUInt64
BigIntegerUInt128DataWriter.writeUInt128
BigIntegerUInt256DataWriter.writeUInt256
BigDecimalDecimalDataWriter.writeDecimal
BigDecimalDecimal32DataWriter.writeDecimal
BigDecimalDecimal64DataWriter.writeDecimal
BigDecimalDecimal128DataWriter.writeDecimal
BigDecimalDecimal256DataWriter.writeDecimal
float/FloatFloatDataWriter.writeFloat32
double/DoubleDoubleDataWriter.writeFloat64
boolean/BooleanBooleanDataWriter.writeBoolean
StringStringDataWriter.writeString
StringFixedStringDataWriter.writeFixedString
LocalDateDateDataWriter.writeDate
LocalDateDate32DataWriter.writeDate32
LocalDateTimeDateTimeDataWriter.writeDateTime
ZonedDateTimeDateTimeDataWriter.writeDateTime
LocalDateTimeDateTime64DataWriter.writeDateTime64
ZonedDateTimeDateTime64DataWriter.writeDateTime64
int/IntegerTimeN/A
long/LongTime64N/A
byte/ByteEnum8DataWriter.writeInt8
int/IntegerEnum16DataWriter.writeInt16
java.util.UUIDUUIDDataWriter.writeIntUUID
StringJSONDataWriter.writeJSON
Array<Type>Array<Type>DataWriter.writeArray
Map<K,V>Map<K,V>DataWriter.writeMap
Tuple<Type,..>Tuple<T1,T2,..>DataWriter.writeTuple
ObjectVariantN/A
注意:
  • 执行日期操作时,必须提供 ZoneId
  • 执行 decimal 操作时,必须提供精度和标度
  • 要让 ClickHouse 将 Java String 解析为 JSON,需要在 ClickHouseClientConfig 中启用 enableJsonSupportAsString
  • 该 连接器 需要一个 ElementConvertor,用于将输入 DataStream 中的元素映射为 ClickHouse 载荷。为此,连接器 提供了 ClickHouseConvertorPOJOConvertor,你可以结合上述 DataWriter 序列化方法使用它们来实现这种映射。

支持的输入格式

你可以在此文档页面ClickHouseFormat.java 中查看可用的 ClickHouse 输入格式列表。 要指定连接器用于将 DataStream 序列化为 ClickHouse 载荷的格式,请使用 setClickHouseFormat 函数。例如:
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
        convertorString,
        MAX_BATCH_SIZE,
        MAX_IN_FLIGHT_REQUESTS,
        MAX_BUFFERED_REQUESTS,
        MAX_BATCH_SIZE_IN_BYTES,
        MAX_TIME_IN_BUFFER_MS,
        MAX_RECORD_SIZE_IN_BYTES,
        clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
默认情况下,如果在 ClickHouseClientConfig 中显式将 setSupportDefault 设置为 true 或 false,连接器将分别使用 RowBinaryWithDefaultsRowBinary

指标

该连接器在 Flink 现有指标的基础上,还额外暴露了以下指标:
指标描述类型状态
numBytesSend请求载荷中发送到 ClickHouse 的总字节数。注意:该指标衡量的是通过网络发送的序列化数据大小,可能与 ClickHouse 的 system.query_log 中的 written_bytes 不同;后者反映的是数据经过处理后实际写入存储的字节数计数器
numRecordSend发送到 ClickHouse 的记录总数计数器
numRequestSubmitted已发送的请求总数 (即实际执行的 flush 次数)计数器
numOfDroppedBatches因不可重试失败而丢弃的批次总数计数器
numOfDroppedRecords因不可重试失败而丢弃的记录总数计数器
totalBatchRetries因可重试失败而进行的批次重试总数计数器
writeLatencyHistogram成功写入延迟分布的直方图 (毫秒)直方图
writeFailureLatencyHistogram写入失败延迟分布的直方图 (毫秒)直方图
triggeredByMaxBatchSizeCounter因达到 maxBatchSize 而触发的 flush 总数计数器
triggeredByMaxBatchSizeInBytesCounter因达到 maxBatchSizeInBytes 而触发的 flush 总数计数器
triggeredByMaxTimeInBufferMSCounter因达到 maxTimeInBufferMS 而触发的 flush 总数计数器
actualRecordsPerBatch实际批次大小分布的直方图直方图
actualBytesPerBatch每个批次实际字节数分布的直方图直方图

局限性

  • 该 sink 当前提供至少一次交付保证。实现精确一次语义的相关工作可在此处跟踪。
  • 该 sink 目前尚不支持用于缓冲无法处理记录的死信队列 (DLQ) 。在此之前,连接器 会尝试重新 insert 失败的记录;如果仍然失败,则会将其丢弃。此功能可在此处跟踪。
  • 该 sink 目前尚不支持通过 Flink 的 Table API 或 Flink SQL 创建。此功能可在此处跟踪。

ClickHouse 版本兼容性与安全

  • 该连接器通过每日 CI 工作流针对一系列较新的 ClickHouse 版本进行测试,包括 latest 和 head。随着新的 ClickHouse 发行版进入活跃状态,测试版本也会定期更新。有关该连接器每日测试的版本,请参见此处
  • 有关已知安全漏洞以及如何报告漏洞,请参见 ClickHouse 安全策略
  • 我们建议持续升级该连接器,以免错过安全修复和新改进。
  • 如果你在迁移过程中遇到问题,请创建 GitHub issue,我们会回复!
  • 为获得最佳性能,请确保你的 DataStream 元素类型不是 Generic 类型——请参阅这篇关于 Flink 类型区分的说明。非 Generic 元素可避免 Kryo 引入的序列化开销,并提升写入 ClickHouse 的吞吐量。
  • 我们建议将 maxBatchSize 设置为至少 1000,理想范围是 10,000 到 100,000。更多信息请参阅这篇关于批量 insert 的指南
  • 如需在 ClickHouse 中执行 OLTP 风格的去重或 upsert,请参阅此文档页面注意:不要将其与发生在 retries 时的批次去重混淆。

故障排查

CANNOT_READ_ALL_DATA

可能会发生以下错误:
com.clickhouse.client.api.ServerException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 9205. Bytes expected: 1100022.: (at row 9) : While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)
原因:最常见的情况是,CANNOT_READ_ALL_DATA 错误表示你的 ClickHouse 表 schema 与 Flink 记录 schema 不一致。这通常发生在其中一方以不向后兼容的方式发生变更时。 解决方案:更新 ClickHouse 表 schema 或 连接器 输入数据类型中的 schema (或同时更新两者) ,使其相互兼容。如有需要,请参阅type mapping,了解如何将 Java 类型映射为 ClickHouse 类型。注意:如果仍有记录在传输过程中,则在重启 连接器 时需要重置 Flink 状态。

吞吐量低

将数据写入 ClickHouse 时,你可能会发现连接器的吞吐量不会随着作业并行度 (Flink task 数量) 提升而线性扩展。 原因:ClickHouse 的后台分片合并过程可能会拖慢 insert。在配置的批次大小过小、连接器 flush 过于频繁,或两者同时存在时,都可能出现这种情况。 解决方案:监控 numRequestSubmittedactualRecordsPerBatch 指标,以帮助判断如何调整批次大小 (maxBatchSize) 以及 flush 频率。另请参阅高级和推荐用法中的批次大小建议。

我的 ClickHouse 表中有行缺失

原因:这些批次被丢弃了,原因可能是发生了不可重试的故障,或者在配置的重试次数内仍无法插入 (可通过 ClickHouseClientConfig.setNumberOfRetries() 设置) 。注意:默认情况下,连接器会在丢弃某个批次之前最多尝试重新插入 3 次。 解决方案:检查 TaskManager 日志和/或堆栈跟踪,以定位根本原因。

贡献与支持

如果您想为该项目贡献力量或报告任何问题,欢迎向我们反馈! 请访问我们的 GitHub 仓库 提交 issue、提出 改进建议,或提交拉取请求。 欢迎贡献!开始之前,请先查阅仓库中的贡献指南。 感谢您帮助改进 ClickHouse Flink 连接器!
最后修改于 2026年6月10日