跳转到主要内容
如果你使用的是 ClickHouse Cloud,我们建议改用 ClickPipes。ClickPipes 原生支持私有网络连接、对摄取和集群资源进行独立扩缩容,以及对流式 Kafka 数据摄取到 ClickHouse 的全面监控。
  • 发布或订阅数据流。
  • 构建容错存储。
  • 在流可用时立即处理。

创建表

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [ALIAS expr1],
    name2 [type2] [ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_security_protocol = '',]
    [kafka_sasl_mechanism = '',]
    [kafka_sasl_username = '',]
    [kafka_sasl_password = '',]
    [kafka_autodetect_client_rack = '',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_client_id = '',]
    [kafka_poll_timeout_ms = 0,]
    [kafka_poll_max_batch_size = 0,]
    [kafka_flush_interval_ms = 0,]
    [kafka_consumer_reschedule_ms = 0,]
    [kafka_thread_per_consumer = 0,]
    [kafka_handle_error_mode = 'default',]
    [kafka_commit_on_select = false,]
    [kafka_consumer_acquire_timeout_ms = 30000,]
    [kafka_max_rows_per_message = 1,]
    [kafka_compression_codec = '',]
    [kafka_compression_level = -1];
必需参数:
  • kafka_broker_list — 以逗号分隔的 broker 列表 (例如 localhost:9092) 。
  • kafka_topic_list — Kafka topic 列表。
  • kafka_group_name — 一组 Kafka 消费者。系统会分别跟踪每个组的读取偏移量。如果不希望消息在集群中重复,请在所有地方使用相同的组名。
  • kafka_format — 消息格式。使用与 SQL FORMAT 函数相同的表示法,例如 JSONEachRow。更多信息,请参见 格式 部分。
