Saltar al contenido principal
Este es el conector sink oficial de Apache Flink compatible con ClickHouse. Está desarrollado con AsyncSinkBase de Flink y el cliente Java oficial de ClickHouse. El conector es compatible con la API DataStream de Apache Flink. La compatibilidad con la API Table está prevista para una versión futura.

Requisitos

  • Java 11+ (para Flink 1.17+) o 17+ (para Flink 2.0+)
  • Apache Flink 1.17+
El conector se distribuye en dos artefactos para admitir tanto Flink 1.17+ como Flink 2.0+. Elige el artefacto que corresponda a la versión de Flink que quieras usar:
Versión de FlinkArtefactoVersión del cliente Java de ClickHouseJava requerido
latestflink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.1flink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.0flink-connector-clickhouse-2.0.00.9.5Java 17+
1.20.2flink-connector-clickhouse-1.170.9.5Java 11+
1.19.3flink-connector-clickhouse-1.170.9.5Java 11+
1.18.1flink-connector-clickhouse-1.170.9.5Java 11+
1.17.2flink-connector-clickhouse-1.170.9.5Java 11+
El conector no se ha probado con versiones de Flink anteriores a la 1.17.2

Instalación y configuración

Añadir como dependencia

<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-2.0.0</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>
<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-1.17</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>

Descarga el binario

El patrón de nombre del archivo JAR binario es:
flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar
donde: Puedes encontrar todos los archivos JAR publicados disponibles en el Repositorio Central de Maven.

Uso de la API de DataStream

Snippet

Supongamos que quieres insertar datos CSV sin procesar en ClickHouse:
public static void main(String[] args) {
    // Configurar ClickHouseClient
    ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);

    // Crear un ElementConverter
    ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);

    // Crear el sink y establecer el formato con `setClickHouseFormat`
    ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
            convertorString,
            MAX_BATCH_SIZE,
            MAX_IN_FLIGHT_REQUESTS,
            MAX_BUFFERED_REQUESTS,
            MAX_BATCH_SIZE_IN_BYTES,
            MAX_TIME_IN_BUFFER_MS,
            MAX_RECORD_SIZE_IN_BYTES,
            clickHouseClientConfig
    );

    csvSink.setClickHouseFormat(ClickHouseFormat.CSV);

    // Por último, conecta tu DataStream al sink.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Path csvFilePath = new Path(fileFullName);
    FileSource<String> csvSource = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), csvFilePath)
            .build();

    env.fromSource(
            csvSource,
            WatermarkStrategy.noWatermarks(),
            "GzipCsvSource"
    ).sinkTo(csvSink);
}
Puedes encontrar más ejemplos y fragmentos de código en nuestras pruebas:

Ejemplo de inicio rápido

Hemos creado un ejemplo basado en Maven para facilitar el inicio con el sink de ClickHouse: Para obtener instrucciones más detalladas, consulta la Guía del ejemplo

Opciones de conexión de la API de DataStream

Opciones del cliente de ClickHouse

ParametersDescriptionDefault ValueRequired
urlURL completa de ClickHouseN/A
usernameNombre de usuario de la base de datos de ClickHouseN/A
passwordContraseña de la base de datos de ClickHouseN/A
databaseNombre de la base de datos de ClickHouseN/A
tableNombre de la tabla de ClickHouseN/A
optionsMapa de opciones de configuración del cliente JavaMapa vacíoNo
serverSettingsMapa de ajustes de sesión del servidor de ClickHouseMapa vacíoNo
enableJsonSupportAsStringAjuste del servidor de ClickHouse para esperar un String con formato JSON para el tipo de dato JSONtrueNo
options y serverSettings deben pasarse al cliente como Map<String, String>. Si cualquiera de los dos es un mapa vacío, se usarán los valores predeterminados del cliente o del servidor, respectivamente.
Todas las opciones disponibles del cliente Java se enumeran en ClientConfigProperties.java y en esta página de documentación.Todos los ajustes de sesión disponibles del servidor se enumeran en esta página de documentación.
Por ejemplo:
Map<String, String> javaClientOptions = Map.of(
    ClientConfigProperties.CA_CERTIFICATE.getKey(), "<my_CA_cert>",
    ClientConfigProperties.SSL_CERTIFICATE.getKey(), "<my_SSL_cert>",
    ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "30000",
    ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getKey(), "5"
);

