Apache Avro es un formato de serialización orientado a filas que utiliza codificación binaria para procesar datos de forma eficiente. El formato AvroConfluent admite leer y escribir mensajes codificados en Avro mediante Confluent Schema Registry (o servicios compatibles con su API).
Cada mensaje utiliza el wire format de Confluent: un byte mágico (0x00), seguido de un ID de esquema de 4 bytes en formato big-endian, seguido del dato binario de Avro. Durante la lectura, ClickHouse resuelve el ID de esquema consultando el registro. Durante la escritura, ClickHouse registra el esquema derivado de las columnas de salida y antepone el ID resultante a cada fila. Los esquemas se almacenan en caché para un rendimiento óptimo.
Correspondencia de tipos de datos
La siguiente tabla muestra todos los tipos de datos admitidos por el formato Apache Avro y sus correspondientes tipos de datos de ClickHouse en las consultas INSERT y SELECT.
Tipo de dato de Avro INSERT | Tipo de dato de ClickHouse | Tipo de dato de Avro SELECT |
|---|
boolean, int, long, float, double | Int(8\16\32), UInt(8\16\32) | int |
boolean, int, long, float, double | Int64, UInt64 | long |
boolean, int, long, float, double | Float32 | float |
boolean, int, long, float, double | Float64 | double |
bytes, string, fixed, enum | String | bytes o string * |
bytes, string, fixed | FixedString(N) | fixed(N) |
enum | Enum(8\16) | enum |
array(T) | Array(T) | array(T) |
map(V, K) | Map(V, K) | map(string, K) |
union(null, T), union(T, null) | Nullable(T) | union(null, T) |
union(T1, T2, …) ** | Variant(T1, T2, …) | union(T1, T2, …) ** |
null | Nullable(Nothing) | null |
int (date) *** | Date, Date32 | int (date) *** |
long (timestamp-millis) *** | DateTime64(3) | long (timestamp-millis) *** |
long (timestamp-micros) *** | DateTime64(6) | long (timestamp-micros) *** |
bytes (decimal) *** | DateTime64(N) | bytes (decimal) *** |
int | IPv4 | int |
fixed(16) | IPv6 | fixed(16) |
bytes (decimal) *** | Decimal(P, S) | bytes (decimal) *** |
string (uuid) *** | UUID | string (uuid) *** |
fixed(16) | Int128/UInt128 | fixed(16) |
fixed(32) | Int256/UInt256 | fixed(32) |
record | Tuple | record |
** El tipo Variant acepta implícitamente null como valor de un campo, por lo que, por ejemplo, union(T1, T2, null) de Avro se convertirá en Variant(T1, T2).
Como resultado, al generar Avro desde ClickHouse, siempre tenemos que incluir el tipo null en el conjunto de tipos de union de Avro, ya que durante la inferencia del esquema no sabemos si algún valor es realmente null.
*** Tipos lógicos de Avro
Tipos de datos lógicos de Avro no compatibles:
time-millis
time-micros
duration
| Configuración | Descripción | Predeterminado |
|---|
input_format_avro_allow_missing_fields | Si se debe usar un valor predeterminado en lugar de generar un error cuando no se encuentra un campo en el esquema. | 0 |
input_format_avro_null_as_default | Si se debe usar un valor predeterminado en lugar de generar un error al insertar un valor null en una columna que no admite NULL. | 0 |
format_avro_schema_registry_url | La URL de Confluent registro de esquemas. Para la autenticación básica, las credenciales codificadas en URL pueden incluirse directamente en la propia URL. | |
format_avro_schema_registry_connection_timeout | Tiempo de espera de conexión, en segundos, para el cliente HTTP de registro de esquemas (se usa tanto para la obtención del esquema como para su registro). Debe ser mayor que 0 y menor que 600 (10 minutos). | 1 |
format_avro_schema_registry_send_timeout | Tiempo de espera de envío, en segundos, para el cliente HTTP de registro de esquemas. Debe ser mayor que 0 y menor que 600 (10 minutos). | 1 |
format_avro_schema_registry_receive_timeout | Tiempo de espera de recepción, en segundos, para el cliente HTTP de registro de esquemas. Debe ser mayor que 0 y menor que 600 (10 minutos). | 1 |
output_format_avro_confluent_subject | Para la salida: el nombre del subject con el que el esquema está registrado en registro de esquemas. Obligatorio al escribir. | |
output_format_avro_string_column_pattern | Para la salida: expresión regular de las columnas String que se serializarán como Avro string (el valor predeterminado es bytes). | |
Para leer un topic de Kafka codificado en Avro mediante el motor de tabla Kafka, utilice la configuración format_avro_schema_registry_url para indicar la URL del registro de esquemas.
CREATE TABLE topic1_stream
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';
SELECT * FROM topic1_stream;
Para escribir mensajes de AvroConfluent en un topic de Kafka, configure tanto la URL del registro de esquemas como el nombre del subject. El esquema se registra automáticamente en el registro con la primera escritura.
CREATE TABLE topic1_sink
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';
INSERT INTO topic1_sink VALUES ('hello', 'world');
Uso de la autenticación básica
Si tu registro de esquemas requiere autenticación básica (p. ej., si usas Confluent Cloud), puedes proporcionar credenciales codificadas en URL en el parámetro format_avro_schema_registry_url.
CREATE TABLE topic1_stream
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';
Para supervisar el progreso de la ingestión y depurar los errores del consumidor de Kafka, puedes consultar la tabla del sistema system.kafka_consumers. Si tu implementación tiene varias réplicas (p. ej., ClickHouse Cloud), debes usar la función de tabla clusterAllReplicas.
SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;
Si tiene problemas con la resolución del esquema, puede usar kafkacat con clickhouse-local para diagnosticar el problema:
$ kafkacat -b kafka-broker -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String" -q 'select * from table'
1 a
2 b
3 c
Última modificación el 10 de junio de 2026