Перейти к основному содержанию
Если вы используете ClickHouse Cloud, рекомендуем вместо этого ClickPipes. ClickPipes нативно поддерживает подключения к частной сети, независимое масштабирование ингестии и ресурсов кластера, а также комплексный мониторинг при стриминге данных из Kafka в ClickHouse.
  • Публикуйте потоки данных или подписывайтесь на них.
  • Организуйте отказоустойчивое хранилище.
  • Обрабатывайте потоки по мере их поступления.

Создание таблицы

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [ALIAS expr1],
    name2 [type2] [ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_security_protocol = '',]
    [kafka_sasl_mechanism = '',]
    [kafka_sasl_username = '',]
    [kafka_sasl_password = '',]
    [kafka_autodetect_client_rack = '',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_client_id = '',]
    [kafka_poll_timeout_ms = 0,]
    [kafka_poll_max_batch_size = 0,]
    [kafka_flush_interval_ms = 0,]
    [kafka_consumer_reschedule_ms = 0,]
    [kafka_thread_per_consumer = 0,]
    [kafka_handle_error_mode = 'default',]
    [kafka_commit_on_select = false,]
    [kafka_consumer_acquire_timeout_ms = 30000,]
    [kafka_max_rows_per_message = 1,]
    [kafka_compression_codec = '',]
    [kafka_compression_level = -1];
Обязательные параметры:
  • kafka_broker_list — Список брокеров, разделённых запятыми (например, localhost:9092).
  • kafka_topic_list — Список топиков Kafka.
  • kafka_group_name — Группа потребителей Kafka. Смещения чтения отслеживаются отдельно для каждой группы. Если вы не хотите, чтобы сообщения дублировались в кластере, используйте везде одно и то же имя группы.
  • kafka_format — Формат сообщений. Используется та же нотация, что и для SQL-функции FORMAT, например JSONEachRow. Дополнительные сведения см. в разделе Форматы.
