- Публикуйте потоки данных или подписывайтесь на них.
- Организуйте отказоустойчивое хранилище.
- Обрабатывайте потоки по мере их поступления.
Создание таблицы
kafka_broker_list— Список брокеров, разделённых запятыми (например,localhost:9092).kafka_topic_list— Список топиков Kafka.kafka_group_name— Группа потребителей Kafka. Смещения чтения отслеживаются отдельно для каждой группы. Если вы не хотите, чтобы сообщения дублировались в кластере, используйте везде одно и то же имя группы.kafka_format— Формат сообщений. Используется та же нотация, что и для SQL-функцииFORMAT, напримерJSONEachRow. Дополнительные сведения см. в разделе Форматы.
kafka_security_protocol— протокол, используемый для связи с брокерами. Возможные значения:plaintext,ssl,sasl_plaintext,sasl_ssl.kafka_sasl_mechanism- механизм SASL, используемый для аутентификации. Возможные значения:GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER.kafka_sasl_username- имя пользователя для SASL-аутентификации при использовании механизмовPLAINиSASL-SCRAM-...kafka_sasl_password— пароль SASL для механизмовPLAINиSASL-SCRAM-...kafka_schema— параметр, который необходимо использовать, если для формата требуется определение схемы. Например, Cap’n Proto требует указать путь к файлу схемы и имя корневого объектаschema.capnp:Message.kafka_schema_registry_skip_bytes— Количество байтов, которое нужно пропустить в начале каждого сообщения при использовании schema registry с заголовками обёртки (например, AWS Glue Schema Registry, который добавляет 19-байтную обёртку). Диапазон:[0, 255]. По умолчанию:0.kafka_num_consumers— Количество consumers на таблицу. Укажите больше consumers, если пропускной способности одного consumer недостаточно. Общее количество consumers не должно превышать количество партиций в topic, поскольку на одну партицию может быть назначен только один consumer, и не должно быть больше количества физических ядер на сервере, где развернут ClickHouse. По умолчанию:1.kafka_max_block_size— Максимальный размер батча (в сообщениях) при выполнении poll. По умолчанию: max_insert_block_size.kafka_skip_broken_messages— допустимое для парсера сообщений Kafka число несовместимых со схемой сообщений на блок. Еслиkafka_skip_broken_messages = N, то движок пропускает N сообщений Kafka, которые не удаётся разобрать (одно сообщение соответствует одной строке данных). Значение по умолчанию:0.kafka_commit_every_batch— Выполнять коммит для каждого потреблённого и обработанного батча, а не один коммит после записи всего блока. По умолчанию:0.kafka_client_id— идентификатор клиента. По умолчанию пустой.kafka_poll_timeout_ms— тайм-аут для одного опроса из Kafka. По умолчанию: stream_poll_timeout_ms.kafka_poll_max_batch_size— Максимальное количество сообщений, получаемых за один опрос Kafka. По умолчанию: max_block_size.kafka_flush_interval_ms— Тайм-аут для сброса данных из Kafka. По умолчанию: stream_flush_interval_ms.kafka_consumer_reschedule_ms— интервал перепланирования при остановке потоковой обработки Kafka (например, когда нет доступных для чтения сообщений). Эта настройка определяет задержку перед тем, как consumer повторит опрос. Не должно превышатьkafka_consumers_pool_ttl_ms. По умолчанию:500миллисекунд.kafka_thread_per_consumer— Выделяет отдельный поток для каждого консьюмера. Если включен, каждый консьюмер независимо сбрасывает данные, параллельно (в противном случае строки от нескольких консьюмеров объединяются в один блок). По умолчанию:0.kafka_handle_error_mode— как обрабатывать ошибки в движке Kafka. Возможные значения: default (если не удастся разобрать сообщение, будет сгенерировано исключение), stream (сообщение об исключении и исходное сообщение будут сохранены в виртуальных столбцах_errorи_raw_message), dead_letter_queue (данные, связанные с ошибкой, будут сохранены в system.dead_letter_queue).kafka_commit_on_select— Выполнять коммит сообщений при выполнении запроса SELECT. Значение по умолчанию:false.kafka_consumer_acquire_timeout_ms— тайм-аут в миллисекундах на получение потребителя Kafka при выполнении прямыхSELECT-запросов к таблицеKafka2(с хранением смещений в Keeper). Когда к одной и той же таблице одновременно выполняются несколько прямыхSELECT-запросов, каждый из них должен ждать, пока потребители не станут доступны. Тайм-аут предотвращает взаимные блокировки в случаях, когда запросы удерживают разные подмножества потребителей. Значение по умолчанию:30000.kafka_max_rows_per_message— Максимальное количество строк, записываемых в одно сообщение Kafka для построчных форматов. По умолчанию:1.kafka_autodetect_client_rack— автоматически задаёт параметрclient.rackдляlibrdkafka, чтобы отдавать предпочтение ближайшим репликам Kafka. Поддерживаемые источники:AWS_ZONE_ID— идентификатор зоны доступности AWS IMDSv2, напримерeuc1-az1;AWS_ZONE_NAME— имя зоны доступности AWS IMDSv2, напримерeu-central-1a;GCP_ZONE— зона сервиса метаданных GCP, напримерeurope-central2-a;CLICKHOUSE— использовать внутреннее определение ClickHouse, которое может опираться на метаданные облака или конфигурацию;AWS_ZONE_NAME_THEN_GCP_ZONE— сначала попытаться использоватьAWS_ZONE_NAME, а затемGCP_ZONE. По умолчанию: пустая строка, отключено. Совет: в разных средах используются разные форматы зон доступности. Amazon MSK обычно использует идентификаторы зон, поэтому предпочтителенAWS_ZONE_ID. Confluent Cloud обычно использует имена зон, поэтому предпочтителенAWS_ZONE_NAME. Если вы не уверены, используйтеAWS_ZONE_NAME_THEN_GCP_ZONEили проверьте значениеbroker.rackв вашем кластере. Примечание: брокеры Kafka должны быть настроены сbroker.rackиreplica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector.kafka_compression_codec— кодек сжатия, используемый при отправке сообщений. Поддерживаются: пустая строка,none,gzip,snappy,lz4,zstd. Если указана пустая строка, кодек сжатия таблицей не задаётся, поэтому будут использоваться значения из файлов конфигурации или значение по умолчанию изlibrdkafka. По умолчанию: пустая строка.kafka_compression_level— параметр уровня сжатия для алгоритма, выбранного с помощью kafka_compression_codec. Более высокие значения обеспечивают лучшее сжатие, но требуют больше ресурсов CPU. Допустимый диапазон зависит от алгоритма:[0-9]дляgzip;[0-12]дляlz4; только0дляsnappy;[0-12]дляzstd;-1= уровень сжатия по умолчанию, зависящий от кодека. Значение по умолчанию:-1.kafka_map_virtual_columns_on_write— Если включено, столбцы со специальными именами_key,_timestamp,_headers.nameи_headers.valueв схеме таблицы сопоставляются с соответствующими метаданными сообщений Kafka приINSERTи исключаются из полезной нагрузки сообщения. См. Сопоставление столбцов с метаданными сообщений Kafka. По умолчанию:false.
Движок таблицы Kafka не поддерживает столбцы со значением по умолчанию. Если вам нужны столбцы со значением по умолчанию, их можно добавить в materialized view (см. ниже).
Описание
SELECT не особенно полезен для чтения сообщений (кроме отладки), потому что каждое сообщение можно прочитать только один раз. Гораздо практичнее создавать потоки в реальном времени с помощью materialized view. Для этого:
- Используйте движок, чтобы создать consumer Kafka, и рассматривайте его как поток данных.
- Создайте таблицу с нужной структурой.
- Создайте materialized view, которое преобразует данные из движка и помещает их в ранее созданную таблицу.
MATERIALIZED VIEW подключается к движку, оно начинает собирать данные в фоновом режиме. Это позволяет непрерывно получать сообщения из Kafka и преобразовывать их в нужный формат с помощью SELECT.
У одной таблицы Kafka может быть сколько угодно materialized view; они не читают данные из таблицы Kafka напрямую, а получают новые записи (блоками), благодаря чему можно записывать данные в несколько таблиц с разным уровнем детализации (с группировкой — агрегацией — и без нее).
Пример:
ALTER, мы рекомендуем отключить materialized view, чтобы избежать расхождений между целевой таблицей и данными, поступающими из представления.
Конфигурация
<kafka>) и на уровне topic (внутри <kafka><kafka_topic>). Сначала применяется глобальная конфигурация, затем — конфигурация на уровне topic (если она задана).
_) вместо точки. Например, check.crcs=true записывается как <check_crcs>true</check_crcs>.
Поддержка Kerberos
security_protocol со значением sasl_plaintext. Этого достаточно, если средствами ОС получен и кэширован Kerberos ticket-granting ticket.
ClickHouse может использовать учетные данные Kerberos с помощью файла keytab. Используйте дочерние элементы sasl_kerberos_service_name, sasl_kerberos_keytab и sasl_kerberos_principal.
Пример:
Виртуальные столбцы
_topic— топик Kafka. Тип данных:LowCardinality(String)._key— ключ сообщения. Тип данных:String._offset— смещение сообщения. Тип данных:UInt64._timestamp— временная метка сообщения. Тип данных:Nullable(DateTime)._timestamp_ms— временная метка сообщения в миллисекундах. Тип данных:Nullable(DateTime64(3))._partition— партиция топика Kafka. Тип данных:UInt64._headers.name— массив ключей заголовков сообщения. Тип данных:Array(String)._headers.value— массив значений заголовков сообщения. Тип данных:Array(String).
kafka_handle_error_mode='stream':
_raw_message- необработанное сообщение, которое не удалось успешно разобрать. Тип данных:String._error- сообщение об исключении, возникшем при неудачном разборе. Тип данных:String.
_raw_message и _error заполняются только при возникновении исключения во время разбора; если сообщение было успешно разобрано, они всегда пусты.
Сопоставление столбцов с метаданными сообщений Kafka
INSERT INTO движок Kafka всегда использует столбец с именем _key (типа String) в качестве ключа сообщения Kafka и столбец с именем _timestamp (типа DateTime) в качестве временной метки сообщения Kafka — если такие столбцы есть в таблице. По умолчанию эти столбцы также включаются в полезную нагрузку отправляемого сообщения наряду с остальными столбцами.
При kafka_map_virtual_columns_on_write = 1 поведение меняется:
_key(типString) — сопоставляется с ключом сообщения Kafka._timestamp(типDateTime) — сопоставляется с временной меткой сообщения Kafka._headers.name(типArray(String)) и_headers.value(типArray(String)) — сопоставляются с заголовками сообщений Kafka. Каждая пара(_headers.name[i], _headers.value[i])становится одним заголовком Kafka. Поскольку_headers.nameи_headers.valueимеют общий вложенный префикс_headers, ClickHouse требует, чтобы оба массива имели одинаковый размер в каждой строке.
{"event_json":"{\"a\":1}"}, ключ session-42, текущую временную метку и два заголовка source=api и trace_id=abc-123.
Поддержка форматов данных
- Для построчных форматов количество строк в одном сообщении Kafka можно задать с помощью настройки
kafka_max_rows_per_message. - Для блочных форматов блок нельзя разбить на более мелкие части, но количество строк в одном блоке можно задать с помощью общей настройки max_block_size.
Движок для хранения зафиксированных смещений в ClickHouse Keeper
allow_experimental_kafka_offsets_storage_in_keeper, то для движка таблицы Kafka можно задать еще две настройки:
kafka_keeper_pathзадает путь к таблице в ClickHouse Keeperkafka_replica_nameзадает имя реплики в ClickHouse Keeper
Известные ограничения
- Быстрое удаление и повторное создание таблицы либо указание одного и того же пути ClickHouse Keeper для разных движков может привести к проблемам. В качестве рекомендации можно использовать
{uuid}вkafka_keeper_path, чтобы избежать конфликтов путей. - Чтобы обеспечить повторяемые чтения, нельзя потреблять сообщения из нескольких партиций в одном потоке. С другой стороны, потребителей Kafka нужно регулярно опрашивать, чтобы они оставались активными. Из-за этих двух требований мы решили разрешить создание нескольких потребителей только при включённом
kafka_thread_per_consumer, иначе становится слишком сложно избежать проблем, связанных с их регулярным опросом.