メインコンテンツへスキップ
DataStoreを効果的に活用し、最適なパフォーマンスを引き出すには、その遅延評価モデルを理解することが重要です。

遅延評価

DataStore は 遅延評価 を採用しています。操作はすぐには実行されず、記録されたうえで最適化された SQL クエリにコンパイルされます。実行されるのは、実際に結果が必要になったときだけです。

例: 遅延評価と即時評価

from pathlib import Path
Path("sales.csv").write_text("""\
region,product,category,amount,quantity,price,date,order_id
East,Widget,Electronics,5200,10,120,2024-01-15,1001
West,Gadget,Electronics,800,5,160,2024-02-20,1002
East,Gizmo,Home,6500,3,100,2024-03-10,1003
North,Widget,Electronics,4500,6,150,2024-06-18,1004
West,Gadget,Electronics,2000,8,250,2024-09-14,1005
""")

from chdb import datastore as pd

ds = pd.read_csv("sales.csv")

# これらの操作はまだ実行されていない
result = (ds
    .filter(ds['amount'] > 1000)    # 記録済み、未実行
    .select('region', 'amount')      # 記録済み、未実行
    .groupby('region')               # 記録済み、未実行
    .agg({'amount': 'sum'})          # 記録済み、未実行
    .sort('sum', ascending=False)    # 記録済み、未実行
)

# まだ実行されていない — クエリプランを構築しているだけ
print(result.to_sql())
# SELECT region, SUM(amount) AS sum
# FROM file('sales.csv', 'CSVWithNames')
# WHERE amount > 1000
# GROUP BY region
# ORDER BY sum DESC

# ここで実行が発生する
df = result.to_df()  # <-- 実行をトリガーする

遅延評価の利点

  1. クエリ最適化: 複数の操作が、最適化された単一のSQLクエリにまとめられます
  2. フィルタのプッシュダウン: フィルタはデータソース側で適用されます
  3. カラムのプルーニング: 必要なカラムだけが読み込まれます
  4. 決定の遅延: 実行エンジンは実行時に選択できます
  5. プランの確認: 実行前にクエリを確認したり、デバッグしたりできます

実行トリガー

実際の値が必要になった時点で、自動的に実行がトリガーされます:

自動トリガー

トリガー説明
print() / repr()print(ds)結果を表示
len()len(ds)行数を取得
.columnsds.columnsカラム名を取得
.dtypesds.dtypesカラムの型を取得
.shapeds.shape次元を取得
.indexds.index行インデックスを取得
.valuesds.valuesNumPy 配列を取得
反復処理for row in ds行を反復処理
to_df()ds.to_df()pandas DataFrame に変換
to_pandas()ds.to_pandas()to_df のエイリアス
to_dict()ds.to_dict()dict に変換
to_numpy()ds.to_numpy()NumPy 配列に変換
.equals()ds.equals(other)DataStore 同士を比較
例:
# これらはすべて実行をトリガーする
print(ds)              # 表示
len(ds)                # 1000
ds.columns             # Index(['name', 'age', 'city'])
ds.shape               # (1000, 3)
list(ds)               # 値のリスト
ds.to_df()             # pandas DataFrame

遅延評価のままの操作

OperationReturnsDescription
filter()DataStoreWHERE 句を追加
select()DataStoreカラム選択を追加
sort()DataStoreORDER BY を追加
groupby()LazyGroupByGROUP BY を準備
join()DataStoreJOIN を追加
ds['col']ColumnExprカラム参照
ds[['col1', 'col2']]DataStoreカラム選択
例:
# これらは実行をトリガーしない - 遅延評価のまま維持される
result = ds.filter(ds['age'] > 25)      # DataStore を返す
result = ds.select('name', 'age')        # DataStore を返す
result = ds['name']                      # ColumnExpr を返す
result = ds.groupby('city')              # LazyGroupBy を返す

3段階の実行モデル

DataStore の操作は、3段階の実行モデルに従います。

フェーズ1: SQLクエリの構築 (遅延)

SQLで表現できる操作は蓄積されます:
result = (ds
    .filter(ds['status'] == 'active')   # WHERE
    .select('user_id', 'amount')         # SELECT
    .groupby('user_id')                  # GROUP BY
    .agg({'amount': 'sum'})              # SUM()
    .sort('sum', ascending=False)        # ORDER BY
    .limit(10)                           # LIMIT
)
# すべて1つのSQLクエリにコンパイルされる

フェーズ 2: 実行時点

トリガーが発生すると、それまでに蓄積された SQL が実行されます:
# ここで実行がトリガーされる
df = result.to_df()  
# 最適化された単一のSQLクエリがここで実行される

フェーズ 3: DataFrame の操作 (該当する場合)

実行後に pandas 固有の操作を続けて行う場合:
# 混合操作
result = (ds
    .filter(ds['amount'] > 100)          # フェーズ1: SQL
    .to_df()                             # フェーズ2: 実行
    .pivot_table(...)                    # フェーズ3: pandas
)

実行計画の確認

実行される内容を確認するには、explain() を使用します。
Query
ds = pd.read_csv("sales.csv")

query = (ds
    .filter(ds['amount'] > 1000)
    .groupby('region')
    .agg({'amount': ['sum', 'mean']})
)

# 実行計画を表示
query.explain()
Response
Pipeline:
  1. Source: file('sales.csv', 'CSVWithNames')
  2. Filter: amount > 1000
  3. GroupBy: region
  4. Aggregate: sum(amount), avg(amount)

