DataStore 提供两种兼容模式,用于控制输出是为 Pandas 兼容性调整,还是针对原生 SQL 性能进行优化。
| 模式 | compat_mode 值 | 描述 |
|---|
| Pandas (默认) | "pandas" | 完全兼容 pandas 的行为。保留行顺序,支持 MultiIndex、set_index、dtype 修正、稳定排序的平局裁定规则,以及 -If/isNaN 包装器。 |
| Performance | "performance" | 采用 SQL 优先的执行方式。移除所有 pandas 兼容性开销。吞吐量最高,但结果在结构上可能与 pandas 不同。 |
| 开销 | Pandas 模式行为 | 性能模式 行为 |
|---|
| 保留行顺序 | 注入 _row_id、rowNumberInAllBlocks() 和 __orig_row_num__ 子查询 | 已禁用 — 不保证行顺序 |
| 稳定排序的平局决胜项 | 在 ORDER BY 后附加 rowNumberInAllBlocks() ASC | 已禁用 — 相同值的顺序可能是任意的 |
| Parquet preserve_order | input_format_parquet_preserve_order=1 | 已禁用 — 允许并行读取 Parquet |
| GroupBy 自动 ORDER BY | 添加 ORDER BY group_key (pandas 默认 sort=True) | 已禁用 — 返回的分组顺序可能是任意的 |
| GroupBy dropna WHERE | 添加 WHERE key IS NOT NULL (pandas 默认 dropna=True) | 已禁用 — 包含 NULL 分组 |
| GroupBy set_index | 将分组键设为索引 | 已禁用 — 分组键保留为列 |
| MultiIndex 列 | agg({'col': ['sum','mean']}) 返回 MultiIndex 列 | 已禁用 — 改为扁平列名 (col_sum、col_mean) |
-If/isNaN 包装 | 为 skipna 使用 sumIf(col, NOT isNaN(col)) | 已禁用 — 使用普通 sum(col) (ClickHouse 原生会跳过 NULL) |
对 count 使用 toInt64 | 使用 toInt64(count()) 以匹配 pandas 的 int64 | 已禁用 — 返回原生 SQL dtype |
对全 NaN 的 sum 使用 fillna(0) | 全为 NaN 的 sum 返回 0 (pandas 行为) | 已禁用 — 返回 NULL |
| Dtype 修正 | 如 abs() 从无符号转为有符号等 | 已禁用 — 使用原生 SQL dtype |
| 索引保留 | SQL 执行后恢复原始索引 | 已禁用 |
first()/last() | argMin/argMax(col, rowNumberInAllBlocks()) | any(col) / anyLast(col) — 更快,但非确定性 |
| 单 SQL 聚合 | ColumnExpr groupby 会物化中间 DataFrame | 将 LazyGroupByAgg 注入惰性操作链中 — 单条 SQL 查询 |
from chdb.datastore.config import config
# 启用性能模式
config.use_performance_mode()
# 切换回 pandas 兼容模式
config.use_pandas_compat()
# 查看当前模式
print(config.compat_mode) # 'pandas' or 'performance'
from chdb.datastore.config import set_compat_mode, CompatMode, is_performance_mode
# 启用性能模式
set_compat_mode(CompatMode.PERFORMANCE)
# 检查
print(is_performance_mode()) # True
# 恢复默认
set_compat_mode(CompatMode.PANDAS)
from chdb import use_performance_mode, use_pandas_compat
use_performance_mode()
# ... 高性能操作 ...
use_pandas_compat()
启用性能模式后,执行引擎会自动设为 chdb。无需另外调用 config.use_chdb()。
在以下情况下使用性能模式:
- 处理大型数据集 (数十万到数百万行)
- 运行以聚合为主的工作负载 (groupby、sum、mean、count)
- 行顺序不重要 (例如聚合结果、报表、仪表盘)
- 你希望获得最高的 SQL 吞吐量和最低的开销
- 需要关注内存使用量 (并行读取 Parquet,无中间 DataFrame)
在以下情况下请保持使用 pandas 模式:
- 你需要完全一致的 pandas 行为 (行顺序、MultiIndex、dtypes)
- 你依赖
first()/last() 返回真正的第一行/最后一行
- 你使用依赖行顺序的
shift()、diff()、cumsum()
- 你正在编写测试,将 DataStore 的输出与 pandas 进行比较
在性能模式下,任何操作的行顺序都无法保证。这包括:
- 过滤结果
- GroupBy 聚合结果
- 未显式使用
sort_values() 的 head() / tail()
first() / last() 聚合
如果你需要有序结果,请显式添加 sort_values():
config.use_performance_mode()
ds = pd.read_csv("data.csv")
# 无序(快速)
result = ds.groupby("region")["revenue"].sum()
# 有序(仍然快速,只是添加了 ORDER BY)
result = ds.groupby("region")["revenue"].sum().sort_values()
| 方面 | Pandas 模式 | 性能模式 |
|---|
| 分组键位置 | 索引 (通过 set_index) | 普通列 |
| 分组顺序 | 按键排序 (默认) | 任意顺序 |
| NULL 分组 | 排除 (默认 dropna=True) | 包含 |
| 列格式 | 多重聚合时使用 MultiIndex | 扁平命名 (col_func) |
first()/last() | 确定性 (按行顺序) | 非确定性 (any()/anyLast()) |
config.use_performance_mode()
# 全为 NaN 的分组求和返回 NULL(而非 0)
# Count 返回原生 uint64(而非强制转换为 int64)
# 无需 -If 包装器:使用 sum() 而非 sumIf()
result = ds.groupby("cat")["val"].sum()
在性能模式下,ColumnExpr 的 groupby 聚合 (例如 ds[condition].groupby('col')['val'].sum()) 会以单条 SQL 查询执行,而不是像 pandas 模式那样分两步进行:
config.use_performance_mode()
# Pandas 模式:两个 SQL 查询(过滤器 → 物化 → groupby)
# 性能模式:一个 SQL 查询(WHERE + GROUP BY 合并为单个查询)
result = ds[ds["rating"] > 3.5].groupby("category")["revenue"].sum()
# 生成的 SQL(单个查询):
# SELECT category, sum(revenue) FROM data WHERE rating > 3.5 GROUP BY category
这避免了中间 DataFrame 的物化过程,并且可以显著降低内存占用并缩短执行时间。
性能模式 (compat_mode) 和执行引擎 (execution_engine) 是两个相互独立的配置维度:
| 配置项 | 控制内容 | 取值 |
|---|
execution_engine | 由哪个引擎执行计算 | auto, chdb, pandas |
compat_mode | 是否为实现 Pandas 兼容性而调整输出形态 | pandas, performance |
将 compat_mode='performance' 设为性能模式时,execution_engine 会自动设为 chdb,因为性能模式是为 SQL 执行而设计的。
from chdb.datastore.config import config
# 这两项配置相互独立
config.use_chdb() # 强制使用 chDB 引擎,保留 pandas 兼容性
config.use_performance_mode() # 强制使用 chDB + 去除 pandas 开销
为性能模式编写测试时,结果的行顺序和结构格式可能与 pandas 不一致。请采用以下策略:
# 比较前先对两侧按相同列排序
ds_result = ds.groupby("cat")["val"].sum()
pd_result = pd_df.groupby("cat")["val"].sum()
ds_sorted = ds_result.sort_index()
pd_sorted = pd_result.sort_index()
np.testing.assert_array_equal(ds_sorted.values, pd_sorted.values)
# first() 与 any() 结合使用,从分组中返回任意一个元素
result = ds.groupby("cat")["val"].first()
for group_key in groups:
assert result.loc[group_key] in group_values[group_key]
Schema 与计数 (LIMIT 不使用 ORDER BY)
# head() 不带 sort_values:行集合是非确定性的
result = ds.head(5)
assert len(result) == 5
assert set(result.columns) == expected_columns
from chdb.datastore.config import config
config.use_performance_mode()
# 所有后续操作均可受益
ds = pd.read_parquet("data.parquet")
result = ds[ds["amount"] > 100].groupby("region")["amount"].sum()
# 用于需要有序输出的展示或下游处理
result = (ds
.groupby("region")["revenue"].sum()
.sort_values(ascending=False)
)
config.use_performance_mode()
# ETL 管道 — 顺序无关紧要,吞吐量才是关键
summary = (ds
.filter(ds["date"] >= "2024-01-01")
.groupby(["region", "product"])
.agg({"revenue": "sum", "quantity": "sum", "rating": "mean"})
)
summary.to_df().to_parquet("summary.parquet")
# 高性能模式,适用于大量计算
config.use_performance_mode()
aggregated = ds.groupby("cat")["val"].sum()
# 切换回 pandas 模式以进行精确匹配比较
config.use_pandas_compat()
detailed = ds[ds["val"] > 100].head(10)