Map<String, String> serverSettings = Map.of(
    "insert_deduplicate", "1"
);

ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(
    url,
    username,
    password,
    database,
    tableName,
    javaClientOptions,
    serverSettings,
    false // enableJsonSupportAsString
);

Opciones del sink

Las siguientes opciones provienen directamente de AsyncSinkBase de Flink:
ParámetrosDescripciónValor predeterminadoObligatorio
maxBatchSizeNúmero máximo de registros insertados en un solo loteN/A
maxInFlightRequestsNúmero máximo de solicitudes en curso permitidas antes de que el sink aplique contrapresiónN/A
maxBufferedRequestsNúmero máximo de registros que pueden almacenarse en el búfer del sink antes de que se aplique contrapresiónN/A
maxBatchSizeInBytesTamaño máximo (en bytes) que puede alcanzar un lote. Todos los lotes enviados serán menores o iguales a este tamañoN/A
maxTimeInBufferMSTiempo máximo que un registro puede permanecer en el sink antes de enviarseN/A
maxRecordSizeInBytesTamaño máximo de registro que aceptará el sink; los registros que lo superen se rechazarán automáticamenteN/A

Tipos de datos compatibles

La siguiente tabla ofrece una referencia rápida para la conversión de tipos de datos al insertar datos desde Flink en ClickHouse.
Tipo de JavaTipo de ClickHouseAdmitidoMétodo de serialización
byte/ByteInt8DataWriter.writeInt8
short/ShortInt16DataWriter.writeInt16
int/IntegerInt32DataWriter.writeInt32
long/LongInt64DataWriter.writeInt64
BigIntegerInt128DataWriter.writeInt128
BigIntegerInt256DataWriter.writeInt256
short/ShortUInt8DataWriter.writeUInt8
int/IntegerUInt8DataWriter.writeUInt8
int/IntegerUInt16DataWriter.writeUInt16
long/LongUInt32DataWriter.writeUInt32
long/LongUInt64DataWriter.writeUInt64
BigIntegerUInt64DataWriter.writeUInt64
BigIntegerUInt128DataWriter.writeUInt128
BigIntegerUInt256DataWriter.writeUInt256
BigDecimalDecimalDataWriter.writeDecimal
BigDecimalDecimal32DataWriter.writeDecimal
BigDecimalDecimal64DataWriter.writeDecimal
BigDecimalDecimal128DataWriter.writeDecimal
BigDecimalDecimal256DataWriter.writeDecimal
float/FloatFloatDataWriter.writeFloat32
double/DoubleDoubleDataWriter.writeFloat64
boolean/BooleanBooleanDataWriter.writeBoolean
StringStringDataWriter.writeString
StringFixedStringDataWriter.writeFixedString
LocalDateDateDataWriter.writeDate
LocalDateDate32DataWriter.writeDate32
LocalDateTimeDateTimeDataWriter.writeDateTime
ZonedDateTimeDateTimeDataWriter.writeDateTime
LocalDateTimeDateTime64DataWriter.writeDateTime64
ZonedDateTimeDateTime64DataWriter.writeDateTime64
int/IntegerTimeN/A
long/LongTime64N/A
byte/ByteEnum8DataWriter.writeInt8
int/IntegerEnum16DataWriter.writeInt16
java.util.UUIDUUIDDataWriter.writeIntUUID
StringJSONDataWriter.writeJSON
Array<Type>Array<Type>DataWriter.writeArray
Map<K,V>Map<K,V>DataWriter.writeMap
Tuple<Type,..>Tuple<T1,T2,..>DataWriter.writeTuple
ObjectVariantN/A
Notas:
  • Se debe proporcionar un ZoneId al realizar operaciones con fechas.
  • Se deben proporcionar la precisión y la escala al realizar operaciones decimales.
  • Para que ClickHouse pueda interpretar un String de Java como JSON, es necesario habilitar enableJsonSupportAsString en ClickHouseClientConfig.
  • El conector requiere un ElementConvertor para asignar los elementos del DataStream de entrada a los payloads de ClickHouse. Para ello, el conector proporciona ClickHouseConvertor y POJOConvertor, que pueden usarse para implementar esta asignación mediante los métodos de serialización de DataWriter indicados anteriormente.

