Перейти к основному содержанию
Это официальный sink-коннектор для Apache Flink, поддерживаемый ClickHouse. Он построен на основе AsyncSinkBase из Flink и официального Java-клиента ClickHouse. Коннектор поддерживает DataStream API Apache Flink. Поддержка Table API планируется в одном из будущих релизов.

Требования

  • Java 11+ (для Flink 1.17+) или 17+ (для Flink 2.0+)
  • Apache Flink 1.17+
Коннектор выпускается в виде двух артефактов для поддержки Flink 1.17+ и Flink 2.0+. Выберите артефакт, соответствующий нужной версии Flink:
Версия FlinkАртефактВерсия Java-клиента ClickHouseТребуемая версия Java
latestflink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.1flink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.0flink-connector-clickhouse-2.0.00.9.5Java 17+
1.20.2flink-connector-clickhouse-1.170.9.5Java 11+
1.19.3flink-connector-clickhouse-1.170.9.5Java 11+
1.18.1flink-connector-clickhouse-1.170.9.5Java 11+
1.17.2flink-connector-clickhouse-1.170.9.5Java 11+
Коннектор не тестировался с версиями Flink более ранними, чем 1.17.2

Установка и настройка

Импорт в качестве зависимости

<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-2.0.0</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>
<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-1.17</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>

Скачайте JAR-файл

Шаблон имени JAR-файла:
flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar
где: Все доступные JAR-файлы опубликованных релизов можно найти в репозитории Maven Central.

Использование DataStream API

Фрагмент

Допустим, вы хотите выполнить вставку необработанных данных CSV в ClickHouse:
public static void main(String[] args) {
    // Настройте ClickHouseClient
    ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);

    // Создайте ElementConverter
    ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);

    // Создайте sink и задайте формат с помощью `setClickHouseFormat`
    ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
            convertorString,
            MAX_BATCH_SIZE,
            MAX_IN_FLIGHT_REQUESTS,
            MAX_BUFFERED_REQUESTS,
            MAX_BATCH_SIZE_IN_BYTES,
            MAX_TIME_IN_BUFFER_MS,
            MAX_RECORD_SIZE_IN_BYTES,
            clickHouseClientConfig
    );

    csvSink.setClickHouseFormat(ClickHouseFormat.CSV);

    // Наконец, подключите DataStream к sink.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Path csvFilePath = new Path(fileFullName);
    FileSource<String> csvSource = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), csvFilePath)
            .build();

    env.fromSource(
            csvSource,
            WatermarkStrategy.noWatermarks(),
            "GzipCsvSource"
    ).sinkTo(csvSink);
}
Другие примеры и фрагменты можно найти в наших тестах:

Пример быстрого старта

Мы подготовили пример на Maven, чтобы вы могли быстро начать работу с ClickHouse Sink: Более подробные инструкции см. в руководстве с примерами

Параметры подключения к DataStream API

Параметры клиента ClickHouse

ПараметрыОписаниеЗначение по умолчаниюОбязательно
urlПолный URL ClickHouseN/AДа
usernameИмя пользователя базы данных ClickHouseN/AДа
passwordПароль пользователя базы данных ClickHouseN/AДа
databaseИмя базы данных ClickHouseN/AДа
tableИмя таблицы ClickHouseN/AДа
optionsMap с параметрами конфигурации Java-клиентаПустой MapНет
serverSettingsMap с настройками сеанса сервера ClickHouseПустой MapНет
enableJsonSupportAsStringНастройка сервера ClickHouse, указывающая, что для типа данных JSON ожидается String в формате JSONtrueНет
options и serverSettings следует передавать клиенту как Map<String, String>. Если для любого из них передать пустой Map, будут использоваться значения клиента или сервера по умолчанию соответственно.
Все доступные параметры Java-клиента перечислены в ClientConfigProperties.java и в этой странице документации.Все доступные настройки сеанса сервера перечислены на этой странице документации.
Например:
Map<String, String> javaClientOptions = Map.of(
    ClientConfigProperties.CA_CERTIFICATE.getKey(), "<my_CA_cert>",
    ClientConfigProperties.SSL_CERTIFICATE.getKey(), "<my_SSL_cert>",
    ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "30000",
    ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getKey(), "5"
);

Map<String, String> serverSettings = Map.of(
    "insert_deduplicate", "1"
);

ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(
    url,
    username,
    password,
    database,
    tableName,
    javaClientOptions,
    serverSettings,
    false // enableJsonSupportAsString
);

Параметры sink

Следующие параметры взяты напрямую из AsyncSinkBase Flink:
ПараметрыОписаниеЗначение по умолчаниюОбязательно
maxBatchSizeМаксимальное количество записей, вставляемых за один батчN/AДа
maxInFlightRequestsМаксимальное количество запросов в обработке, допустимое до того, как sink начнет применять backpressureN/AДа
maxBufferedRequestsМаксимальное количество записей, которое может буферизоваться в sink до применения backpressureN/AДа
maxBatchSizeInBytesМаксимальный размер батча (в байтах). Все отправляемые батчи будут меньше или равны этому значениюN/AДа
maxTimeInBufferMSМаксимальное время, в течение которого запись может оставаться в sink перед отправкойN/AДа
maxRecordSizeInBytesМаксимальный размер записи, который принимает sink; записи большего размера будут автоматически отклонятьсяN/AДа

