跳转到主要内容
这是用于连接 ClickHouse 的官方 Rust 客户端,最初由 Paul Loyd 开发。客户端的源代码可在 GitHub 仓库 中获取。

概述

  • 使用 serde 对行进行编码和解码。
  • 支持 serde 属性:skip_serializingskip_deserializingrename
  • 通过 HTTP 传输使用 RowBinary 格式。
    • 计划切换为通过 TCP 使用 Native
  • 支持 TLS (通过 native-tlsrustls-tls 功能) 。
  • 支持压缩和解压缩 (LZ4) 。
  • 提供用于查询或插入数据、执行 DDL 语句以及客户端批处理的 API。
  • 为单元测试提供便捷的模拟对象 (mocks) 。

安装

要使用此 crate,请将以下内容添加到 Cargo.toml 中:
[dependencies]
clickhouse = "0.12.2"

[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }
另请参阅:crates.io 页面

Cargo 特性

  • lz4 (默认启用) — 启用 Compression::Lz4Compression::Lz4Hc(_) 变体。启用后,除 WATCH 外,所有查询默认使用 Compression::Lz4
  • native-tls — 通过 hyper-tls 支持使用 HTTPS schema 的 URL,并会链接到 OpenSSL。
  • rustls-tls — 通过 hyper-rustls 支持使用 HTTPS schema 的 URL,且不会链接到 OpenSSL。
  • inserter — 启用 client.inserter()
  • test-util — 添加 mock。请参见该示例。仅在 dev-dependencies 中使用。
  • watch — 启用 client.watch 功能。详见对应章节。
  • uuid — 添加 serde::uuid 以便与 uuid crate 配合使用。
  • time — 添加 serde::time 以便与 time crate 配合使用。
通过 HTTPS URL 连接到 ClickHouse 时,应启用 native-tlsrustls-tls 其中一项特性。 如果两者都启用了,则 rustls-tls 特性优先。

ClickHouse 版本兼容性

该客户端兼容 ClickHouse 的 LTS 版本及更新版本,也兼容 ClickHouse Cloud。 早于 v22.6 的 ClickHouse server 在极少数情况下会错误处理 RowBinary。 你可以使用 v0.11+ 并启用 wa-37420 feature 来解决此问题。注意:不要将此 feature 与较新的 ClickHouse 版本一起使用。

示例

我们的目标是通过客户端代码仓库中的示例,涵盖客户端使用的各种场景。示例 README 中提供了概览。 如果示例或下文档中有任何不清楚或缺失的内容,欢迎联系我们

用法

ch2rs crate 可用于根据 ClickHouse 生成行类型。

创建客户端实例

请复用已创建的客户端,或克隆它们,以复用底层的 hyper 连接池。
use clickhouse::Client;

let client = Client::default()
    // 应同时包含协议和端口
    .with_url("http://localhost:8123")
    .with_user("name")
    .with_password("123")
    .with_database("test");

HTTPS 或 ClickHouse Cloud 连接

HTTPS 可与 rustls-tlsnative-tls cargo feature 配合使用。 然后,像平常一样创建客户端。在此示例中,使用环境变量来存储连接信息:
URL 应同时包含协议和端口,例如 https://instance.clickhouse.cloud:8443
fn read_env_var(key: &str) -> String {
    env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}

let client = Client::default()
    .with_url(read_env_var("CLICKHOUSE_URL"))
    .with_user(read_env_var("CLICKHOUSE_USER"))
    .with_password(read_env_var("CLICKHOUSE_PASSWORD"));
另请参见:

查询行

use serde::Deserialize;
use clickhouse::Row;
use clickhouse::sql::Identifier;

#[derive(Row, Deserialize)]
struct MyRow<'a> {
    no: u32,
    name: &'a str,
}

let table_name = "some";
let mut cursor = client
    .query("SELECT ?fields FROM ? WHERE no BETWEEN ? AND ?")
    .bind(Identifier(table_name))
    .bind(500)
    .bind(504)
    .fetch::<MyRow<'_>>()?;

