Este es el conector sink oficial de Apache Flink compatible con ClickHouse. Está desarrollado con AsyncSinkBase de Flink y el cliente Java oficial de ClickHouse.
El conector es compatible con la API DataStream de Apache Flink. La compatibilidad con la API Table está prevista para una versión futura.
- Java 11+ (para Flink 1.17+) o 17+ (para Flink 2.0+)
- Apache Flink 1.17+
Matriz de compatibilidad de versiones de Flink
El conector se distribuye en dos artefactos para admitir tanto Flink 1.17+ como Flink 2.0+. Elige el artefacto que corresponda a la versión de Flink que quieras usar:
| Versión de Flink | Artefacto | Versión del cliente Java de ClickHouse | Java requerido |
|---|
| latest | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.1 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.0 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 1.20.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.19.3 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.18.1 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.17.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
El conector no se ha probado con versiones de Flink anteriores a la 1.17.2
Instalación y configuración
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-2.0.0</artifactId>
<version>{{ stable_version }}</version>
<classifier>all</classifier>
</dependency>
dependencies {
implementation("com.clickhouse.flink:flink-connector-clickhouse-2.0.0:{{ stable_version }}")
}
libraryDependencies += "com.clickhouse.flink" % "flink-connector-clickhouse-2.0.0" % {{ stable_version }} classifier "all"
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-1.17</artifactId>
<version>{{ stable_version }}</version>
<classifier>all</classifier>
</dependency>
dependencies {
implementation("com.clickhouse.flink:flink-connector-clickhouse-1.17:{{ stable_version }}")
}
libraryDependencies += "com.clickhouse.flink" % "flink-connector-clickhouse-1.17" % {{ stable_version }} classifier "all"
El patrón de nombre del archivo JAR binario es:
flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar
donde:
Puedes encontrar todos los archivos JAR publicados disponibles en el Repositorio Central de Maven.
Uso de la API de DataStream
Supongamos que quieres insertar datos CSV sin procesar en ClickHouse:
public static void main(String[] args) {
// Configurar ClickHouseClient
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);
// Crear un ElementConverter
ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);
// Crear el sink y establecer el formato con `setClickHouseFormat`
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
convertorString,
MAX_BATCH_SIZE,
MAX_IN_FLIGHT_REQUESTS,
MAX_BUFFERED_REQUESTS,
MAX_BATCH_SIZE_IN_BYTES,
MAX_TIME_IN_BUFFER_MS,
MAX_RECORD_SIZE_IN_BYTES,
clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
// Por último, conecta tu DataStream al sink.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Path csvFilePath = new Path(fileFullName);
FileSource<String> csvSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), csvFilePath)
.build();
env.fromSource(
csvSource,
WatermarkStrategy.noWatermarks(),
"GzipCsvSource"
).sinkTo(csvSink);
}
Puedes encontrar más ejemplos y fragmentos de código en nuestras pruebas:
Hemos creado un ejemplo basado en Maven para facilitar el inicio con el sink de ClickHouse:
Para obtener instrucciones más detalladas, consulta la Guía del ejemplo
Opciones de conexión de la API de DataStream
Opciones del cliente de ClickHouse
| Parameters | Description | Default Value | Required |
|---|
url | URL completa de ClickHouse | N/A | Sí |
username | Nombre de usuario de la base de datos de ClickHouse | N/A | Sí |
password | Contraseña de la base de datos de ClickHouse | N/A | Sí |
database | Nombre de la base de datos de ClickHouse | N/A | Sí |
table | Nombre de la tabla de ClickHouse | N/A | Sí |
options | Mapa de opciones de configuración del cliente Java | Mapa vacío | No |
serverSettings | Mapa de ajustes de sesión del servidor de ClickHouse | Mapa vacío | No |
enableJsonSupportAsString | Ajuste del servidor de ClickHouse para esperar un String con formato JSON para el tipo de dato JSON | true | No |
options y serverSettings deben pasarse al cliente como Map<String, String>. Si cualquiera de los dos es un mapa vacío, se usarán los valores predeterminados del cliente o del servidor, respectivamente.
Por ejemplo:
Map<String, String> javaClientOptions = Map.of(
ClientConfigProperties.CA_CERTIFICATE.getKey(), "<my_CA_cert>",
ClientConfigProperties.SSL_CERTIFICATE.getKey(), "<my_SSL_cert>",
ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "30000",
ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getKey(), "5"
);
Map<String, String> serverSettings = Map.of(
"insert_deduplicate", "1"
);
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(
url,
username,
password,
database,
tableName,
javaClientOptions,
serverSettings,
false // enableJsonSupportAsString
);
Las siguientes opciones provienen directamente de AsyncSinkBase de Flink:
| Parámetros | Descripción | Valor predeterminado | Obligatorio |
|---|
maxBatchSize | Número máximo de registros insertados en un solo lote | N/A | Sí |
maxInFlightRequests | Número máximo de solicitudes en curso permitidas antes de que el sink aplique contrapresión | N/A | Sí |
maxBufferedRequests | Número máximo de registros que pueden almacenarse en el búfer del sink antes de que se aplique contrapresión | N/A | Sí |
maxBatchSizeInBytes | Tamaño máximo (en bytes) que puede alcanzar un lote. Todos los lotes enviados serán menores o iguales a este tamaño | N/A | Sí |
maxTimeInBufferMS | Tiempo máximo que un registro puede permanecer en el sink antes de enviarse | N/A | Sí |
maxRecordSizeInBytes | Tamaño máximo de registro que aceptará el sink; los registros que lo superen se rechazarán automáticamente | N/A | Sí |
Tipos de datos compatibles
La siguiente tabla ofrece una referencia rápida para la conversión de tipos de datos al insertar datos desde Flink en ClickHouse.
Inserción de datos de Flink en ClickHouse
| Tipo de Java | Tipo de ClickHouse | Admitido | Método de serialización |
|---|
byte/Byte | Int8 | ✅ | DataWriter.writeInt8 |
short/Short | Int16 | ✅ | DataWriter.writeInt16 |
int/Integer | Int32 | ✅ | DataWriter.writeInt32 |
long/Long | Int64 | ✅ | DataWriter.writeInt64 |
BigInteger | Int128 | ✅ | DataWriter.writeInt128 |
BigInteger | Int256 | ✅ | DataWriter.writeInt256 |
short/Short | UInt8 | ✅ | DataWriter.writeUInt8 |
int/Integer | UInt8 | ✅ | DataWriter.writeUInt8 |
int/Integer | UInt16 | ✅ | DataWriter.writeUInt16 |
long/Long | UInt32 | ✅ | DataWriter.writeUInt32 |
long/Long | UInt64 | ✅ | DataWriter.writeUInt64 |
BigInteger | UInt64 | ✅ | DataWriter.writeUInt64 |
BigInteger | UInt128 | ✅ | DataWriter.writeUInt128 |
BigInteger | UInt256 | ✅ | DataWriter.writeUInt256 |
BigDecimal | Decimal | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal32 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal64 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal128 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal256 | ✅ | DataWriter.writeDecimal |
float/Float | Float | ✅ | DataWriter.writeFloat32 |
double/Double | Double | ✅ | DataWriter.writeFloat64 |
boolean/Boolean | Boolean | ✅ | DataWriter.writeBoolean |
String | String | ✅ | DataWriter.writeString |
String | FixedString | ✅ | DataWriter.writeFixedString |
LocalDate | Date | ✅ | DataWriter.writeDate |
LocalDate | Date32 | ✅ | DataWriter.writeDate32 |
LocalDateTime | DateTime | ✅ | DataWriter.writeDateTime |
ZonedDateTime | DateTime | ✅ | DataWriter.writeDateTime |
LocalDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
ZonedDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
int/Integer | Time | ❌ | N/A |
long/Long | Time64 | ❌ | N/A |
byte/Byte | Enum8 | ✅ | DataWriter.writeInt8 |
int/Integer | Enum16 | ✅ | DataWriter.writeInt16 |
java.util.UUID | UUID | ✅ | DataWriter.writeIntUUID |
String | JSON | ✅ | DataWriter.writeJSON |
Array<Type> | Array<Type> | ✅ | DataWriter.writeArray |
Map<K,V> | Map<K,V> | ✅ | DataWriter.writeMap |
Tuple<Type,..> | Tuple<T1,T2,..> | ✅ | DataWriter.writeTuple |
Object | Variant | ❌ | N/A |
Notas:
- Se debe proporcionar un
ZoneId al realizar operaciones con fechas.
- Se deben proporcionar la precisión y la escala al realizar operaciones decimales.
- Para que ClickHouse pueda interpretar un
String de Java como JSON, es necesario habilitar enableJsonSupportAsString en ClickHouseClientConfig.
- El conector requiere un
ElementConvertor para asignar los elementos del DataStream de entrada a los payloads de ClickHouse. Para ello, el conector proporciona ClickHouseConvertor y POJOConvertor, que pueden usarse para implementar esta asignación mediante los métodos de serialización de DataWriter indicados anteriormente.
Puedes consultar la lista de formatos de entrada de ClickHouse disponibles en esta página de documentación y en ClickHouseFormat.java.
Para especificar el formato que debe usar el conector para serializar tu DataStream como payloads de ClickHouse, utiliza la función setClickHouseFormat. Por ejemplo:
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
convertorString,
MAX_BATCH_SIZE,
MAX_IN_FLIGHT_REQUESTS,
MAX_BUFFERED_REQUESTS,
MAX_BATCH_SIZE_IN_BYTES,
MAX_TIME_IN_BUFFER_MS,
MAX_RECORD_SIZE_IN_BYTES,
clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
De forma predeterminada, el conector usará RowBinaryWithDefaults o RowBinary si setSupportDefault en ClickHouseClientConfig se establece explícitamente en true o false, respectivamente.
El conector expone las siguientes métricas adicionales, además de las que ya ofrece Flink:
| Métrica | Descripción | Tipo | Estado |
|---|
numBytesSend | Número total de bytes enviados a ClickHouse en el payload de la solicitud. Nota: esta métrica mide el tamaño de los datos serializados enviados a través de la red y puede diferir de written_bytes de ClickHouse en system.query_log, que refleja los bytes reales escritos en el almacenamiento después del procesamiento | Counter | ✅ |
numRecordSend | Número total de registros enviados a ClickHouse | Counter | ✅ |
numRequestSubmitted | Número total de solicitudes enviadas (número real de flushes realizados) | Counter | ✅ |
numOfDroppedBatches | Número total de lotes descartados debido a errores no reintentables | Counter | ✅ |
numOfDroppedRecords | Número total de registros descartados debido a errores no reintentables | Counter | ✅ |
totalBatchRetries | Número total de reintentos de lotes debido a errores reintentables | Counter | ✅ |
writeLatencyHistogram | Histograma de la distribución de la latencia de escritura correcta (ms) | Histogram | ✅ |
writeFailureLatencyHistogram | Histograma de la distribución de la latencia de escritura fallida (ms) | Histogram | ✅ |
triggeredByMaxBatchSizeCounter | Número total de flushes provocados al alcanzar maxBatchSize | Counter | ✅ |
triggeredByMaxBatchSizeInBytesCounter | Número total de flushes provocados al alcanzar maxBatchSizeInBytes | Counter | ✅ |
triggeredByMaxTimeInBufferMSCounter | Número total de flushes provocados al alcanzar maxTimeInBufferMS | Counter | ✅ |
actualRecordsPerBatch | Histograma de la distribución del tamaño real de los lotes | Histogram | ✅ |
actualBytesPerBatch | Histograma de la distribución del número real de bytes por lote | Histogram | ✅ |
- El sink actualmente ofrece una garantía de entrega de al menos una vez. El trabajo para lograr la semántica exactly-once se está rastreando aquí.
- El sink todavía no admite una cola de mensajes fallidos (DLQ) para el almacenamiento en búfer de registros que no se pueden procesar. Mientras tanto, el conector intentará volver a insertar los registros que fallen y los descartará si no lo consigue. Esta funcionalidad se está rastreando aquí.
- El sink todavía no admite la creación mediante la Table API de Flink ni con Flink SQL. Esta funcionalidad se está rastreando aquí.
Compatibilidad de versiones de ClickHouse y seguridad
- El conector se prueba a diario, mediante un flujo de trabajo de CI, con varias versiones recientes de ClickHouse, incluidas
latest y head. Las versiones probadas se actualizan periódicamente a medida que entran en uso nuevas versiones de ClickHouse. Consulta aquí las versiones con las que se prueba el conector cada día.
- Consulta la política de seguridad de ClickHouse para conocer las vulnerabilidades de seguridad conocidas y cómo informar de una vulnerabilidad.
- Recomendamos actualizar el conector de forma continua para no perder ninguna corrección de seguridad ni mejora nueva.
- Si tienes algún problema con la migración, crea un issue en GitHub y te responderemos.
Uso avanzado y recomendado
- Para obtener un rendimiento óptimo, asegúrese de que el tipo de elemento de su DataStream no sea un tipo genérico; consulte aquí la distinción de tipos de Flink. Los elementos no genéricos evitan la sobrecarga de serialización que introduce Kryo y mejoran el rendimiento hacia ClickHouse.
- Recomendamos configurar
maxBatchSize con un valor mínimo de 1000 y, preferiblemente, entre 10,000 y 100,000. Consulte esta guía sobre inserciones masivas para obtener más información.
- Para realizar deduplicación de estilo OLTP o upsert en ClickHouse, consulte esta página de documentación. Nota: no debe confundirse con la deduplicación por lotes que se produce en los reintentos.
Puede aparecer el siguiente error:
com.clickhouse.client.api.ServerException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 9205. Bytes expected: 1100022.: (at row 9) : While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)
Causa: Lo más habitual es que el error CANNOT_READ_ALL_DATA signifique que el esquema de tu tabla de ClickHouse ha dejado de coincidir con el esquema de tus registros de Flink. Esto puede ocurrir cuando uno de los dos se modifica de una forma no compatible con versiones anteriores.
Solución: Actualiza el esquema de tu tabla de ClickHouse o el tipo de datos de entrada del conector (o ambos) para que sean compatibles. Si es necesario, consulta la correspondencia de tipos para ver cómo se asignan los tipos de Java a los tipos de ClickHouse. Nota: si todavía hay registros en tránsito, tendrás que restablecer el estado de Flink al reiniciar el conector.
Es posible que el rendimiento del conector no escale con el paralelismo del job (número de tareas de Flink) al escribir en ClickHouse.
Causa: El proceso de fusión de partes en segundo plano de ClickHouse puede estar ralentizando las inserciones. Esto puede ocurrir cuando el tamaño del lote configurado es demasiado pequeño, el conector hace flush con demasiada frecuencia, o por una combinación de ambos factores.
Solución: Supervise las métricas numRequestSubmitted y actualRecordsPerBatch para determinar cómo ajustar el tamaño del lote (maxBatchSize) y con qué frecuencia hacer flush. Consulte también Uso avanzado y recomendado para ver recomendaciones sobre el tamaño de los lotes.
Me faltan filas en mi tabla de ClickHouse
Causa: Los lotes se descartaron, ya sea por un error no reintentable o porque no pudieron insertarse dentro del número de reintentos configurado (se puede configurar mediante ClickHouseClientConfig.setNumberOfRetries()). Nota: de forma predeterminada, el conector intentará volver a insertar un lote hasta 3 veces antes de descartarlo.
Solución: Inspeccione los logs de TaskManager y/o las trazas de pila para identificar la causa raíz.
Si deseas contribuir al proyecto o informar de algún problema, ¡agradecemos tus aportaciones!
Visita nuestro repositorio de GitHub para abrir una issue, proponer
mejoras o enviar un pull request.
¡Las contribuciones son bienvenidas! Consulta la guía de contribución del repositorio antes de empezar.
¡Gracias por ayudarnos a mejorar el conector de ClickHouse para Flink!