Generated SQL:
SELECT region, SUM(amount) AS sum, AVG(amount) AS mean
FROM file('sales.csv', 'CSVWithNames')
WHERE amount > 1000
GROUP BY region
詳細を表示するには verbose=True を指定します:
query.explain(verbose=True)
詳しいドキュメントについては、デバッグ: explain()を参照してください。

キャッシュ

DataStore は、同じクエリの重複実行を避けるために実行結果をキャッシュします。

キャッシュの仕組み

from pathlib import Path
Path("data.csv").write_text("""\
name,age,city,salary,department
Alice,25,NYC,55000,Engineering
Bob,30,LA,65000,Product
Charlie,35,NYC,80000,Engineering
Diana,28,SF,70000,Design
Eve,42,NYC,95000,Product
""")

ds = pd.read_csv("data.csv")
result = ds.filter(ds['age'] > 25)

# 初回アクセス - クエリを実行
print(result.shape)  # 実行してキャッシュする

# 2回目のアクセス - キャッシュを使用
print(result.columns)  # キャッシュされた結果を使用

# 3回目のアクセス - キャッシュを使用
df = result.to_df()  # キャッシュされた結果を使用

cache の無効化

DataStore を変更する操作が行われると、cache は無効化されます。
result = ds.filter(ds['age'] > 25)
print(result.shape)  # 実行し、キャッシュする

# 新しい操作によりキャッシュが無効化される
result2 = result.filter(result['city'] == 'NYC')
print(result2.shape)  # 再実行する(異なるクエリ)

cacheの手動制御

# cacheをクリア
ds.clear_cache()

# cacheを無効化
from chdb.datastore.config import config
config.set_cache_enabled(False)

SQL と Pandas の操作を組み合わせる

DataStore は、SQL と pandas が混在する操作をインテリジェントに処理します。

SQL互換の操作

これらはSQLに変換されます:
  • filter(), where()
  • select()
  • groupby(), agg()
  • sort(), orderby()
  • limit(), offset()
  • join(), union()
  • distinct()
  • カラム操作 (算術演算、比較、文字列メソッド)

Pandas のみの操作

これらは実行をトリガーし、pandas を使用します。
  • カスタム関数を使った apply()
  • 複雑な集計を伴う pivot_table()
  • stack()unstack()
  • 実行済みの DataFrame に対する操作

ハイブリッドパイプライン

# SQLフェーズ
result = (ds
    .filter(ds['amount'] > 100)      # SQL
    .groupby('category')              # SQL
    .agg({'amount': 'sum'})           # SQL
)

# 実行 + pandasフェーズ
result = (result
    .to_df()                          # SQL実行
    .pivot_table(...)                 # pandas操作
)

実行エンジンの選択

DataStore では、異なるエンジンを使用して操作を実行できます。

自動モード (既定)

from chdb.datastore.config import config

config.set_execution_engine('auto')  # デフォルト
# 操作ごとに最適なエンジンを自動選択

chDB Engine を強制的に使用する

config.set_execution_engine('chdb')
# すべての操作にClickHouse SQLを使用

pandasエンジンを強制的に使用する

config.set_execution_engine('pandas')
# すべての操作にpandasを使用
詳細は、設定: 実行エンジンを参照してください。

パフォーマンスへの影響

良い例: 早めにフィルタする

# 良い例: SQLでフィルタリングしてから集計する
result = (ds
    .filter(ds['date'] >= '2024-01-01')  # 早い段階でデータを絞り込む
    .groupby('category')
    .agg({'amount': 'sum'})
)

悪い例: フィルタを後回しにする

# 悪い例: すべて集計してからフィルタリング
result = (ds
    .groupby('category')
    .agg({'amount': 'sum'})
    .to_df()
    .query('sum > 1000')  # 集計後にPandasでフィルタリング
)

良い例:早い段階でカラムを絞り込む

# 良い例: SQLでカラムを選択する
result = (ds
    .select('user_id', 'amount', 'date')
    .filter(ds['date'] >= '2024-01-01')
    .groupby('user_id')
    .agg({'amount': 'sum'})
)

良い例: SQLに処理を任せる

# 良い例: SQLで複雑な集計を行う
result = (ds
    .groupby('category')
    .agg({
        'amount': ['sum', 'mean', 'count'],
        'quantity': 'sum'
    })
    .sort('sum', ascending=False)
    .limit(10)
)
# 1つのSQLクエリですべてを処理する

# 悪い例: 複数の個別クエリ
sums = ds.groupby('category')['amount'].sum().to_df()
means = ds.groupby('category')['amount'].mean().to_df()
# 1つではなく2つのクエリを実行している

ベストプラクティスの概要

  1. 実行前に処理をまとめる - クエリ全体を組み立ててから、一度だけ実行をトリガーする
  2. 早い段階でフィルタする - ソース側でデータを絞り込む
  3. 必要なカラムだけを選択する - カラムのプルーニングによりパフォーマンスが向上する
  4. explain() を使って実行内容を把握する - 実行前にデバッグする
  5. 集計は SQL に任せる - ClickHouse はこの処理向けに最適化されている
  6. 実行のトリガーを意識する - 意図しない早期実行を避ける
  7. cacheは適切に使う - cacheが無効化されるタイミングを理解する
最終更新日 2026年6月10日