Необязательные параметры:
  • kafka_security_protocol — протокол, используемый для связи с брокерами. Возможные значения: plaintext, ssl, sasl_plaintext, sasl_ssl.
  • kafka_sasl_mechanism - механизм SASL, используемый для аутентификации. Возможные значения: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER.
  • kafka_sasl_username - имя пользователя для SASL-аутентификации при использовании механизмов PLAIN и SASL-SCRAM-...
  • kafka_sasl_password — пароль SASL для механизмов PLAIN и SASL-SCRAM-...
  • kafka_schema — параметр, который необходимо использовать, если для формата требуется определение схемы. Например, Cap’n Proto требует указать путь к файлу схемы и имя корневого объекта schema.capnp:Message.
  • kafka_schema_registry_skip_bytes — Количество байтов, которое нужно пропустить в начале каждого сообщения при использовании schema registry с заголовками обёртки (например, AWS Glue Schema Registry, который добавляет 19-байтную обёртку). Диапазон: [0, 255]. По умолчанию: 0.
  • kafka_num_consumers — Количество consumers на таблицу. Укажите больше consumers, если пропускной способности одного consumer недостаточно. Общее количество consumers не должно превышать количество партиций в topic, поскольку на одну партицию может быть назначен только один consumer, и не должно быть больше количества физических ядер на сервере, где развернут ClickHouse. По умолчанию: 1.
  • kafka_max_block_size — Максимальный размер батча (в сообщениях) при выполнении poll. По умолчанию: max_insert_block_size.
  • kafka_skip_broken_messages — допустимое для парсера сообщений Kafka число несовместимых со схемой сообщений на блок. Если kafka_skip_broken_messages = N, то движок пропускает N сообщений Kafka, которые не удаётся разобрать (одно сообщение соответствует одной строке данных). Значение по умолчанию: 0.
  • kafka_commit_every_batch — Выполнять коммит для каждого потреблённого и обработанного батча, а не один коммит после записи всего блока. По умолчанию: 0.
  • kafka_client_id — идентификатор клиента. По умолчанию пустой.
  • kafka_poll_timeout_ms — тайм-аут для одного опроса из Kafka. По умолчанию: stream_poll_timeout_ms.
  • kafka_poll_max_batch_size — Максимальное количество сообщений, получаемых за один опрос Kafka. По умолчанию: max_block_size.
  • kafka_flush_interval_ms — Тайм-аут для сброса данных из Kafka. По умолчанию: stream_flush_interval_ms.
  • kafka_consumer_reschedule_ms — интервал перепланирования при остановке потоковой обработки Kafka (например, когда нет доступных для чтения сообщений). Эта настройка определяет задержку перед тем, как consumer повторит опрос. Не должно превышать kafka_consumers_pool_ttl_ms. По умолчанию: 500 миллисекунд.
  • kafka_thread_per_consumer — Выделяет отдельный поток для каждого консьюмера. Если включен, каждый консьюмер независимо сбрасывает данные, параллельно (в противном случае строки от нескольких консьюмеров объединяются в один блок). По умолчанию: 0.
  • kafka_handle_error_mode — как обрабатывать ошибки в движке Kafka. Возможные значения: default (если не удастся разобрать сообщение, будет сгенерировано исключение), stream (сообщение об исключении и исходное сообщение будут сохранены в виртуальных столбцах _error и _raw_message), dead_letter_queue (данные, связанные с ошибкой, будут сохранены в system.dead_letter_queue).
  • kafka_commit_on_select — Выполнять коммит сообщений при выполнении запроса SELECT. Значение по умолчанию: false.
  • kafka_consumer_acquire_timeout_ms — тайм-аут в миллисекундах на получение потребителя Kafka при выполнении прямых SELECT-запросов к таблице Kafka2 (с хранением смещений в Keeper). Когда к одной и той же таблице одновременно выполняются несколько прямых SELECT-запросов, каждый из них должен ждать, пока потребители не станут доступны. Тайм-аут предотвращает взаимные блокировки в случаях, когда запросы удерживают разные подмножества потребителей. Значение по умолчанию: 30000.
  • kafka_max_rows_per_message — Максимальное количество строк, записываемых в одно сообщение Kafka для построчных форматов. По умолчанию: 1.
  • kafka_autodetect_client_rack — автоматически задаёт параметр client.rack для librdkafka, чтобы отдавать предпочтение ближайшим репликам Kafka. Поддерживаемые источники: AWS_ZONE_ID — идентификатор зоны доступности AWS IMDSv2, например euc1-az1; AWS_ZONE_NAME — имя зоны доступности AWS IMDSv2, например eu-central-1a; GCP_ZONE — зона сервиса метаданных GCP, например europe-central2-a; CLICKHOUSE — использовать внутреннее определение ClickHouse, которое может опираться на метаданные облака или конфигурацию; AWS_ZONE_NAME_THEN_GCP_ZONE — сначала попытаться использовать AWS_ZONE_NAME, а затем GCP_ZONE. По умолчанию: пустая строка, отключено. Совет: в разных средах используются разные форматы зон доступности. Amazon MSK обычно использует идентификаторы зон, поэтому предпочтителен AWS_ZONE_ID. Confluent Cloud обычно использует имена зон, поэтому предпочтителен AWS_ZONE_NAME. Если вы не уверены, используйте AWS_ZONE_NAME_THEN_GCP_ZONE или проверьте значение broker.rack в вашем кластере. Примечание: брокеры Kafka должны быть настроены с broker.rack и replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector.
  • kafka_compression_codec — кодек сжатия, используемый при отправке сообщений. Поддерживаются: пустая строка, none, gzip, snappy, lz4, zstd. Если указана пустая строка, кодек сжатия таблицей не задаётся, поэтому будут использоваться значения из файлов конфигурации или значение по умолчанию из librdkafka. По умолчанию: пустая строка.
  • kafka_compression_level — параметр уровня сжатия для алгоритма, выбранного с помощью kafka_compression_codec. Более высокие значения обеспечивают лучшее сжатие, но требуют больше ресурсов CPU. Допустимый диапазон зависит от алгоритма: [0-9] для gzip; [0-12] для lz4; только 0 для snappy; [0-12] для zstd; -1 = уровень сжатия по умолчанию, зависящий от кодека. Значение по умолчанию: -1.
  • kafka_map_virtual_columns_on_write — Если включено, столбцы со специальными именами _key, _timestamp, _headers.name и _headers.value в схеме таблицы сопоставляются с соответствующими метаданными сообщений Kafka при INSERT и исключаются из полезной нагрузки сообщения. См. Сопоставление столбцов с метаданными сообщений Kafka. По умолчанию: false.
