- 发布或订阅数据流。
- 构建容错存储。
- 在流可用时立即处理。
创建表
kafka_broker_list— 以逗号分隔的 broker 列表 (例如localhost:9092) 。kafka_topic_list— Kafka topic 列表。kafka_group_name— 一组 Kafka 消费者。系统会分别跟踪每个组的读取偏移量。如果不希望消息在集群中重复,请在所有地方使用相同的组名。kafka_format— 消息格式。使用与 SQLFORMAT函数相同的表示法,例如JSONEachRow。更多信息,请参见 格式 部分。
kafka_security_protocol- 与消息代理通信时使用的协议。可能的值:plaintext、ssl、sasl_plaintext、sasl_ssl。kafka_sasl_mechanism- 用于身份验证的 SASL 机制。可选值:GSSAPI、PLAIN、SCRAM-SHA-256、SCRAM-SHA-512、OAUTHBEARER。kafka_sasl_username- 用于PLAIN和SASL-SCRAM-..机制的 SASL 用户名。kafka_sasl_password- 用于PLAIN和SASL-SCRAM-..机制的 SASL 密码。kafka_schema— 如果该 format 需要 schema 定义,则必须使用此参数。例如,Cap’n Proto 需要提供 schema 文件的路径以及根对象schema.capnp:Message的名称。kafka_schema_registry_skip_bytes— 使用带有封装请求头的 schema registry 时,每条消息开头需要跳过的字节数 (例如 AWS Glue Schema Registry 会包含一个 19 字节的封装) 。范围:[0, 255]。默认值:0。kafka_num_consumers—— 每个表的消费者数量。如果单个消费者的吞吐量不足,请增加消费者数量。消费者总数不应超过 topic 的分区数,因为每个分区只能分配给一个消费者;同时,消费者总数也不得大于部署 ClickHouse 的服务器上的物理核心数。默认值:1。kafka_max_block_size— poll 操作的最大批次大小 (按消息数计) 。默认值:max_insert_block_size。kafka_skip_broken_messages— 每个块中,Kafka 消息解析器对与 schema 不兼容的消息的容忍度。如果kafka_skip_broken_messages = N,则该 engine 会跳过 N 条无法解析的 Kafka 消息 (一条消息等于一行数据) 。默认值:0。kafka_commit_every_batch— 每消费并处理一个批次就提交一次,而不是在整个块写入完成后只提交一次。默认值:0。kafka_client_id— 客户端标识。默认为空。kafka_poll_timeout_ms— 从 Kafka 执行单次 poll 的超时时间。默认值:stream_poll_timeout_ms。kafka_poll_max_batch_size— 在单次 Kafka poll 中可拉取的最大消息数。默认值:max_block_size。kafka_flush_interval_ms— 从 Kafka 刷新数据的超时时间。默认值:stream_flush_interval_ms。kafka_consumer_reschedule_ms— 当 Kafka 流处理发生停滞时 (例如,没有可消费的消息时) 的重新调度间隔。此设置控制消费者重试轮询前的等待时间。不得超过kafka_consumers_pool_ttl_ms。默认值:500毫秒。kafka_thread_per_consumer— 为每个消费者提供独立线程。启用后,每个消费者都会独立并行地刷新数据 (否则,多个消费者的行会被合并成一个块) 。默认值:0。kafka_handle_error_mode— Kafka 引擎的错误处理方式。可能的值:default (如果消息解析失败,则抛出异常) 、stream (异常消息和原始消息将保存在虚拟列_error和_raw_message中) 、dead_letter_queue (与错误相关的数据将保存在 system.dead_letter_queue 中) 。kafka_commit_on_select— 执行 select 查询时提交消息。默认值:false。kafka_consumer_acquire_timeout_ms— 在Kafka2表上执行直接SELECT查询 (使用基于 Keeper 的偏移量存储) 时,获取 Kafka 消费者的超时时长 (以毫秒为单位) 。当在同一张表上同时运行多个并发的直接SELECT查询时,每个查询都必须等待消费者可用。该超时时长可防止因查询分别持有不同的消费者子集而导致死锁。默认值:30000。kafka_max_rows_per_message— 对于基于行的格式,一条 Kafka 消息中可写入的最大行数。默认值:1。kafka_autodetect_client_rack— 自动为librdkafka设置client.rack参数,以优先使用最近的 Kafka 副本。 支持的来源:AWS_ZONE_ID表示 AWS IMDSv2 可用区 ID,例如euc1-az1;AWS_ZONE_NAME表示 AWS IMDSv2 可用区名称,例如eu-central-1a;GCP_ZONE表示 GCP 元数据服务的可用区,例如europe-central2-a;CLICKHOUSE表示使用 ClickHouse 内部检测,这可能依赖云元数据或配置;AWS_ZONE_NAME_THEN_GCP_ZONE表示先尝试AWS_ZONE_NAME,然后尝试GCP_ZONE。 默认值:空字符串,即禁用。 提示:不同环境使用的可用区格式不同。亚马逊 MSK 通常使用可用区 ID,因此优先选择AWS_ZONE_ID。Confluent Cloud 通常使用可用区名称,因此优先选择AWS_ZONE_NAME。如果不确定,请使用AWS_ZONE_NAME_THEN_GCP_ZONE,或检查集群中的broker.rack值。 注意:Kafka 消息代理必须配置broker.rack和replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector。kafka_compression_codec— 用于生产消息的压缩编解码器。支持:空字符串、none、gzip、snappy、lz4、zstd。如果为空字符串,则该表不会设置压缩编解码器,因此会使用配置文件中的值或librdkafka的默认值。默认值:空字符串。kafka_compression_level— 由 kafka_compression_codec 所选算法决定的压缩级别参数。值越高,压缩效果越好,但 CPU 使用量也会越高。可用范围取决于具体算法:gzip为[0-9];lz4为[0-12];snappy仅支持0;zstd为[0-12];-1= 由 codec 决定的默认压缩级别。默认值:-1。kafka_map_virtual_columns_on_write— 如果启用,表 schema 中名称为_key、_timestamp、_headers.name和_headers.value的特殊列会在INSERT时映射到相应的 Kafka 消息元数据,且不会包含在消息载荷中。请参阅将列映射到 Kafka 消息元数据。默认值:false。
Kafka 表引擎不支持带有默认值的列。如果需要带默认值的列,可以在 materialized view 层添加它们 (见下文) 。
说明
SELECT 对读取消息并不是特别有用 (调试除外) ,因为每条消息只能被读取一次。更实用的做法是使用 materialized view 创建实时处理线程。为此:
- 使用该引擎创建一个 Kafka 消费者,并将其视为数据 stream。
- 创建一个具有所需结构的表。
- 创建一个 materialized view,将来自该引擎的数据转换后写入先前创建的表中。
MATERIALIZED VIEW 关联到该引擎时,它就会开始在后台收集数据。这使你能够持续从 Kafka 接收消息,并使用 SELECT 将其转换为所需格式。
一个 Kafka 表可以拥有任意多个 materialized view。它们不会直接从 Kafka 表中读取数据,而是接收新的记录 (以块的形式) 。通过这种方式,你可以将数据写入多个明细粒度不同的表中 (包括聚合和非聚合) 。
示例:
ALTER 修改目标表,我们建议先禁用materialized view,以避免目标表与视图数据之间出现不一致。
配置
<kafka> 下) 和 topic 级别 (位于 <kafka><kafka_topic> 下) 。系统会先应用全局配置,再应用 topic 级别的配置 (如果存在) 。
_) 而非点号 (.) 。例如,check.crcs=true 应写为 <check_crcs>true</check_crcs>。
Kerberos 支持
sasl_plaintext 的 security_protocol 子元素。只要操作系统机制已获取并缓存 Kerberos ticket-granting ticket 即可。
ClickHouse 还可以使用 keytab 文件维护 Kerberos 凭据。请考虑使用 sasl_kerberos_service_name、sasl_kerberos_keytab 和 sasl_kerberos_principal 子元素。
示例:
虚拟列
_topic— Kafka topic。数据类型:LowCardinality(String)。_key— 消息的键。数据类型:String。_offset— 消息的偏移量。数据类型:UInt64。_timestamp— 消息的时间戳。数据类型:Nullable(DateTime)。_timestamp_ms— 消息的毫秒级时间戳。数据类型:Nullable(DateTime64(3))。_partition— Kafka topic 的分区。数据类型:UInt64。_headers.name— 消息请求头键的 Array。数据类型:Array(String)。_headers.value— 消息请求头值的 Array。数据类型:Array(String)。
kafka_handle_error_mode='stream' 时,还会有以下附加虚拟列:
_raw_message- 无法成功解析的原始消息。数据类型:String。_error- 解析失败期间产生的异常消息。数据类型:String。
_raw_message 和 _error 这两个虚拟列仅在解析过程中发生异常时才会填充;消息成功解析时,它们始终为空。
将列映射到 Kafka 消息元数据
INSERT INTO 生成消息时,如果表中存在名为 _key (类型为 String) 的列,Kafka 引擎始终会将其用作 Kafka 消息键;如果存在名为 _timestamp (类型为 DateTime) 的列,则会将其用作 Kafka 消息时间戳。默认情况下,这些列也会和其他列一起出现在生成的消息载荷中。
启用 kafka_map_virtual_columns_on_write = 1 后,行为会发生变化:
_key(类型为String) — 映射为 Kafka 消息键。_timestamp(类型为DateTime) — 映射为 Kafka 消息时间戳。_headers.name(类型为Array(String)) 和_headers.value(类型为Array(String)) — 映射为 Kafka 消息请求头。每一对(_headers.name[i], _headers.value[i])都会成为一个 Kafka 请求头。由于_headers.name和_headers.value共享_headers这个 Nested 前缀,ClickHouse 要求每一行中这两个数组的大小必须相同。
{"event_json":"{\"a\":1}"}、键 session-42、当前时间戳,以及两个请求头 source=api 和 trace_id=abc-123。
数据格式支持
- 对于基于行的格式,单条 Kafka 消息中的行数可通过设置
kafka_max_rows_per_message进行控制。 - 对于基于块的格式,我们无法将块拆分为更小的部分,但单个块中的行数可通过通用设置 max_block_size 进行控制。
在 ClickHouse Keeper 中存储已提交偏移量的引擎
allow_experimental_kafka_offsets_storage_in_keeper,则可以为 Kafka 表引擎额外指定两个设置:
kafka_keeper_path指定 ClickHouse Keeper 中该表的路径kafka_replica_name指定 ClickHouse Keeper 中的副本名称
已知限制
- 快速删除并重新创建表,或为不同引擎指定相同的 ClickHouse Keeper 路径,都可能导致问题。最佳实践是在
kafka_keeper_path中使用{uuid},以避免路径冲突。 - 为了实现可重复读取,单个线程不能从多个分区消费消息。另一方面,Kafka 消费者又必须定期执行 poll 才能保持存活。基于这两个目标,我们决定只有在启用
kafka_thread_per_consumer时才允许创建多个消费者,否则很难避免与定期 poll 消费者相关的问题。