while let Some(row) = cursor.next().await? { .. }
  • 占位符 ?fields 会被替换为 no, name (即 Row 的字段) 。
  • 占位符 ? 会被替换为后续 bind() 调用中提供的值。
  • 可以使用便捷的 fetch_one::<Row>()fetch_all::<Row>() 方法,分别获取第一行或全部行。
  • sql::Identifier 可用于绑定表名。
注意:由于整个响应是以流式传输的,游标即使在已经返回了一些行之后,仍可能报错。如果你的使用场景遇到这种情况,可以尝试使用 query(...).with_option("wait_end_of_query", "1"),以启用服务端的响应缓冲。更多详情。此外,buffer_size 选项也可能会有帮助。
在选择行时请谨慎使用 wait_end_of_query,因为它会增加服务端的内存消耗,并且很可能降低整体性能。

插入行数据

use serde::Serialize;
use clickhouse::Row;

#[derive(Row, Serialize)]
struct MyRow {
    no: u32,
    name: String,
}

let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
  • 如果未调用 end(),此次 INSERT 将被中止。
  • 行会以 stream 的形式逐步发送,以分摊网络负载。
  • 只有当所有行都属于同一分区,且行数小于 max_insert_block_size 时,ClickHouse 才会以原子方式插入整个批次。

异步插入 (服务端批处理)

你可以使用 ClickHouse asynchronous inserts 来避免在客户端对传入数据进行批处理。只需为 insert 方法传入 async_insert 选项即可 (甚至可以直接在 Client 实例上设置,这样会影响所有 insert 调用) 。
let client = Client::default()
    .with_url("http://localhost:8123")
    .with_option("async_insert", "1")
    .with_option("wait_for_async_insert", "0");
另请参阅:

Inserter 功能 (客户端批量处理)

需要启用 inserter Cargo 功能。
let mut inserter = client.inserter("some")?
    .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
    .with_max_bytes(50_000_000)
    .with_max_rows(750_000)
    .with_period(Some(Duration::from_secs(15)));

inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
    println!(
        "{} bytes, {} rows, {} transactions have been inserted",
        stats.bytes, stats.rows, stats.transactions,
    );
}

// 不要忘记在应用程序关闭时对 inserter 进行收尾处理
// 并提交剩余的行。`.end()` 同样会返回统计信息。
inserter.end().await?;
  • 当任一阈值 (max_bytesmax_rowsperiod) 达到时,Inserter 会在 commit() 中结束当前正在进行的插入。
  • 可以使用 with_period_bias 调整相邻两次结束活动 INSERT 的时间间隔,以避免并行 inserter 带来的负载尖峰。
  • Inserter::time_left() 可用于判断当前周期何时结束。如果你的 stream 很少发出条目,请再次调用 Inserter::commit() 以检查这些限制。
  • 时间阈值通过 quanta crate 实现,以加快 inserter 的运行速度。如果启用了 test-util,则不会使用它 (因此在自定义测试中可以通过 tokio::time::advance() 控制时间) 。
  • 两次 commit() 调用之间的所有行都会通过同一条 INSERT 语句插入。
如果你想终止/完成插入,别忘了执行 flush:
inserter.end().await?;

执行 DDL 语句

对于单节点部署,像这样执行 DDL 语句 即可:
client.query("DROP TABLE IF EXISTS some").execute().await?;
不过,对于使用负载均衡器或 ClickHouse Cloud 的集群部署,建议使用 wait_end_of_query 选项,等待 DDL 在所有副本上应用完成。可以这样做:
client
    .query("DROP TABLE IF EXISTS some")
    .with_option("wait_end_of_query", "1")
    .execute()
    .await?;

ClickHouse 设置

你可以使用 with_option 方法应用各种 ClickHouse 设置。例如:
let numbers = client
    .query("SELECT number FROM system.numbers")
    // 此设置仅适用于当前查询;
    // 它将覆盖全局客户端设置。
    .with_option("limit", "3")
    .fetch_all::<u64>()
    .await?;
除了 query 之外,insertinserter 方法的用法也类似;此外,也可以在 Client 实例上调用同一方法,为所有查询设置全局参数。

查询 ID

