跳转到主要内容
DataStore 支持从多种文件格式和数据源读取数据,也支持将数据写入这些格式和数据源。

读取数据

CSV 文件

read_csv(filepath_or_buffer, sep=',', header='infer', names=None, 
         usecols=None, dtype=None, nrows=None, skiprows=None,
         compression=None, encoding=None, **kwargs)
示例:
from chdb import datastore as pd

# 基本 CSV 读取
ds = pd.read_csv("data.csv")

# With options
ds = pd.read_csv(
    "data.csv",
    sep=";",                    # 自定义分隔符
    header=0,                   # 表头行索引
    names=['a', 'b', 'c'],      # 自定义列名
    usecols=['a', 'b'],         # 仅读取指定列
    dtype={'a': 'Int64'},       # 指定数据类型
    nrows=1000,                 # 仅读取前 1000 行
    skiprows=1,                 # 跳过第一行
    compression='gzip',         # 压缩文件
    encoding='utf-8'            # 编码
)

# 从 URL 读取
ds = pd.read_csv("https://example.com/data.csv")

Parquet 文件

推荐用于大型数据集——列式格式,压缩效果更好。
read_parquet(path, columns=None, **kwargs)
示例:
# 基本 Parquet 读取
ds = pd.read_parquet("data.parquet")

# 仅读取特定列(高效——只读取所需数据)
ds = pd.read_parquet("data.parquet", columns=['col1', 'col2', 'col3'])

# 从 S3 读取
ds = pd.read_parquet("s3://bucket/data.parquet")

JSON 文件

read_json(path_or_buf, orient=None, lines=False, **kwargs)
示例:
# 标准 JSON
ds = pd.read_json("data.json")

# JSON Lines(换行符分隔)
ds = pd.read_json("data.jsonl", lines=True)

# 指定方向的 JSON
ds = pd.read_json("data.json", orient='records')

Excel 文件

read_excel(io, sheet_name=0, header=0, names=None, **kwargs)
示例:
# 读取第一个工作表
ds = pd.read_excel("data.xlsx")

# 读取指定工作表
ds = pd.read_excel("data.xlsx", sheet_name="Sheet1")
ds = pd.read_excel("data.xlsx", sheet_name=2)  # 第三个工作表

# 读取多个工作表(返回字典)
sheets = pd.read_excel("data.xlsx", sheet_name=['Sheet1', 'Sheet2'])

SQL 数据库

read_sql(sql, con, **kwargs)
示例:
# 从 SQL 查询中读取
ds = pd.read_sql("SELECT * FROM users", connection)
ds = pd.read_sql("SELECT * FROM orders WHERE date > '2024-01-01'", connection)

其他格式

# Feather (Arrow)
ds = pd.read_feather("data.feather")

# ORC
ds = pd.read_orc("data.orc")

# Pickle
ds = pd.read_pickle("data.pkl")

# 固定宽度格式
ds = pd.read_fwf("data.txt", widths=[10, 20, 15])

# HTML 表格
ds = pd.read_html("https://example.com/table.html")[0]

写入数据

to_csv

导出为 CSV 格式。
to_csv(path_or_buf=None, sep=',', na_rep='', header=True, 
       index=True, mode='w', compression=None, **kwargs)
示例:
ds = pd.read_parquet("data.parquet")

# 基本导出
ds.to_csv("output.csv")

# 指定选项
ds.to_csv(
    "output.csv",
    sep=";",                    # 自定义分隔符
    index=False,                # 不包含行索引
    header=True,                # 包含列标题
    na_rep='NULL',              # 将 NaN 表示为 'NULL'
    compression='gzip'          # 压缩输出
)

# 转为字符串
csv_string = ds.to_csv()

to_parquet

导出为 Parquet 格式 (推荐用于大规模数据) 。
to_parquet(path, engine='pyarrow', compression='snappy', **kwargs)
示例:
# 基本导出
ds.to_parquet("output.parquet")

# 使用压缩选项
ds.to_parquet("output.parquet", compression='gzip')
ds.to_parquet("output.parquet", compression='zstd')

# 分区输出
ds.to_parquet(
    "output/",
    partition_cols=['year', 'month']
)

to_json

导出为 JSON 格式。
to_json(path_or_buf=None, orient='records', lines=False, **kwargs)
示例:
# 标准 JSON(记录数组)
ds.to_json("output.json", orient='records')

# JSON Lines(每行一个 JSON 对象)
ds.to_json("output.jsonl", lines=True)

# 不同的方向模式
ds.to_json("output.json", orient='split')    # {columns, data, index}
ds.to_json("output.json", orient='records')  # [{col: val}, ...]
ds.to_json("output.json", orient='columns')  # {col: {idx: val}}

# 转换为字符串
json_string = ds.to_json()

to_excel

导出为 Excel 格式。
to_excel(excel_writer, sheet_name='Sheet1', index=True, **kwargs)
示例:
# 单个工作表
ds.to_excel("output.xlsx")
ds.to_excel("output.xlsx", sheet_name="Data", index=False)

# 多个工作表
with pd.ExcelWriter("output.xlsx") as writer:
    ds1.to_excel(writer, sheet_name="Sales")
    ds2.to_excel(writer, sheet_name="Inventory")

