Это официальный sink-коннектор для Apache Flink, поддерживаемый ClickHouse. Он построен на основе AsyncSinkBase из Flink и официального Java-клиента ClickHouse.
Коннектор поддерживает DataStream API Apache Flink. Поддержка Table API планируется в одном из будущих релизов.
- Java 11+ (для Flink 1.17+) или 17+ (для Flink 2.0+)
- Apache Flink 1.17+
Матрица совместимости версий Flink
Коннектор выпускается в виде двух артефактов для поддержки Flink 1.17+ и Flink 2.0+. Выберите артефакт, соответствующий нужной версии Flink:
| Версия Flink | Артефакт | Версия Java-клиента ClickHouse | Требуемая версия Java |
|---|
| latest | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.1 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.0 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 1.20.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.19.3 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.18.1 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.17.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
Коннектор не тестировался с версиями Flink более ранними, чем 1.17.2
Импорт в качестве зависимости
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-2.0.0</artifactId>
<version>{{ stable_version }}</version>
<classifier>all</classifier>
</dependency>
dependencies {
implementation("com.clickhouse.flink:flink-connector-clickhouse-2.0.0:{{ stable_version }}")
}
libraryDependencies += "com.clickhouse.flink" % "flink-connector-clickhouse-2.0.0" % {{ stable_version }} classifier "all"
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-1.17</artifactId>
<version>{{ stable_version }}</version>
<classifier>all</classifier>
</dependency>
dependencies {
implementation("com.clickhouse.flink:flink-connector-clickhouse-1.17:{{ stable_version }}")
}
libraryDependencies += "com.clickhouse.flink" % "flink-connector-clickhouse-1.17" % {{ stable_version }} classifier "all"
Шаблон имени JAR-файла:
flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar
где:
Все доступные JAR-файлы опубликованных релизов можно найти в репозитории Maven Central.
Использование DataStream API
Допустим, вы хотите выполнить вставку необработанных данных CSV в ClickHouse:
public static void main(String[] args) {
// Настройте ClickHouseClient
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);
// Создайте ElementConverter
ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);
// Создайте sink и задайте формат с помощью `setClickHouseFormat`
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
convertorString,
MAX_BATCH_SIZE,
MAX_IN_FLIGHT_REQUESTS,
MAX_BUFFERED_REQUESTS,
MAX_BATCH_SIZE_IN_BYTES,
MAX_TIME_IN_BUFFER_MS,
MAX_RECORD_SIZE_IN_BYTES,
clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
// Наконец, подключите DataStream к sink.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Path csvFilePath = new Path(fileFullName);
FileSource<String> csvSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), csvFilePath)
.build();
env.fromSource(
csvSource,
WatermarkStrategy.noWatermarks(),
"GzipCsvSource"
).sinkTo(csvSink);
}
Другие примеры и фрагменты можно найти в наших тестах:
Мы подготовили пример на Maven, чтобы вы могли быстро начать работу с ClickHouse Sink:
Более подробные инструкции см. в руководстве с примерами
Параметры подключения к DataStream API
Параметры клиента ClickHouse
| Параметры | Описание | Значение по умолчанию | Обязательно |
|---|
url | Полный URL ClickHouse | N/A | Да |
username | Имя пользователя базы данных ClickHouse | N/A | Да |
password | Пароль пользователя базы данных ClickHouse | N/A | Да |
database | Имя базы данных ClickHouse | N/A | Да |
table | Имя таблицы ClickHouse | N/A | Да |
options | Map с параметрами конфигурации Java-клиента | Пустой Map | Нет |
serverSettings | Map с настройками сеанса сервера ClickHouse | Пустой Map | Нет |
enableJsonSupportAsString | Настройка сервера ClickHouse, указывающая, что для типа данных JSON ожидается String в формате JSON | true | Нет |
options и serverSettings следует передавать клиенту как Map<String, String>. Если для любого из них передать пустой Map, будут использоваться значения клиента или сервера по умолчанию соответственно.
Например:
Map<String, String> javaClientOptions = Map.of(
ClientConfigProperties.CA_CERTIFICATE.getKey(), "<my_CA_cert>",
ClientConfigProperties.SSL_CERTIFICATE.getKey(), "<my_SSL_cert>",
ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "30000",
ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getKey(), "5"
);
Map<String, String> serverSettings = Map.of(
"insert_deduplicate", "1"
);
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(
url,
username,
password,
database,
tableName,
javaClientOptions,
serverSettings,
false // enableJsonSupportAsString
);
Следующие параметры взяты напрямую из AsyncSinkBase Flink:
| Параметры | Описание | Значение по умолчанию | Обязательно |
|---|
maxBatchSize | Максимальное количество записей, вставляемых за один батч | N/A | Да |
maxInFlightRequests | Максимальное количество запросов в обработке, допустимое до того, как sink начнет применять backpressure | N/A | Да |
maxBufferedRequests | Максимальное количество записей, которое может буферизоваться в sink до применения backpressure | N/A | Да |
maxBatchSizeInBytes | Максимальный размер батча (в байтах). Все отправляемые батчи будут меньше или равны этому значению | N/A | Да |
maxTimeInBufferMS | Максимальное время, в течение которого запись может оставаться в sink перед отправкой | N/A | Да |
maxRecordSizeInBytes | Максимальный размер записи, который принимает sink; записи большего размера будут автоматически отклоняться | N/A | Да |
Поддерживаемые типы данных
В таблице ниже приведена краткая справка по преобразованию типов данных при вставке данных из Flink в ClickHouse.
Вставка данных из Flink в ClickHouse
| Тип Java | Тип ClickHouse | Поддерживается | Метод сериализации |
|---|
byte/Byte | Int8 | ✅ | DataWriter.writeInt8 |
short/Short | Int16 | ✅ | DataWriter.writeInt16 |
int/Integer | Int32 | ✅ | DataWriter.writeInt32 |
long/Long | Int64 | ✅ | DataWriter.writeInt64 |
BigInteger | Int128 | ✅ | DataWriter.writeInt128 |
BigInteger | Int256 | ✅ | DataWriter.writeInt256 |
short/Short | UInt8 | ✅ | DataWriter.writeUInt8 |
int/Integer | UInt8 | ✅ | DataWriter.writeUInt8 |
int/Integer | UInt16 | ✅ | DataWriter.writeUInt16 |
long/Long | UInt32 | ✅ | DataWriter.writeUInt32 |
long/Long | UInt64 | ✅ | DataWriter.writeUInt64 |
BigInteger | UInt64 | ✅ | DataWriter.writeUInt64 |
BigInteger | UInt128 | ✅ | DataWriter.writeUInt128 |
BigInteger | UInt256 | ✅ | DataWriter.writeUInt256 |
BigDecimal | Decimal | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal32 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal64 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal128 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal256 | ✅ | DataWriter.writeDecimal |
float/Float | Float | ✅ | DataWriter.writeFloat32 |
double/Double | Double | ✅ | DataWriter.writeFloat64 |
boolean/Boolean | Boolean | ✅ | DataWriter.writeBoolean |
String | String | ✅ | DataWriter.writeString |
String | FixedString | ✅ | DataWriter.writeFixedString |
LocalDate | Date | ✅ | DataWriter.writeDate |
LocalDate | Date32 | ✅ | DataWriter.writeDate32 |
LocalDateTime | DateTime | ✅ | DataWriter.writeDateTime |
ZonedDateTime | DateTime | ✅ | DataWriter.writeDateTime |
LocalDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
ZonedDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
int/Integer | Time | ❌ | Н/Д |
long/Long | Time64 | ❌ | Н/Д |
byte/Byte | Enum8 | ✅ | DataWriter.writeInt8 |
int/Integer | Enum16 | ✅ | DataWriter.writeInt16 |
java.util.UUID | UUID | ✅ | DataWriter.writeIntUUID |
String | JSON | ✅ | DataWriter.writeJSON |
Array<Type> | Array<Type> | ✅ | DataWriter.writeArray |
Map<K,V> | Map<K,V> | ✅ | DataWriter.writeMap |
Tuple<Type,..> | Tuple<T1,T2,..> | ✅ | DataWriter.writeTuple |
Object | Variant | ❌ | Н/Д |
Примечания:
- При выполнении операций над датами необходимо указать
ZoneId.
- При выполнении операций с десятичными числами необходимо указать точность и масштаб.
- Чтобы ClickHouse мог разобрать строку Java как JSON, необходимо включить
enableJsonSupportAsString в ClickHouseClientConfig.
- Коннектору требуется
ElementConvertor, чтобы сопоставлять элементы входного DataStream с полезными нагрузками ClickHouse. Для этого коннектор предоставляет ClickHouseConvertor и POJOConvertor, которые можно использовать для реализации такого сопоставления с помощью приведённых выше методов сериализации DataWriter.
Список доступных входных форматов ClickHouse можно найти на этой странице документации и в ClickHouseFormat.java.
Чтобы указать формат, который коннектор должен использовать для сериализации вашего DataStream в полезные нагрузки для ClickHouse, используйте функцию setClickHouseFormat. Например:
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
convertorString,
MAX_BATCH_SIZE,
MAX_IN_FLIGHT_REQUESTS,
MAX_BUFFERED_REQUESTS,
MAX_BATCH_SIZE_IN_BYTES,
MAX_TIME_IN_BUFFER_MS,
MAX_RECORD_SIZE_IN_BYTES,
clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
По умолчанию коннектор использует RowBinaryWithDefaults или RowBinary, если параметр setSupportDefault в ClickHouseClientConfig явно задан как true или false соответственно.
Коннектор предоставляет следующие дополнительные метрики помимо стандартных метрик Flink:
| Метрика | Описание | Тип | Статус |
|---|
numBytesSend | Общее количество байтов, отправленных в ClickHouse в полезной нагрузке запроса. Примечание: эта метрика измеряет размер сериализованных данных, переданных по сети, и может отличаться от written_bytes в system.query_log ClickHouse, который отражает фактическое количество байтов, записанных в хранилище после обработки | Counter | ✅ |
numRecordSend | Общее количество записей, отправленных в ClickHouse | Counter | ✅ |
numRequestSubmitted | Общее количество отправленных запросов (фактическое количество выполненных flush-операций) | Counter | ✅ |
numOfDroppedBatches | Общее количество батчей, отброшенных из-за сбоев без возможности повторной попытки | Counter | ✅ |
numOfDroppedRecords | Общее количество записей, отброшенных из-за сбоев без возможности повторной попытки | Counter | ✅ |
totalBatchRetries | Общее количество повторных попыток батчей из-за сбоев, допускающих повторную попытку | Counter | ✅ |
writeLatencyHistogram | Гистограмма распределения задержки успешной записи (мс) | Histogram | ✅ |
writeFailureLatencyHistogram | Гистограмма распределения задержки неуспешной записи (мс) | Histogram | ✅ |
triggeredByMaxBatchSizeCounter | Общее количество flush-операций, вызванных достижением maxBatchSize | Counter | ✅ |
triggeredByMaxBatchSizeInBytesCounter | Общее количество flush-операций, вызванных достижением maxBatchSizeInBytes | Counter | ✅ |
triggeredByMaxTimeInBufferMSCounter | Общее количество flush-операций, вызванных достижением maxTimeInBufferMS | Counter | ✅ |
actualRecordsPerBatch | Гистограмма распределения фактического размера батча | Histogram | ✅ |
actualBytesPerBatch | Гистограмма распределения фактического количества байтов в батче | Histogram | ✅ |
- В настоящее время sink гарантирует доставку как минимум один раз. Работа над семантикой «ровно один раз» отслеживается здесь.
- Sink пока не поддерживает dead-letter queue (DLQ) для буферизации записей, которые не удалось обработать. Пока что коннектор будет пытаться повторно выполнять вставку записей, завершившихся ошибкой, и отбрасывать их в случае неудачи. Эта возможность отслеживается здесь.
- Sink пока не поддерживает создание через Table API Flink или Flink SQL. Эта возможность отслеживается здесь.
Совместимость версий ClickHouse и безопасность
- Коннектор ежедневно тестируется в CI с рядом последних версий ClickHouse, включая latest и head. Список тестируемых версий периодически обновляется по мере выхода новых релизов ClickHouse. Здесь можно посмотреть, с какими версиями коннектор ежедневно проходит тестирование.
- Сведения об известных уязвимостях и о том, как сообщить о новой уязвимости, см. в политике безопасности ClickHouse.
- Мы рекомендуем регулярно обновлять коннектор, чтобы не пропускать исправления безопасности и новые улучшения.
- Если у вас возникли проблемы с миграцией, создайте issue в GitHub, и мы ответим!
Расширенное и рекомендуемое использование
- Для оптимальной производительности убедитесь, что тип элементов в вашем DataStream не является типом Generic — см. здесь описание различий между типами во Flink. Элементы, не относящиеся к Generic, позволяют избежать накладных расходов на serialization, связанных с Kryo, и повысить пропускную способность при записи в ClickHouse.
- Мы рекомендуем установить
maxBatchSize как минимум в 1000, а в идеале — в диапазоне от 10 000 до 100 000. Подробнее см. в этом руководстве по массовым вставкам.
- Чтобы выполнять дедупликацию в стиле OLTP или upsert в ClickHouse, обратитесь к этой странице документации. Примечание: не путайте это с дедупликацией батчей, которая происходит при повторных попытках.
Может возникнуть следующая ошибка:
com.clickhouse.client.api.ServerException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 9205. Bytes expected: 1100022.: (at row 9) : While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)
Причина: Чаще всего ошибка CANNOT_READ_ALL_DATA означает, что схема вашей таблицы ClickHouse перестала совпадать со схемой записей Flink. Это может произойти, если одна из них была изменена способом, несовместимым с предыдущей версией.
Решение: Обновите схему таблицы ClickHouse, тип входных данных коннектора или и то и другое, чтобы они стали совместимыми. При необходимости обратитесь к разделу сопоставление типов, чтобы понять, как сопоставлять типы Java с типами ClickHouse. Примечание: если некоторые записи всё ещё находятся в обработке, при перезапуске коннектора потребуется сбросить состояние Flink.
Низкая пропускная способность
При записи в ClickHouse вы можете столкнуться с тем, что пропускная способность коннектора не масштабируется вместе с параллелизмом задачи (числом задач Flink).
Причина: фоновый процесс слияния частей в ClickHouse может замедлять вставки. Это может происходить, если задан слишком маленький размер батча, коннектор слишком часто выполняет flush, или из-за сочетания обоих факторов.
Решение: Отслеживайте метрики numRequestSubmitted и actualRecordsPerBatch, чтобы понять, как подобрать размер батча (maxBatchSize) и частоту flush. Рекомендации по выбору размера батча также приведены в разделе Расширенное и рекомендуемое использование.
В моей таблице ClickHouse отсутствуют строки
Причина: батч(и) были отброшены либо из-за сбоя, не допускающего повторной попытки, либо потому, что их не удалось вставить за настроенное число повторных попыток (задаётся через ClickHouseClientConfig.setNumberOfRetries()). Примечание: по умолчанию коннектор пытается повторно вставить батч до 3 раз, прежде чем отбросить его.
Решение: Проверьте журналы TaskManager и/или трассировки стека, чтобы определить первопричину.
Участие в проекте и поддержка
Если вы хотите внести свой вклад в проект или сообщить о проблемах, мы будем рады вашему участию!
Перейдите в наш репозиторий GitHub, чтобы создать issue, предложить
улучшения или отправить pull request.
Мы приветствуем вклад в проект! Перед началом работы, пожалуйста, ознакомьтесь с руководством по участию в репозитории.
Спасибо, что помогаете улучшать коннектор ClickHouse для Flink! Последнее изменение 10 июня 2026 г.