可选参数:
  • kafka_security_protocol - 与消息代理通信时使用的协议。可能的值:plaintextsslsasl_plaintextsasl_ssl
  • kafka_sasl_mechanism - 用于身份验证的 SASL 机制。可选值:GSSAPIPLAINSCRAM-SHA-256SCRAM-SHA-512OAUTHBEARER
  • kafka_sasl_username - 用于 PLAINSASL-SCRAM-.. 机制的 SASL 用户名。
  • kafka_sasl_password - 用于 PLAINSASL-SCRAM-.. 机制的 SASL 密码。
  • kafka_schema — 如果该 format 需要 schema 定义,则必须使用此参数。例如,Cap’n Proto 需要提供 schema 文件的路径以及根对象 schema.capnp:Message 的名称。
  • kafka_schema_registry_skip_bytes — 使用带有封装请求头的 schema registry 时,每条消息开头需要跳过的字节数 (例如 AWS Glue Schema Registry 会包含一个 19 字节的封装) 。范围:[0, 255]。默认值:0
  • kafka_num_consumers —— 每个表的消费者数量。如果单个消费者的吞吐量不足,请增加消费者数量。消费者总数不应超过 topic 的分区数,因为每个分区只能分配给一个消费者;同时,消费者总数也不得大于部署 ClickHouse 的服务器上的物理核心数。默认值:1
  • kafka_max_block_size — poll 操作的最大批次大小 (按消息数计) 。默认值:max_insert_block_size
  • kafka_skip_broken_messages — 每个块中,Kafka 消息解析器对与 schema 不兼容的消息的容忍度。如果 kafka_skip_broken_messages = N,则该 engine 会跳过 N 条无法解析的 Kafka 消息 (一条消息等于一行数据) 。默认值:0
  • kafka_commit_every_batch — 每消费并处理一个批次就提交一次,而不是在整个块写入完成后只提交一次。默认值:0
  • kafka_client_id — 客户端标识。默认为空。
  • kafka_poll_timeout_ms — 从 Kafka 执行单次 poll 的超时时间。默认值:stream_poll_timeout_ms
  • kafka_poll_max_batch_size — 在单次 Kafka poll 中可拉取的最大消息数。默认值:max_block_size
  • kafka_flush_interval_ms — 从 Kafka 刷新数据的超时时间。默认值:stream_flush_interval_ms
  • kafka_consumer_reschedule_ms — 当 Kafka 流处理发生停滞时 (例如,没有可消费的消息时) 的重新调度间隔。此设置控制消费者重试轮询前的等待时间。不得超过 kafka_consumers_pool_ttl_ms。默认值:500 毫秒。
  • kafka_thread_per_consumer — 为每个消费者提供独立线程。启用后,每个消费者都会独立并行地刷新数据 (否则,多个消费者的行会被合并成一个块) 。默认值:0
  • kafka_handle_error_mode — Kafka 引擎的错误处理方式。可能的值:default (如果消息解析失败,则抛出异常) 、stream (异常消息和原始消息将保存在虚拟列 _error_raw_message 中) 、dead_letter_queue (与错误相关的数据将保存在 system.dead_letter_queue 中) 。
  • kafka_commit_on_select — 执行 select 查询时提交消息。默认值:false
  • kafka_consumer_acquire_timeout_ms — 在 Kafka2 表上执行直接 SELECT 查询 (使用基于 Keeper 的偏移量存储) 时,获取 Kafka 消费者的超时时长 (以毫秒为单位) 。当在同一张表上同时运行多个并发的直接 SELECT 查询时,每个查询都必须等待消费者可用。该超时时长可防止因查询分别持有不同的消费者子集而导致死锁。默认值:30000
  • kafka_max_rows_per_message — 对于基于行的格式,一条 Kafka 消息中可写入的最大行数。默认值:1
  • kafka_autodetect_client_rack — 自动为 librdkafka 设置 client.rack 参数,以优先使用最近的 Kafka 副本。 支持的来源: AWS_ZONE_ID 表示 AWS IMDSv2 可用区 ID,例如 euc1-az1AWS_ZONE_NAME 表示 AWS IMDSv2 可用区名称,例如 eu-central-1aGCP_ZONE 表示 GCP 元数据服务的可用区,例如 europe-central2-aCLICKHOUSE 表示使用 ClickHouse 内部检测,这可能依赖云元数据或配置; AWS_ZONE_NAME_THEN_GCP_ZONE 表示先尝试 AWS_ZONE_NAME,然后尝试 GCP_ZONE。 默认值:空字符串,即禁用。 提示:不同环境使用的可用区格式不同。亚马逊 MSK 通常使用可用区 ID,因此优先选择 AWS_ZONE_ID。Confluent Cloud 通常使用可用区名称,因此优先选择 AWS_ZONE_NAME。如果不确定,请使用 AWS_ZONE_NAME_THEN_GCP_ZONE,或检查集群中的 broker.rack 值。 注意:Kafka 消息代理必须配置 broker.rackreplica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
  • kafka_compression_codec — 用于生产消息的压缩编解码器。支持:空字符串、nonegzipsnappylz4zstd。如果为空字符串,则该表不会设置压缩编解码器,因此会使用配置文件中的值或 librdkafka 的默认值。默认值:空字符串。
  • kafka_compression_level — 由 kafka_compression_codec 所选算法决定的压缩级别参数。值越高,压缩效果越好,但 CPU 使用量也会越高。可用范围取决于具体算法:gzip[0-9]lz4[0-12]snappy 仅支持 0zstd[0-12]-1 = 由 codec 决定的默认压缩级别。默认值:-1
  • kafka_map_virtual_columns_on_write — 如果启用,表 schema 中名称为 _key_timestamp_headers.name_headers.value 的特殊列会在 INSERT 时映射到相应的 Kafka 消息元数据,且不会包含在消息载荷中。请参阅将列映射到 Kafka 消息元数据。默认值:false
示例:
  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  SELECT * FROM queue LIMIT 5;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group1',
                            kafka_format = 'JSONEachRow',
                            kafka_num_consumers = 4;

  CREATE TABLE queue3 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
              SETTINGS kafka_format = 'JSONEachRow',
                       kafka_num_consumers = 4;
Kafka 表引擎不支持带有默认值的列。如果需要带默认值的列,可以在 materialized view 层添加它们 (见下文) 。

说明

已传递的消息会被自动跟踪,因此组中的每条消息只会被计数一次。如果你希望将数据获取两次,请使用另一个组名创建一个表副本。 组是灵活的,并会在集群中同步。例如,如果你有 10 个 topic,并且在一个集群中有某个表的 5 个副本,那么每个副本会获得 2 个 topic。如果副本数量发生变化,这些 topic 会自动在各副本之间重新分配。更多信息请参阅 http://kafka.apache.org/intro。 建议每个 Kafka topic 都使用自己专用的消费者组,以确保 topic 与组之间保持独占的一对一对应关系,尤其是在 topic 可能被动态创建和删除的环境中 (例如测试或暂存环境) 。 SELECT 对读取消息并不是特别有用 (调试除外) ,因为每条消息只能被读取一次。更实用的做法是使用 materialized view 创建实时处理线程。为此:
  1. 使用该引擎创建一个 Kafka 消费者,并将其视为数据 stream。
  2. 创建一个具有所需结构的表。
  3. 创建一个 materialized view,将来自该引擎的数据转换后写入先前创建的表中。