Примеры:
  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  SELECT * FROM queue LIMIT 5;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group1',
                            kafka_format = 'JSONEachRow',
                            kafka_num_consumers = 4;

  CREATE TABLE queue3 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
              SETTINGS kafka_format = 'JSONEachRow',
                       kafka_num_consumers = 4;
Движок таблицы Kafka не поддерживает столбцы со значением по умолчанию. Если вам нужны столбцы со значением по умолчанию, их можно добавить в materialized view (см. ниже).

Описание

Доставленные сообщения отслеживаются автоматически, поэтому каждое сообщение в группе учитывается только один раз. Если вы хотите получить данные дважды, создайте копию таблицы с другим именем группы. Группы гибко настраиваются и синхронизируются в кластере. Например, если у вас есть 10 топиков и 5 копий таблицы в кластере, то каждая копия получает по 2 топика. Если количество копий меняется, топики автоматически перераспределяются между копиями. Подробнее об этом читайте на http://kafka.apache.org/intro. Рекомендуется, чтобы у каждого топика Kafka была собственная группа потребителей, обеспечивающая эксклюзивное соответствие между топиком и группой, особенно в средах, где топики могут динамически создаваться и удаляться (например, при тестировании или в промежуточном хранилище). SELECT не особенно полезен для чтения сообщений (кроме отладки), потому что каждое сообщение можно прочитать только один раз. Гораздо практичнее создавать потоки в реальном времени с помощью materialized view. Для этого:
  1. Используйте движок, чтобы создать consumer Kafka, и рассматривайте его как поток данных.
  2. Создайте таблицу с нужной структурой.
  3. Создайте materialized view, которое преобразует данные из движка и помещает их в ранее созданную таблицу.
Когда MATERIALIZED VIEW подключается к движку, оно начинает собирать данные в фоновом режиме. Это позволяет непрерывно получать сообщения из Kafka и преобразовывать их в нужный формат с помощью SELECT. У одной таблицы Kafka может быть сколько угодно materialized view; они не читают данные из таблицы Kafka напрямую, а получают новые записи (блоками), благодаря чему можно записывать данные в несколько таблиц с разным уровнем детализации (с группировкой — агрегацией — и без нее). Пример:
  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
  ) ENGINE = SummingMergeTree(day, (day, level), 8192);

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() AS total
    FROM queue GROUP BY day, level;

  SELECT level, sum(total) FROM daily GROUP BY level;
Чтобы повысить производительность, полученные сообщения группируются в блоки размером max_insert_block_size. Если блок не был сформирован в течение stream_flush_interval_ms миллисекунд, данные будут записаны в таблицу независимо от полноты блока. Чтобы прекратить получение данных из топика или изменить логику преобразования, отсоедините materialized view:
  DETACH TABLE consumer;
  ATTACH TABLE consumer;
Если вы хотите изменить целевую таблицу с помощью ALTER, мы рекомендуем отключить materialized view, чтобы избежать расхождений между целевой таблицей и данными, поступающими из представления.

Конфигурация

