跳转到主要内容
本参考文档介绍了 DataStore API 的核心类。

DataStore

用于数据处理的主要类,类似于 DataFrame。
from chdb.datastore import DataStore

构造函数

DataStore(data=None, columns=None, index=None, dtype=None, copy=None)
参数:
参数类型说明
datadict/list/DataFrame/DataStore输入数据
columnslist列名
indexIndex行索引
dtypedict列数据类型
copybool复制数据
示例:
# 从字典创建
ds = DataStore({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})

# 从 pandas DataFrame 创建
import pandas as pd
ds = DataStore(pd.DataFrame({'a': [1, 2, 3]}))

# 空 DataStore
ds = DataStore()

属性

PropertyTypeDescription
columnsIndex列名称
dtypesSeries列的数据类型
shapetuple(行数, 列数)
sizeint元素总数
ndimint维度数量 (2)
emptyboolDataFrame 是否为空
valuesndarray底层数据的 NumPy 数组表示
indexIndex行索引
TDataStore转置
axeslist轴列表

工厂方法

方法说明
uri(uri)通用 URI 工厂方法
from_file(path, ...)从文件创建
from_df(df)从 pandas DataFrame 创建
from_s3(url, ...)从 S3 创建
from_gcs(url, ...)从 Google Cloud Storage 创建
from_azure(url, ...)从 Azure Blob 创建
from_mysql(...)从 MySQL 创建
from_postgresql(...)从 PostgreSQL 创建
from_clickhouse(...)从 ClickHouse 创建
from_mongodb(...)从 MongoDB 创建
from_sqlite(...)从 SQLite 创建
from_iceberg(path)从 Iceberg 表创建
from_delta(path)从 Delta Lake 创建
from_numbers(n)使用连续数字创建
from_random(rows, cols)使用随机数据创建
run_sql(query)从 SQL 查询创建
详见 工厂方法

查询方法

方法返回值描述
select(*cols)DataStore选择列
filter(condition)DataStore过滤行
where(condition)DataStorefilter 的别名
sort(*cols, ascending=True)DataStore对行进行排序
orderby(*cols)DataStoresort 的别名
limit(n)DataStore限制返回行数
offset(n)DataStore跳过指定行数
distinct(subset=None)DataStore去除重复项
groupby(*cols)LazyGroupBy对行分组
having(condition)DataStore过滤分组结果
join(right, ...)DataStore连接 DataStore
union(other, all=False)DataStore合并 DataStore
when(cond, val)CaseWhenCASE WHEN
详见 查询构建

兼容 Pandas 的方法

完整的 209 个方法列表请参见 Pandas 兼容性 索引: head(), tail(), sample(), loc, iloc, at, iat, query(), isin(), where(), mask(), get(), xs(), pop() 聚合: sum(), mean(), std(), var(), min(), max(), median(), count(), nunique(), quantile(), describe(), corr(), cov(), skew(), kurt() 操作: drop(), drop_duplicates(), dropna(), fillna(), replace(), rename(), assign(), astype(), copy() 排序: sort_values(), sort_index(), nlargest(), nsmallest(), rank() 重塑: pivot(), pivot_table(), melt(), stack(), unstack(), transpose(), explode(), squeeze() 组合: merge(), join(), concat(), append(), combine(), update(), compare() 应用/转换: apply(), applymap(), map(), agg(), transform(), pipe(), groupby() 时间序列: rolling(), expanding(), ewm(), shift(), diff(), pct_change(), resample()

I/O 方法

方法说明
to_csv(path, ...)导出为 CSV
to_parquet(path, ...)导出为 Parquet
to_json(path, ...)导出为 JSON
to_excel(path, ...)导出为 Excel
to_df()转换为 pandas DataFrame
to_pandas()to_df 的别名
to_arrow()转换为 Arrow 表
to_dict(orient)转换为字典
to_records()转换为记录
to_numpy()转换为 NumPy 数组
to_sql()生成 SQL 字符串
to_string()字符串表示形式
to_markdown()Markdown 表
to_html()HTML 表
详见 I/O Operations

调试方法

方法说明
explain(verbose=False)显示执行计划
clear_cache()清除缓存结果
详见 调试

魔术方法

方法描述
__getitem__(key)ds['col'], ds[['a', 'b']], ds[condition]
__setitem__(key, value)ds['col'] = value
__delitem__(key)del ds['col']
__len__()len(ds)
__iter__()for col in ds
__contains__(key)'col' in ds
__repr__()repr(ds)
__str__()str(ds)
__eq__(other)ds == other
__ne__(other)ds != other
__lt__(other)ds < other
__le__(other)ds <= other
__gt__(other)ds > other
__ge__(other)ds >= other
__add__(other)ds + other
__sub__(other)ds - other
__mul__(other)ds * other
__truediv__(other)ds / other
__floordiv__(other)ds // other
__mod__(other)ds % other
__pow__(other)ds ** other
__and__(other)ds & other
__or__(other)`dsother`
__invert__()~ds
__neg__()-ds
__pos__()+ds
__abs__()abs(ds)

ColumnExpr

表示用于惰性求值的列表达式。在访问列时会返回该表达式。
# ColumnExpr 会自动返回
col = ds['name']  # 返回 ColumnExpr

属性