使用 .with_option,可以设置 query_id 选项,用于在 ClickHouse 查询日志中标识查询。
let numbers = client
    .query("SELECT number FROM system.numbers LIMIT 1")
    .with_option("query_id", "some-query-id")
    .fetch_all::<u64>()
    .await?;
除了 query,它与 insertinserter 方法的用法也类似。
如果手动设置 query_id,请确保其唯一。UUID 是不错的选择。
另请参见:客户端仓库中的 query_id 示例

会话 ID

query_id 类似,你也可以设置 session_id,以便在同一会话中执行这些语句。session_id 既可以在客户端级别进行全局设置,也可以为每次 queryinsertinserter 调用单独设置。
let client = Client::default()
    .with_url("http://localhost:8123")
    .with_option("session_id", "my-session");
在集群部署中,由于缺少”会话粘性”,你需要连接到特定的集群节点,才能正确使用此功能。因为例如轮询负载均衡器无法保证后续请求会由同一个 ClickHouse 节点处理。
另请参阅:客户端仓库中的 session_id 示例

自定义 HTTP 请求头

如果你使用代理进行身份验证,或需要传递自定义请求头,可以这样操作:
let client = Client::default()
    .with_url("http://localhost:8123")
    .with_header("X-My-Header", "hello");
另请参阅客户端仓库中的自定义 HTTP 请求头示例

自定义 HTTP 客户端

这可用于调整底层 HTTP 连接池的设置。
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;

let connector = HttpConnector::new(); // 或 HttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
    // 客户端侧保持特定空闲套接字存活的时长(毫秒)。
    // 该值应明显小于 ClickHouse server 的 KeepAlive 超时时间,
    // 默认情况下,23.11 之前的版本为 3 秒,之后的版本为 10 秒。
    .pool_idle_timeout(Duration::from_millis(2_500))
    // 设置连接池中允许的最大空闲 Keep-Alive 连接数。
    .pool_max_idle_per_host(4)
    .build(connector);

let client = Client::with_http_client(hyper_client).with_url("http://localhost:8123");
此示例依赖旧版 Hyper API,今后可能会有所变动。
另请参见客户端仓库中的自定义 HTTP 客户端示例

数据类型

  • (U)Int(8|16|32|64|128) 可与对应的 (u|i)(8|16|32|64|128) 类型或基于它们定义的 newtype 相互映射。
  • (U)Int256 不直接受支持,但有一种变通方法
  • Float(32|64) 可与对应的 f(32|64) 或基于它们定义的 newtype 相互映射。
  • Decimal(32|64|128) 可与对应的 i(32|64|128) 或基于它们定义的 newtype 相互映射。使用 fixnum 或其他有符号定点数实现会更方便。
  • Boolean 可与 bool 或基于它定义的 newtype 相互映射。
  • String 可与任意字符串或字节类型相互映射,例如 &str&[u8]StringVec<u8>SmartString。也支持 newtype。若要存储字节,建议使用 serde_bytes,因为它的效率更高。
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
    str: &'a str,
    string: String,
    #[serde(with = "serde_bytes")]
    bytes: Vec<u8>,
    #[serde(with = "serde_bytes")]
    byte_slice: &'a [u8],
}
  • FixedString(N) 支持表示为字节数组,例如 [u8; N]
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow {
    fixed_str: [u8; 16], // FixedString(16)
}
use serde_repr::{Deserialize_repr, Serialize_repr};

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    level: Level,
}