Как и GraphiteMergeTree, движок Kafka поддерживает расширенную настройку через файл конфигурации ClickHouse. Можно использовать два ключа конфигурации: глобальный (внутри <kafka>) и на уровне topic (внутри <kafka><kafka_topic>). Сначала применяется глобальная конфигурация, затем — конфигурация на уровне topic (если она задана).
  <kafka>
    <!-- Глобальные параметры конфигурации для всех таблиц типа движка Kafka -->
    <debug>cgrp</debug>
    <statistics_interval_ms>3000</statistics_interval_ms>

    <kafka_topic>
        <name>logs</name>
        <statistics_interval_ms>4000</statistics_interval_ms>
    </kafka_topic>

    <!-- Настройки для consumer -->
    <consumer>
        <auto_offset_reset>smallest</auto_offset_reset>
        <kafka_topic>
            <name>logs</name>
            <fetch_min_bytes>100000</fetch_min_bytes>
        </kafka_topic>

        <kafka_topic>
            <name>stats</name>
            <fetch_min_bytes>50000</fetch_min_bytes>
        </kafka_topic>
    </consumer>

    <!-- Настройки для producer -->
    <producer>
        <kafka_topic>
            <name>logs</name>
            <retry_backoff_ms>250</retry_backoff_ms>
        </kafka_topic>

        <kafka_topic>
            <name>stats</name>
            <retry_backoff_ms>400</retry_backoff_ms>
        </kafka_topic>
    </producer>
  </kafka>
Список возможных параметров конфигурации см. в справочнике по конфигурации librdkafka. В конфигурации ClickHouse используйте символ подчёркивания (_) вместо точки. Например, check.crcs=true записывается как <check_crcs>true</check_crcs>.

Поддержка Kerberos

Для работы с Kafka с поддержкой Kerberos добавьте дочерний элемент security_protocol со значением sasl_plaintext. Этого достаточно, если средствами ОС получен и кэширован Kerberos ticket-granting ticket. ClickHouse может использовать учетные данные Kerberos с помощью файла keytab. Используйте дочерние элементы sasl_kerberos_service_name, sasl_kerberos_keytab и sasl_kerberos_principal. Пример:
<!-- Kafka с поддержкой Kerberos -->
<kafka>
  <security_protocol>SASL_PLAINTEXT</security_protocol>
  <sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
  <sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
</kafka>

Виртуальные столбцы

  • _topic — топик Kafka. Тип данных: LowCardinality(String).
  • _key — ключ сообщения. Тип данных: String.
  • _offset — смещение сообщения. Тип данных: UInt64.
  • _timestamp — временная метка сообщения. Тип данных: Nullable(DateTime).
  • _timestamp_ms — временная метка сообщения в миллисекундах. Тип данных: Nullable(DateTime64(3)).
  • _partition — партиция топика Kafka. Тип данных: UInt64.
  • _headers.name — массив ключей заголовков сообщения. Тип данных: Array(String).
  • _headers.value — массив значений заголовков сообщения. Тип данных: Array(String).
Дополнительные виртуальные столбцы, когда kafka_handle_error_mode='stream':
  • _raw_message - необработанное сообщение, которое не удалось успешно разобрать. Тип данных: String.
  • _error - сообщение об исключении, возникшем при неудачном разборе. Тип данных: String.
Примечание: виртуальные столбцы _raw_message и _error заполняются только при возникновении исключения во время разбора; если сообщение было успешно разобрано, они всегда пусты.

Сопоставление столбцов с метаданными сообщений Kafka

При отправке сообщений через INSERT INTO движок Kafka всегда использует столбец с именем _key (типа String) в качестве ключа сообщения Kafka и столбец с именем _timestamp (типа DateTime) в качестве временной метки сообщения Kafka — если такие столбцы есть в таблице. По умолчанию эти столбцы также включаются в полезную нагрузку отправляемого сообщения наряду с остальными столбцами. При kafka_map_virtual_columns_on_write = 1 поведение меняется:
  • _key (тип String) — сопоставляется с ключом сообщения Kafka.
  • _timestamp (тип DateTime) — сопоставляется с временной меткой сообщения Kafka.
  • _headers.name (тип Array(String)) и _headers.value (тип Array(String)) — сопоставляются с заголовками сообщений Kafka. Каждая пара (_headers.name[i], _headers.value[i]) становится одним заголовком Kafka. Поскольку _headers.name и _headers.value имеют общий вложенный префикс _headers, ClickHouse требует, чтобы оба массива имели одинаковый размер в каждой строке.