Formatos de entrada compatibles

Puedes consultar la lista de formatos de entrada de ClickHouse disponibles en esta página de documentación y en ClickHouseFormat.java. Para especificar el formato que debe usar el conector para serializar tu DataStream como payloads de ClickHouse, utiliza la función setClickHouseFormat. Por ejemplo:
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
        convertorString,
        MAX_BATCH_SIZE,
        MAX_IN_FLIGHT_REQUESTS,
        MAX_BUFFERED_REQUESTS,
        MAX_BATCH_SIZE_IN_BYTES,
        MAX_TIME_IN_BUFFER_MS,
        MAX_RECORD_SIZE_IN_BYTES,
        clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
De forma predeterminada, el conector usará RowBinaryWithDefaults o RowBinary si setSupportDefault en ClickHouseClientConfig se establece explícitamente en true o false, respectivamente.

Métricas

El conector expone las siguientes métricas adicionales, además de las que ya ofrece Flink:
MétricaDescripciónTipoEstado
numBytesSendNúmero total de bytes enviados a ClickHouse en el payload de la solicitud. Nota: esta métrica mide el tamaño de los datos serializados enviados a través de la red y puede diferir de written_bytes de ClickHouse en system.query_log, que refleja los bytes reales escritos en el almacenamiento después del procesamientoCounter
numRecordSendNúmero total de registros enviados a ClickHouseCounter
numRequestSubmittedNúmero total de solicitudes enviadas (número real de flushes realizados)Counter
numOfDroppedBatchesNúmero total de lotes descartados debido a errores no reintentablesCounter
numOfDroppedRecordsNúmero total de registros descartados debido a errores no reintentablesCounter
totalBatchRetriesNúmero total de reintentos de lotes debido a errores reintentablesCounter
writeLatencyHistogramHistograma de la distribución de la latencia de escritura correcta (ms)Histogram
writeFailureLatencyHistogramHistograma de la distribución de la latencia de escritura fallida (ms)Histogram
triggeredByMaxBatchSizeCounterNúmero total de flushes provocados al alcanzar maxBatchSizeCounter
triggeredByMaxBatchSizeInBytesCounterNúmero total de flushes provocados al alcanzar maxBatchSizeInBytesCounter
triggeredByMaxTimeInBufferMSCounterNúmero total de flushes provocados al alcanzar maxTimeInBufferMSCounter
actualRecordsPerBatchHistograma de la distribución del tamaño real de los lotesHistogram
actualBytesPerBatchHistograma de la distribución del número real de bytes por loteHistogram

Limitaciones

  • El sink actualmente ofrece una garantía de entrega de al menos una vez. El trabajo para lograr la semántica exactly-once se está rastreando aquí.
  • El sink todavía no admite una cola de mensajes fallidos (DLQ) para el almacenamiento en búfer de registros que no se pueden procesar. Mientras tanto, el conector intentará volver a insertar los registros que fallen y los descartará si no lo consigue. Esta funcionalidad se está rastreando aquí.
  • El sink todavía no admite la creación mediante la Table API de Flink ni con Flink SQL. Esta funcionalidad se está rastreando aquí.