MATERIALIZED VIEW 关联到该引擎时,它就会开始在后台收集数据。这使你能够持续从 Kafka 接收消息,并使用 SELECT 将其转换为所需格式。 一个 Kafka 表可以拥有任意多个 materialized view。它们不会直接从 Kafka 表中读取数据,而是接收新的记录 (以块的形式) 。通过这种方式,你可以将数据写入多个明细粒度不同的表中 (包括聚合和非聚合) 。 示例:
  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
  ) ENGINE = SummingMergeTree(day, (day, level), 8192);

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() AS total
    FROM queue GROUP BY day, level;

  SELECT level, sum(total) FROM daily GROUP BY level;
为提高性能,接收到的消息会按 max_insert_block_size 的大小分组成块。如果在 stream_flush_interval_ms 毫秒内仍未形成一个完整的块,则无论该块是否完整,数据都会被刷写到表中。 要停止接收 topic 数据或更改转换逻辑,请分离该 materialized view:
  DETACH TABLE consumer;
  ATTACH TABLE consumer;
如果你想使用 ALTER 修改目标表,我们建议先禁用materialized view,以避免目标表与视图数据之间出现不一致。

配置

与 GraphiteMergeTree 类似,Kafka 引擎支持通过 ClickHouse 配置文件进行扩展配置。可使用两类配置键:全局级别 (位于 <kafka> 下) 和 topic 级别 (位于 <kafka><kafka_topic> 下) 。系统会先应用全局配置,再应用 topic 级别的配置 (如果存在) 。
  <kafka>
    <!-- 所有 Kafka 引擎类型表的全局配置选项 -->
    <debug>cgrp</debug>
    <statistics_interval_ms>3000</statistics_interval_ms>

    <kafka_topic>
        <name>logs</name>
        <statistics_interval_ms>4000</statistics_interval_ms>
    </kafka_topic>

    <!-- 消费者配置 -->
    <consumer>
        <auto_offset_reset>smallest</auto_offset_reset>
        <kafka_topic>
            <name>logs</name>
            <fetch_min_bytes>100000</fetch_min_bytes>
        </kafka_topic>

        <kafka_topic>
            <name>stats</name>
            <fetch_min_bytes>50000</fetch_min_bytes>
        </kafka_topic>
    </consumer>

    <!-- 生产者配置 -->
    <producer>
        <kafka_topic>
            <name>logs</name>
            <retry_backoff_ms>250</retry_backoff_ms>
        </kafka_topic>

        <kafka_topic>
            <name>stats</name>
            <retry_backoff_ms>400</retry_backoff_ms>
        </kafka_topic>
    </producer>
  </kafka>
有关可用配置项的完整列表,请参阅 librdkafka configuration reference。在 ClickHouse 配置中,请使用下划线 (_) 而非点号 (.) 。例如,check.crcs=true 应写为 <check_crcs>true</check_crcs>

Kerberos 支持

要处理支持 Kerberos 的 Kafka,请添加值为 sasl_plaintextsecurity_protocol 子元素。只要操作系统机制已获取并缓存 Kerberos ticket-granting ticket 即可。 ClickHouse 还可以使用 keytab 文件维护 Kerberos 凭据。请考虑使用 sasl_kerberos_service_namesasl_kerberos_keytabsasl_kerberos_principal 子元素。 示例:
<!-- 支持 Kerberos 的 Kafka -->
<kafka>
  <security_protocol>SASL_PLAINTEXT</security_protocol>
  <sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
  <sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
</kafka>

虚拟列

  • _topic — Kafka topic。数据类型:LowCardinality(String)
  • _key — 消息的键。数据类型:String
  • _offset — 消息的偏移量。数据类型:UInt64
  • _timestamp — 消息的时间戳。数据类型:Nullable(DateTime)
  • _timestamp_ms — 消息的毫秒级时间戳。数据类型:Nullable(DateTime64(3))
  • _partition — Kafka topic 的分区。数据类型:UInt64
  • _headers.name — 消息请求头键的 Array。数据类型:Array(String)
  • _headers.value — 消息请求头值的 Array。数据类型:Array(String)
kafka_handle_error_mode='stream' 时,还会有以下附加虚拟列:
  • _raw_message - 无法成功解析的原始消息。数据类型:String
  • _error - 解析失败期间产生的异常消息。数据类型:String