Поддерживаемые типы данных

В таблице ниже приведена краткая справка по преобразованию типов данных при вставке данных из Flink в ClickHouse.
Тип JavaТип ClickHouseПоддерживаетсяМетод сериализации
byte/ByteInt8DataWriter.writeInt8
short/ShortInt16DataWriter.writeInt16
int/IntegerInt32DataWriter.writeInt32
long/LongInt64DataWriter.writeInt64
BigIntegerInt128DataWriter.writeInt128
BigIntegerInt256DataWriter.writeInt256
short/ShortUInt8DataWriter.writeUInt8
int/IntegerUInt8DataWriter.writeUInt8
int/IntegerUInt16DataWriter.writeUInt16
long/LongUInt32DataWriter.writeUInt32
long/LongUInt64DataWriter.writeUInt64
BigIntegerUInt64DataWriter.writeUInt64
BigIntegerUInt128DataWriter.writeUInt128
BigIntegerUInt256DataWriter.writeUInt256
BigDecimalDecimalDataWriter.writeDecimal
BigDecimalDecimal32DataWriter.writeDecimal
BigDecimalDecimal64DataWriter.writeDecimal
BigDecimalDecimal128DataWriter.writeDecimal
BigDecimalDecimal256DataWriter.writeDecimal
float/FloatFloatDataWriter.writeFloat32
double/DoubleDoubleDataWriter.writeFloat64
boolean/BooleanBooleanDataWriter.writeBoolean
StringStringDataWriter.writeString
StringFixedStringDataWriter.writeFixedString
LocalDateDateDataWriter.writeDate
LocalDateDate32DataWriter.writeDate32
LocalDateTimeDateTimeDataWriter.writeDateTime
ZonedDateTimeDateTimeDataWriter.writeDateTime
LocalDateTimeDateTime64DataWriter.writeDateTime64
ZonedDateTimeDateTime64DataWriter.writeDateTime64
int/IntegerTimeН/Д
long/LongTime64Н/Д
byte/ByteEnum8DataWriter.writeInt8
int/IntegerEnum16DataWriter.writeInt16
java.util.UUIDUUIDDataWriter.writeIntUUID
StringJSONDataWriter.writeJSON
Array<Type>Array<Type>DataWriter.writeArray
Map<K,V>Map<K,V>DataWriter.writeMap
Tuple<Type,..>Tuple<T1,T2,..>DataWriter.writeTuple
ObjectVariantН/Д
Примечания:
  • При выполнении операций над датами необходимо указать ZoneId.
  • При выполнении операций с десятичными числами необходимо указать точность и масштаб.
  • Чтобы ClickHouse мог разобрать строку Java как JSON, необходимо включить enableJsonSupportAsString в ClickHouseClientConfig.
  • Коннектору требуется ElementConvertor, чтобы сопоставлять элементы входного DataStream с полезными нагрузками ClickHouse. Для этого коннектор предоставляет ClickHouseConvertor и POJOConvertor, которые можно использовать для реализации такого сопоставления с помощью приведённых выше методов сериализации DataWriter.

Поддерживаемые входные форматы

Список доступных входных форматов ClickHouse можно найти на этой странице документации и в ClickHouseFormat.java. Чтобы указать формат, который коннектор должен использовать для сериализации вашего DataStream в полезные нагрузки для ClickHouse, используйте функцию setClickHouseFormat. Например:
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
        convertorString,
        MAX_BATCH_SIZE,
        MAX_IN_FLIGHT_REQUESTS,
        MAX_BUFFERED_REQUESTS,
        MAX_BATCH_SIZE_IN_BYTES,
        MAX_TIME_IN_BUFFER_MS,
        MAX_RECORD_SIZE_IN_BYTES,
        clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
По умолчанию коннектор использует RowBinaryWithDefaults или RowBinary, если параметр setSupportDefault в ClickHouseClientConfig явно задан как true или false соответственно.

Метрики

Коннектор предоставляет следующие дополнительные метрики помимо стандартных метрик Flink:
МетрикаОписаниеТипСтатус
numBytesSendОбщее количество байтов, отправленных в ClickHouse в полезной нагрузке запроса. Примечание: эта метрика измеряет размер сериализованных данных, переданных по сети, и может отличаться от written_bytes в system.query_log ClickHouse, который отражает фактическое количество байтов, записанных в хранилище после обработкиCounter
numRecordSendОбщее количество записей, отправленных в ClickHouseCounter
numRequestSubmittedОбщее количество отправленных запросов (фактическое количество выполненных flush-операций)Counter
numOfDroppedBatchesОбщее количество батчей, отброшенных из-за сбоев без возможности повторной попыткиCounter
numOfDroppedRecordsОбщее количество записей, отброшенных из-за сбоев без возможности повторной попыткиCounter
totalBatchRetriesОбщее количество повторных попыток батчей из-за сбоев, допускающих повторную попыткуCounter
writeLatencyHistogramГистограмма распределения задержки успешной записи (мс)Histogram
writeFailureLatencyHistogramГистограмма распределения задержки неуспешной записи (мс)Histogram
triggeredByMaxBatchSizeCounterОбщее количество flush-операций, вызванных достижением maxBatchSizeCounter
triggeredByMaxBatchSizeInBytesCounterОбщее количество flush-операций, вызванных достижением maxBatchSizeInBytesCounter
triggeredByMaxTimeInBufferMSCounterОбщее количество flush-операций, вызванных достижением maxTimeInBufferMSCounter
actualRecordsPerBatchГистограмма распределения фактического размера батчаHistogram
actualBytesPerBatchГистограмма распределения фактического количества байтов в батчеHistogram