#[derive(Debug, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
enum Level {
    Debug = 1,
    Info = 2,
    Warn = 3,
    Error = 4,
}
  • UUID 可通过 serde::uuiduuid::Uuid 相互映射。需要启用 uuid 特性。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::uuid")]
    uuid: uuid::Uuid,
}
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::ipv4")]
    ipv4: std::net::Ipv4Addr,
}
  • Date 可映射为/从 u16 或其包装的 newtype,表示自 1970-01-01 起经过的天数。此外,也支持 time::Date,可通过 serde::time::date 使用,但需要启用 time feature。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    days: u16,
    #[serde(with = "clickhouse::serde::time::date")]
    date: Date,
}
  • Date32 可映射为/从 i32 或其对应的 newtype 映射而来,表示自 1970-01-01 以来经过的天数。此外,还支持 time::Date,可通过 serde::time::date32 使用,但这需要启用 time feature。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    days: i32,
    #[serde(with = "clickhouse::serde::time::date32")]
    date: Date,
}
  • DateTime 可与 u32 或其封装 newtype 相互映射,表示自 UNIX 纪元以来经过的秒数。此外,还可通过 serde::time::datetime 支持 time::OffsetDateTime,但这需要启用 time feature。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    ts: u32,
    #[serde(with = "clickhouse::serde::time::datetime")]
    dt: OffsetDateTime,
}
  • DateTime64(_) 可映射为/从 i32 或其外层 newtype,并表示自 UNIX 纪元以来经过的时间。另请注意,也支持 time::OffsetDateTime:使用 serde::time::datetime64::* 即可,但这需要启用 time feature。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    ts: i64, // 根据 `DateTime64(X)` 的精度,单位为 s/us/ms/ns
    #[serde(with = "clickhouse::serde::time::datetime64::secs")]
    dt64s: OffsetDateTime,  // `DateTime64(0)`
    #[serde(with = "clickhouse::serde::time::datetime64::millis")]
    dt64ms: OffsetDateTime, // `DateTime64(3)`
    #[serde(with = "clickhouse::serde::time::datetime64::micros")]
    dt64us: OffsetDateTime, // `DateTime64(6)`
    #[serde(with = "clickhouse::serde::time::datetime64::nanos")]
    dt64ns: OffsetDateTime, // `DateTime64(9)`
}
  • Tuple(A, B, ...) 可映射为/从 (A, B, ...),也可映射为其外层的 newtype。
  • Array(_) 可映射为/从任意切片,例如 Vec<_>&[_]。也支持 newtype。
  • Map(K, V) 的行为类似 Array((K, V))
  • LowCardinality(_) 可无缝使用。
  • Nullable(_) 可映射为/从 Option<_>。对于 clickhouse::serde::* 辅助函数,请添加 ::option
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::ipv4::option")]
    ipv4_opt: Option<Ipv4Addr>,
}
  • Nested 可通过提供多个数组并进行重命名来支持。
// CREATE TABLE test(items Nested(name String, count UInt32))
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(rename = "items.name")]
    items_name: Vec<String>,
    #[serde(rename = "items.count")]
    items_count: Vec<u32>,
}
  • 支持 Geo 类型。Point 的行为类似于元组 (f64, f64),其余类型都只是点切片。
type Point = (f64, f64);
type Ring = Vec<Point>;
type Polygon = Vec<Ring>;
type MultiPolygon = Vec<Polygon>;
type LineString = Vec<Point>;
type MultiLineString = Vec<LineString>;

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    point: Point,
    ring: Ring,
    polygon: Polygon,
    multi_polygon: MultiPolygon,
    line_string: LineString,
    multi_line_string: MultiLineString,
}
  • VariantDynamic 和 (新的) JSON 数据类型目前尚不支持。

模拟

该 crate 提供了一些工具,可用于模拟 CH server,并测试 DDL、SELECTINSERTWATCH 查询。可通过 test-util feature 启用此功能。应将其用作开发依赖。 参见示例

故障排查

CANNOT_READ_ALL_DATA

CANNOT_READ_ALL_DATA 错误最常见的原因是应用程序端的行定义与 ClickHouse 中的定义不一致。 请看下面这张表:
CREATE OR REPLACE TABLE event_log (id UInt32)
ENGINE = MergeTree
ORDER BY timestamp
然后,如果应用端定义的 EventLog 类型不一致,例如:
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
    id: String, // <- 应该使用 u32 而不是 String!
}
插入数据时,可能会出现以下错误:
Error: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 23.: (at row 1)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)")
在此示例中,这个问题可通过正确定义 EventLog 结构体来解决:
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
    id: u32
}

已知限制

  • 目前尚不支持 VariantDynamic 和 (新的) JSON 数据类型。
  • 目前尚不支持服务端参数绑定;相关进展请参见此 issue

联系我们

如果您有任何疑问或需要帮助,欢迎随时通过 Community SlackGitHub issues 与我们联系。
最后修改于 2026年6月10日