注意:_raw_message_error 这两个虚拟列仅在解析过程中发生异常时才会填充;消息成功解析时,它们始终为空。

将列映射到 Kafka 消息元数据

使用 INSERT INTO 生成消息时,如果表中存在名为 _key (类型为 String) 的列,Kafka 引擎始终会将其用作 Kafka 消息键;如果存在名为 _timestamp (类型为 DateTime) 的列,则会将其用作 Kafka 消息时间戳。默认情况下,这些列也会和其他列一起出现在生成的消息载荷中。 启用 kafka_map_virtual_columns_on_write = 1 后,行为会发生变化:
  • _key (类型为 String) — 映射为 Kafka 消息键。
  • _timestamp (类型为 DateTime) — 映射为 Kafka 消息时间戳。
  • _headers.name (类型为 Array(String)) 和 _headers.value (类型为 Array(String)) — 映射为 Kafka 消息请求头。每一对 (_headers.name[i], _headers.value[i]) 都会成为一个 Kafka 请求头。由于 _headers.name_headers.value 共享 _headers 这个 Nested 前缀,ClickHouse 要求每一行中这两个数组的大小必须相同。
只有当这些同名列的类型与上面列出的类型一致时,它们才会从消息载荷中排除;否则仍会保留在载荷中,因此即使 schema 恰好将这些名称用于无关数据,也仍可正常工作。 示例:
CREATE TABLE kafka_out
(
    event_json String,
    `_key` String,
    `_timestamp` DateTime,
    `_headers.name` Array(String),
    `_headers.value` Array(String)
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'broker:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'events-producer',
    kafka_format = 'JSONEachRow',
    kafka_map_virtual_columns_on_write = 1;

INSERT INTO kafka_out VALUES
    ('{"a":1}', 'session-42', now(), ['source', 'trace_id'], ['api', 'abc-123']);
生成的 Kafka 消息包含载荷 {"event_json":"{\"a\":1}"}、键 session-42、当前时间戳,以及两个请求头 source=apitrace_id=abc-123

数据格式支持

Kafka 引擎支持 ClickHouse 支持的所有格式。 单条 Kafka 消息中的行数取决于所用格式是基于行还是基于块:
  • 对于基于行的格式,单条 Kafka 消息中的行数可通过设置 kafka_max_rows_per_message 进行控制。
  • 对于基于块的格式,我们无法将块拆分为更小的部分,但单个块中的行数可通过通用设置 max_block_size 进行控制。

在 ClickHouse Keeper 中存储已提交偏移量的引擎

如果启用了 allow_experimental_kafka_offsets_storage_in_keeper,则可以为 Kafka 表引擎额外指定两个设置:
  • kafka_keeper_path 指定 ClickHouse Keeper 中该表的路径
  • kafka_replica_name 指定 ClickHouse Keeper 中的副本名称
这两个设置要么都指定,要么都不指定。当两者都指定时,将使用一个新的 Experimental Kafka 引擎。这个新引擎不依赖将已提交偏移量存储在 Kafka 中,而是将其存储在 ClickHouse Keeper 中。它仍会尝试将偏移量提交到 Kafka,但只有在创建表时才会依赖这些偏移量。在其他任何情况下 (例如表重启,或在发生错误后恢复) ,都会使用存储在 ClickHouse Keeper 中的偏移量继续消费消息。除了已提交偏移量外,它还会存储上一批次消费的消息数量,因此如果插入失败,就会再次消费相同数量的消息,从而在需要时实现去重。 示例:
CREATE TABLE experimental_kafka (key UInt64, value UInt64)
ENGINE = Kafka('localhost:19092', 'my-topic', 'my-consumer', 'JSONEachRow')
SETTINGS
  kafka_keeper_path = '/clickhouse/{database}/{uuid}',
  kafka_replica_name = '{replica}'
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1;

已知限制

由于这个新引擎仍处于 Experimental 阶段,因此尚未准备好用于生产环境。目前该实现存在一些已知限制:
  • 快速删除并重新创建表,或为不同引擎指定相同的 ClickHouse Keeper 路径,都可能导致问题。最佳实践是在 kafka_keeper_path 中使用 {uuid},以避免路径冲突。
  • 为了实现可重复读取,单个线程不能从多个分区消费消息。另一方面,Kafka 消费者又必须定期执行 poll 才能保持存活。基于这两个目标,我们决定只有在启用 kafka_thread_per_consumer 时才允许创建多个消费者,否则很难避免与定期 poll 消费者相关的问题。
另请参见
最后修改于 2026年6月10日