Compatibilidad de versiones de ClickHouse y seguridad

  • El conector se prueba a diario, mediante un flujo de trabajo de CI, con varias versiones recientes de ClickHouse, incluidas latest y head. Las versiones probadas se actualizan periódicamente a medida que entran en uso nuevas versiones de ClickHouse. Consulta aquí las versiones con las que se prueba el conector cada día.
  • Consulta la política de seguridad de ClickHouse para conocer las vulnerabilidades de seguridad conocidas y cómo informar de una vulnerabilidad.
  • Recomendamos actualizar el conector de forma continua para no perder ninguna corrección de seguridad ni mejora nueva.
  • Si tienes algún problema con la migración, crea un issue en GitHub y te responderemos.
  • Para obtener un rendimiento óptimo, asegúrese de que el tipo de elemento de su DataStream no sea un tipo genérico; consulte aquí la distinción de tipos de Flink. Los elementos no genéricos evitan la sobrecarga de serialización que introduce Kryo y mejoran el rendimiento hacia ClickHouse.
  • Recomendamos configurar maxBatchSize con un valor mínimo de 1000 y, preferiblemente, entre 10,000 y 100,000. Consulte esta guía sobre inserciones masivas para obtener más información.
  • Para realizar deduplicación de estilo OLTP o upsert en ClickHouse, consulte esta página de documentación. Nota: no debe confundirse con la deduplicación por lotes que se produce en los reintentos.

Solución de problemas

CANNOT_READ_ALL_DATA

Puede aparecer el siguiente error:
com.clickhouse.client.api.ServerException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 9205. Bytes expected: 1100022.: (at row 9) : While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)
Causa: Lo más habitual es que el error CANNOT_READ_ALL_DATA signifique que el esquema de tu tabla de ClickHouse ha dejado de coincidir con el esquema de tus registros de Flink. Esto puede ocurrir cuando uno de los dos se modifica de una forma no compatible con versiones anteriores. Solución: Actualiza el esquema de tu tabla de ClickHouse o el tipo de datos de entrada del conector (o ambos) para que sean compatibles. Si es necesario, consulta la correspondencia de tipos para ver cómo se asignan los tipos de Java a los tipos de ClickHouse. Nota: si todavía hay registros en tránsito, tendrás que restablecer el estado de Flink al reiniciar el conector.

Bajo rendimiento

Es posible que el rendimiento del conector no escale con el paralelismo del job (número de tareas de Flink) al escribir en ClickHouse. Causa: El proceso de fusión de partes en segundo plano de ClickHouse puede estar ralentizando las inserciones. Esto puede ocurrir cuando el tamaño del lote configurado es demasiado pequeño, el conector hace flush con demasiada frecuencia, o por una combinación de ambos factores. Solución: Supervise las métricas numRequestSubmitted y actualRecordsPerBatch para determinar cómo ajustar el tamaño del lote (maxBatchSize) y con qué frecuencia hacer flush. Consulte también Uso avanzado y recomendado para ver recomendaciones sobre el tamaño de los lotes.

Me faltan filas en mi tabla de ClickHouse

Causa: Los lotes se descartaron, ya sea por un error no reintentable o porque no pudieron insertarse dentro del número de reintentos configurado (se puede configurar mediante ClickHouseClientConfig.setNumberOfRetries()). Nota: de forma predeterminada, el conector intentará volver a insertar un lote hasta 3 veces antes de descartarlo. Solución: Inspeccione los logs de TaskManager y/o las trazas de pila para identificar la causa raíz.

Contribuciones y soporte

Si deseas contribuir al proyecto o informar de algún problema, ¡agradecemos tus aportaciones! Visita nuestro repositorio de GitHub para abrir una issue, proponer mejoras o enviar un pull request. ¡Las contribuciones son bienvenidas! Consulta la guía de contribución del repositorio antes de empezar. ¡Gracias por ayudarnos a mejorar el conector de ClickHouse para Flink!
Última modificación el 10 de junio de 2026