Столбцы с такими именами исключаются из полезной нагрузки сообщения только в том случае, если их типы соответствуют перечисленным выше; в противном случае они остаются в полезной нагрузке, поэтому схемы, в которых эти имена используются повторно для несвязанных данных, продолжают работать. Пример:
CREATE TABLE kafka_out
(
    event_json String,
    `_key` String,
    `_timestamp` DateTime,
    `_headers.name` Array(String),
    `_headers.value` Array(String)
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'broker:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'events-producer',
    kafka_format = 'JSONEachRow',
    kafka_map_virtual_columns_on_write = 1;

INSERT INTO kafka_out VALUES
    ('{"a":1}', 'session-42', now(), ['source', 'trace_id'], ['api', 'abc-123']);
Сформированное сообщение Kafka содержит полезную нагрузку {"event_json":"{\"a\":1}"}, ключ session-42, текущую временную метку и два заголовка source=api и trace_id=abc-123.

Поддержка форматов данных

Движок Kafka поддерживает все форматы, доступные в ClickHouse. Количество строк в одном сообщении Kafka зависит от того, является ли формат построчным или блочным:
  • Для построчных форматов количество строк в одном сообщении Kafka можно задать с помощью настройки kafka_max_rows_per_message.
  • Для блочных форматов блок нельзя разбить на более мелкие части, но количество строк в одном блоке можно задать с помощью общей настройки max_block_size.

Движок для хранения зафиксированных смещений в ClickHouse Keeper

Если включен параметр allow_experimental_kafka_offsets_storage_in_keeper, то для движка таблицы Kafka можно задать еще две настройки:
  • kafka_keeper_path задает путь к таблице в ClickHouse Keeper
  • kafka_replica_name задает имя реплики в ClickHouse Keeper
Должны быть заданы либо обе настройки, либо ни одна из них. Если заданы обе, будет использоваться новый экспериментальный движок Kafka. Этот новый движок не зависит от хранения зафиксированных смещений в Kafka, а хранит их в ClickHouse Keeper. Он по-прежнему пытается коммитить смещения в Kafka, но опирается на них только при создании таблицы. Во всех остальных случаях (при перезапуске таблицы или восстановлении после какой-либо ошибки) для продолжения чтения сообщений будут использоваться смещения, сохраненные в ClickHouse Keeper. Помимо зафиксированного смещения, он также хранит количество сообщений, прочитанных в последнем батче, поэтому, если вставка завершится ошибкой, будет снова прочитано то же количество сообщений, что при необходимости позволяет выполнить дедупликацию. Пример:
CREATE TABLE experimental_kafka (key UInt64, value UInt64)
ENGINE = Kafka('localhost:19092', 'my-topic', 'my-consumer', 'JSONEachRow')
SETTINGS
  kafka_keeper_path = '/clickhouse/{database}/{uuid}',
  kafka_replica_name = '{replica}'
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1;

Известные ограничения

Поскольку новый движок пока является экспериментальным, он ещё не готов к использованию в продакшне. У этой реализации есть несколько известных ограничений:
  • Быстрое удаление и повторное создание таблицы либо указание одного и того же пути ClickHouse Keeper для разных движков может привести к проблемам. В качестве рекомендации можно использовать {uuid} в kafka_keeper_path, чтобы избежать конфликтов путей.
  • Чтобы обеспечить повторяемые чтения, нельзя потреблять сообщения из нескольких партиций в одном потоке. С другой стороны, потребителей Kafka нужно регулярно опрашивать, чтобы они оставались активными. Из-за этих двух требований мы решили разрешить создание нескольких потребителей только при включённом kafka_thread_per_consumer, иначе становится слишком сложно избежать проблем, связанных с их регулярным опросом.
См. также
Последнее изменение 10 июня 2026 г.