属性类型说明
namestr列名
dtypedtype数据类型

访问器

访问器描述方法
.str字符串操作56 个方法
.dtDateTime 操作42+ 个方法
.arr数组操作37 个方法
.jsonJSON 解析13 个方法
.urlURL 解析15 个方法
.ipIP 地址操作9 个方法
.geo地理空间/距离操作14 个方法
完整文档请参见 访问器

算术运算

ds['total'] = ds['price'] * ds['quantity']
ds['profit'] = ds['revenue'] - ds['cost']
ds['ratio'] = ds['a'] / ds['b']
ds['squared'] = ds['value'] ** 2
ds['remainder'] = ds['value'] % 10

比较运算

ds[ds['age'] > 25]           # 大于
ds[ds['age'] >= 25]          # 大于等于
ds[ds['age'] < 25]           # 小于
ds[ds['age'] <= 25]          # 小于等于
ds[ds['name'] == 'Alice']    # 等于
ds[ds['name'] != 'Bob']      # 不等于

逻辑运算

ds[(ds['age'] > 25) & (ds['city'] == 'NYC')]    # 与
ds[(ds['age'] > 25) | (ds['city'] == 'NYC')]    # 或
ds[~(ds['status'] == 'inactive')]               # 非

方法

方法描述
as_(alias)设置别名
cast(dtype)转换为指定类型
astype(dtype)cast 的别名
isnull()是否为 NULL
notnull()是否不为 NULL
isna()isnull 的别名
notna()notnull 的别名
isin(values)是否在值列表中
between(low, high)是否介于两个值之间
fillna(value)填充 NULL 值
replace(to_replace, value)替换值
clip(lower, upper)截断值
abs()绝对值
round(decimals)四舍五入
floor()向下取整
ceil()向上取整
apply(func)应用函数
map(mapper)映射值

聚合方法

方法描述
sum()求和
mean()平均值
avg()mean() 的别名
min()最小值
max()最大值
count()非 NULL 值计数
nunique()去重计数
std()标准差
var()方差
median()中位数
quantile(q)分位数
first()第一个值
last()最后一个值
any()任意值为 true
all()所有值均为 true

LazyGroupBy

表示用于聚合操作的分组 DataStore。
# LazyGroupBy 会自动返回
grouped = ds.groupby('category')  # 返回 LazyGroupBy

方法

方法返回值说明
agg(spec)DataStore聚合
aggregate(spec)DataStoreagg 的别名
sum()DataStore按组求和
mean()DataStore按组求平均值
count()DataStore按组计数
min()DataStore按组求最小值
max()DataStore按组求最大值
std()DataStore按组求标准差
var()DataStore按组求方差
median()DataStore按组求中位数
nunique()DataStore按组统计唯一值数量
first()DataStore按组取第一个值
last()DataStore按组取最后一个值
nth(n)DataStore按组取第 n 个值
head(n)DataStore按组取前 n 个值
tail(n)DataStore按组取后 n 个值
apply(func)DataStore对每组应用函数
transform(func)DataStore按组转换
filter(func)DataStore过滤分组

列选择

# 在 groupby 之后选择列
grouped['amount'].sum()     # 返回 DataStore
grouped[['a', 'b']].sum()   # 返回 DataStore

聚合规范

# 单一聚合
grouped.agg({'amount': 'sum'})

# 每列多个聚合
grouped.agg({'amount': ['sum', 'mean', 'count']})

# 命名聚合
grouped.agg(
    total=('amount', 'sum'),
    average=('amount', 'mean'),
    count=('id', 'count')
)

LazySeries

表示惰性 Series (即单列) 。

属性

属性类型描述
namestrSeries 名称
dtypedtype数据类型

方法

继承了 ColumnExpr 的大多数方法。主要方法包括:
方法说明
value_counts()值频次
unique()去重后的值
nunique()唯一值数量
mode()众数
to_list()转换为列表
to_numpy()转换为数组
to_frame()转换为 DataStore

F (函数)

ClickHouse 函数所在的命名空间。
from chdb.datastore import F, Field

# 聚合
F.sum(Field('amount'))
F.avg(Field('price'))
F.count(Field('id'))
F.quantile(Field('value'), 0.95)

# 条件
F.sum_if(Field('amount'), Field('status') == 'completed')
F.count_if(Field('active'))

# Window
F.row_number().over(order_by='date')
F.lag('price', 1).over(partition_by='product', order_by='date')
详情请参阅聚合

字段

通过名称引用列。
from chdb.datastore import Field

# 创建字段引用
amount = Field('amount')
price = Field('price')

# 在表达式中使用
F.sum(Field('amount'))
F.avg(Field('price'))

CaseWhen

CASE WHEN 表达式构建器。
# 创建 case-when 表达式
result = (ds
    .when(ds['score'] >= 90, 'A')
    .when(ds['score'] >= 80, 'B')
    .when(ds['score'] >= 70, 'C')
    .otherwise('F')
)

# 赋值给列
ds['grade'] = result

Window

窗口函数的窗口规范。
from chdb.datastore import F

# 创建窗口
window = F.window(
    partition_by='category',
    order_by='date',
    rows_between=(-7, 0)
)

# 与聚合函数配合使用
ds['rolling_avg'] = F.avg('price').over(window)
最后修改于 2026年6月10日