to_sql

导出至 SQL 数据库,或生成 SQL 字符串。
to_sql(name=None, con=None, schema=None, if_exists='fail', **kwargs)
示例:
# 生成 SQL 查询(不执行)
sql = ds.to_sql()
print(sql)
# SELECT ...
# FROM ...
# WHERE ...

# 写入数据库
ds.to_sql("table_name", connection, if_exists='replace')

其他导出方式

# 转换为 pandas DataFrame
df = ds.to_df()
df = ds.to_pandas()

# 转换为 Arrow Table
table = ds.to_arrow()

# 转换为 NumPy 数组
arr = ds.to_numpy()

# 转换为字典
d = ds.to_dict()
d = ds.to_dict(orient='records')  # 字典列表
d = ds.to_dict(orient='list')     # 列表字典

# 转换为记录(元组列表)
records = ds.to_records()

# 转换为字符串
s = ds.to_string()
s = ds.to_string(max_rows=100)

# 转换为 Markdown
md = ds.to_markdown()

# 转换为 HTML
html = ds.to_html()

# 转换为 LaTeX
latex = ds.to_latex()

# 复制到剪贴板
ds.to_clipboard()

# 转换为 pickle
ds.to_pickle("output.pkl")

# 转换为 feather
ds.to_feather("output.feather")

文件格式对比

格式读取速度写入速度文件大小schema适用场景
Parquet大型数据集、分析
CSV中等兼容性、简单数据
JSON中等部分支持API、嵌套数据
Excel中等部分支持与非技术用户共享
Feather非常快非常快中等进程间通信、pandas

建议

  1. 对于分析类工作负载: 使用 Parquet
    • 列式格式支持仅读取所需列
    • 压缩效果出色
    • 保留数据类型
  2. 对于数据交换: 使用 CSV 或 JSON
    • 兼容性通用
    • 便于人工阅读
  3. 对于 pandas 互操作: 使用 Feather 或 Arrow
    • 序列化速度最快
    • 保留类型

压缩支持

读取压缩文件

# 从扩展名自动检测
ds = pd.read_csv("data.csv.gz")
ds = pd.read_csv("data.csv.bz2")
ds = pd.read_csv("data.csv.xz")
ds = pd.read_csv("data.csv.zst")

# 显式指定压缩格式
ds = pd.read_csv("data.csv", compression='gzip')

写入压缩文件

# 带压缩的 CSV
ds.to_csv("output.csv.gz", compression='gzip')
ds.to_csv("output.csv.bz2", compression='bz2')

# Parquet(始终压缩)
ds.to_parquet("output.parquet", compression='snappy')  # 默认
ds.to_parquet("output.parquet", compression='gzip')
ds.to_parquet("output.parquet", compression='zstd')    # 最佳压缩比
ds.to_parquet("output.parquet", compression='lz4')     # 最快

压缩选项

压缩速度压缩比适用场景
snappy非常快Parquet 默认选项
lz4非常快速度优先
gzip中等兼容性优先
zstd非常高平衡性最佳
bz2非常高最大压缩率

流式 I/O

对于无法完全载入内存的超大文件:

分块读取

# 分块读取
for chunk in pd.read_csv("large.csv", chunksize=100000):
    # 处理每个数据块
    process(chunk)

# 使用迭代器
reader = pd.read_csv("large.csv", iterator=True)
chunk = reader.get_chunk(10000)

使用 ClickHouse 流式

from chdb.datastore import DataStore

# 从文件流式读取,无需将所有数据加载到内存中
ds = DataStore.from_file("huge.parquet")

# 操作是惰性的——仅计算所需的部分
result = ds.filter(ds['amount'] > 1000).head(100)

远程数据源

HTTP/HTTPS

# 从 URL 读取
ds = pd.read_csv("https://example.com/data.csv")
ds = pd.read_parquet("https://example.com/data.parquet")

S3

from chdb.datastore import DataStore

# 匿名访问
ds = DataStore.uri("s3://bucket/data.parquet?nosign=true")

# 使用凭据
ds = DataStore.from_s3(
    "s3://bucket/data.parquet",
    access_key_id="KEY",
    secret_access_key="SECRET"
)

GCS、Azure、HDFS

有关云存储选项,请参阅 工厂方法

最佳实践

1. 大文件请使用 Parquet

# 将 CSV 转换为 Parquet 以提升性能
ds = pd.read_csv("large.csv")
ds.to_parquet("large.parquet")

# 后续读取速度更快
ds = pd.read_parquet("large.parquet")

2. 仅选择所需列

# 高效 - 仅读取 col1 和 col2
ds = pd.read_parquet("data.parquet", columns=['col1', 'col2'])

# 低效 - 读取所有列后再过滤
ds = pd.read_parquet("data.parquet")[['col1', 'col2']]

3. 启用压缩

# 文件体积更小,通常因 I/O 减少而速度更快
ds.to_parquet("output.parquet", compression='zstd')

4. 批次写入

# 只写入一次,不要在循环中写入
result = process_all_data(ds)
result.to_parquet("output.parquet")

# 不要这样做(低效)
for chunk in chunks:
    chunk.to_parquet(f"output_{i}.parquet")
最后修改于 2026年6月10日