Ограничения

  • В настоящее время sink гарантирует доставку как минимум один раз. Работа над семантикой «ровно один раз» отслеживается здесь.
  • Sink пока не поддерживает dead-letter queue (DLQ) для буферизации записей, которые не удалось обработать. Пока что коннектор будет пытаться повторно выполнять вставку записей, завершившихся ошибкой, и отбрасывать их в случае неудачи. Эта возможность отслеживается здесь.
  • Sink пока не поддерживает создание через Table API Flink или Flink SQL. Эта возможность отслеживается здесь.

Совместимость версий ClickHouse и безопасность

  • Коннектор ежедневно тестируется в CI с рядом последних версий ClickHouse, включая latest и head. Список тестируемых версий периодически обновляется по мере выхода новых релизов ClickHouse. Здесь можно посмотреть, с какими версиями коннектор ежедневно проходит тестирование.
  • Сведения об известных уязвимостях и о том, как сообщить о новой уязвимости, см. в политике безопасности ClickHouse.
  • Мы рекомендуем регулярно обновлять коннектор, чтобы не пропускать исправления безопасности и новые улучшения.
  • Если у вас возникли проблемы с миграцией, создайте issue в GitHub, и мы ответим!
  • Для оптимальной производительности убедитесь, что тип элементов в вашем DataStream не является типом Generic — см. здесь описание различий между типами во Flink. Элементы, не относящиеся к Generic, позволяют избежать накладных расходов на serialization, связанных с Kryo, и повысить пропускную способность при записи в ClickHouse.
  • Мы рекомендуем установить maxBatchSize как минимум в 1000, а в идеале — в диапазоне от 10 000 до 100 000. Подробнее см. в этом руководстве по массовым вставкам.
  • Чтобы выполнять дедупликацию в стиле OLTP или upsert в ClickHouse, обратитесь к этой странице документации. Примечание: не путайте это с дедупликацией батчей, которая происходит при повторных попытках.

Устранение неполадок

CANNOT_READ_ALL_DATA

Может возникнуть следующая ошибка:
com.clickhouse.client.api.ServerException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 9205. Bytes expected: 1100022.: (at row 9) : While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)
Причина: Чаще всего ошибка CANNOT_READ_ALL_DATA означает, что схема вашей таблицы ClickHouse перестала совпадать со схемой записей Flink. Это может произойти, если одна из них была изменена способом, несовместимым с предыдущей версией. Решение: Обновите схему таблицы ClickHouse, тип входных данных коннектора или и то и другое, чтобы они стали совместимыми. При необходимости обратитесь к разделу сопоставление типов, чтобы понять, как сопоставлять типы Java с типами ClickHouse. Примечание: если некоторые записи всё ещё находятся в обработке, при перезапуске коннектора потребуется сбросить состояние Flink.

Низкая пропускная способность

При записи в ClickHouse вы можете столкнуться с тем, что пропускная способность коннектора не масштабируется вместе с параллелизмом задачи (числом задач Flink). Причина: фоновый процесс слияния частей в ClickHouse может замедлять вставки. Это может происходить, если задан слишком маленький размер батча, коннектор слишком часто выполняет flush, или из-за сочетания обоих факторов. Решение: Отслеживайте метрики numRequestSubmitted и actualRecordsPerBatch, чтобы понять, как подобрать размер батча (maxBatchSize) и частоту flush. Рекомендации по выбору размера батча также приведены в разделе Расширенное и рекомендуемое использование.

В моей таблице ClickHouse отсутствуют строки

Причина: батч(и) были отброшены либо из-за сбоя, не допускающего повторной попытки, либо потому, что их не удалось вставить за настроенное число повторных попыток (задаётся через ClickHouseClientConfig.setNumberOfRetries()). Примечание: по умолчанию коннектор пытается повторно вставить батч до 3 раз, прежде чем отбросить его. Решение: Проверьте журналы TaskManager и/или трассировки стека, чтобы определить первопричину.

Участие в проекте и поддержка

Если вы хотите внести свой вклад в проект или сообщить о проблемах, мы будем рады вашему участию! Перейдите в наш репозиторий GitHub, чтобы создать issue, предложить улучшения или отправить pull request. Мы приветствуем вклад в проект! Перед началом работы, пожалуйста, ознакомьтесь с руководством по участию в репозитории. Спасибо, что помогаете улучшать коннектор ClickHouse для Flink!
Последнее изменение 10 июня 2026 г.