このガイドでは、読者がすでに インクリメンタルmaterialized view の概念と、s3 や gcs などのテーブル関数を使ったデータロード に慣れていることを前提としています。また、オブジェクトストレージからの insert パフォーマンスの最適化 に関するガイドを読むこともお勧めします。そこで紹介しているアドバイスは、このガイド全体を通して登場する insert にも適用できます。
サンプルデータセット
pip などのツールを使った Python パッケージのダウンロード 1 件を表しています。
たとえば、このサブセットには 2024-12-17 の 1 日分のデータが含まれており、https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/ で公開されています。次のようにクエリできます。
1 兆行を超える完全な PyPI データセットは、公開デモ環境 clickpy.clickhouse.com で利用できます。このデータセットの詳細 (パフォーマンス向上のためにデモで materialized view をどのように活用しているかや、データが毎日どのように投入されているかを含む) については、こちらを参照してください。
バックフィルのシナリオ
- 既存のデータインジェストがある状態でのデータのバックフィル - 新しいデータはすでに読み込まれており、履歴データをバックフィルする必要があります。この履歴データはすでに特定されています。
- 既存のテーブルへのmaterialized viewの追加 - 履歴データがすでに投入され、データのストリーミングも進行している構成に、新しいmaterialized viewを追加する必要があります。
重複テーブルとビューの使用
pypi テーブルとmaterialized viewを示します。これはPythonプロジェクトごとのダウンロード数を計算するものです。
{101..200} をロードする場合を考えます。pypi に直接insertすることもできますが、duplicate tableを作成することで、このバックフィルを他の処理から切り離して実行できます。
バックフィルが失敗した場合でも、メインテーブルへの影響はなく、重複テーブルをtruncateして処理をやり直すだけで済みます。
これらのビューの新しいコピーを作成するには、接尾辞 _v2 を付けた CREATE TABLE AS 句を使用します。
pypi_v2 と pypi_downloads_v2 をtruncate して、データロードをやり直せば済みます。
データロードが完了したら、ALTER TABLE MOVE PARTITION 句を使って、複製テーブルからメインテーブルへデータを移動できます。
パーティション名上記の
MOVE PARTITION 呼び出しでは、パーティション名 () を使用しています。これは、このテーブル (パーティション化されていないテーブル) に存在する唯一のパーティションを表します。パーティション化されたテーブルの場合は、各パーティションに対して 1 回ずつ、複数回 MOVE PARTITION を呼び出す必要があります。現在のパーティション名は、system.parts テーブルから確認できます。たとえば、SELECT DISTINCT partition FROM system.parts WHERE (table = 'pypi_v2') を実行します。pypi と pypi_downloads に完全なデータが含まれていることを確認できます。pypi_downloads_v2 と pypi_v2 は安全に削除できます。
MOVE PARTITION 操作は軽量であり (ハードリンクを利用) 、しかもアトミックであるという点です。つまり、中間状態を挟まず、失敗するか成功するかのどちらかになります。
以下のバックフィルのシナリオでは、このプロセスを多用します。
このプロセスでは、ユーザーが各挿入操作のサイズを選ぶ必要がある点に注意してください。
挿入サイズが大きい、つまり行数が多いほど、必要な MOVE PARTITION 操作の回数は少なくなります。ただし、これは挿入が失敗した場合 (たとえばネットワークの中断など) に復旧するコストとのバランスを取る必要があります。リスクを減らすために、ファイルのバッチ化をこのプロセスに組み合わせることもできます。これは、WHERE timestamp BETWEEN 2024-12-17 09:00:00 AND 2024-12-17 10:00:00 のような範囲クエリ、または glob パターンのいずれでも実行できます。たとえば、
ClickPipes では、オブジェクトストレージ からデータをロードする際にこの方法が使われ、ターゲットテーブルとその materialized view の複製が自動的に作成されるため、ユーザーが上記の手順を実行する必要はありません。さらに、異なるサブセットをそれぞれ処理する複数のワーカースレッドを使用し (glob パターンを使用) 、各スレッドが独自の複製テーブルを持つことで、データを exactly-once セマンティクスで高速にロードできます。詳しくは、このブログをご覧ください。
シナリオ 1: 既存のデータインジェストを維持したままデータをバックフィルする
- チェックポイントを特定します。これは、履歴データを復元する必要がある起点のタイムスタンプ、またはカラム値です。
- メインテーブルと、materialized view 用のターゲットテーブルの複製を作成します。
- 手順 (2) で作成したターゲットテーブルを参照する materialized view のコピーを作成します。
- 手順 (2) で作成した複製メインテーブルに挿入します。
- 複製テーブルからすべてのパーティションを元のテーブルに移動し、複製テーブルを削除します。
2024-12-17 09:00:00 より前のデータをロードする必要があることがわかります。先ほどの手順と同様に、複製したテーブルとビューを作成し、タイムスタンプでフィルタして必要な部分集合をロードします。
Parquet の timestamp カラムに対するフィルタリングは非常に効率的です。ClickHouse は、読み込むデータ範囲全体を特定するために timestamp カラムのみを読み取るため、ネットワークトラフィックを最小限に抑えられます。min-max などの Parquet インデックスも、ClickHouse のクエリエンジンで活用できます。
ClickHouse Cloud では ClickPipes を使用してくださいClickHouse Cloud を使用している場合、データを専用のバケットに分離できるなら (つまりフィルターが不要なら) 、履歴バックアップの復元には ClickPipes を使用してください。ClickPipes は、複数のワーカーでロードを並列化して読み込み時間を短縮できるだけでなく、上記のプロセスを自動化し、メインテーブルと materialized view の両方に対して複製テーブルを作成します。
シナリオ 2: 既存のテーブルへの materialized view の追加
POPULATE は避けてください
POPULATE コマンドを使って materialized view をバックフィルすることは、インジェストを停止した小規模なデータセットを除き、推奨していません。この演算子では、populate ハッシュの完了後に materialized view が作成されるため、ソーステーブルに挿入された行を取りこぼす可能性があります。さらに、この populate はすべてのデータを対象に実行されるため、大規模なデータセットでは中断やメモリ制限の影響を受けやすくなります。タイムスタンプまたは単調増加するカラムを利用できる場合
- 近い将来の任意の時刻より後の行だけを対象とするフィルターを付けて、materialized view を作成します。
INSERT INTO SELECTクエリを実行し、ビューの集計クエリを使ってソーステーブルから読み取り、その結果を materialized view のターゲットテーブルに挿入します。
SELECT 句を変更し、近い将来の任意の時刻より後の行のみを対象とするフィルターを含めます。この例では、2024-12-17 09:00:00 が数分後の時刻であると仮定しています。
INSERT INTO SELECT で結果をビューのターゲットテーブルに insert することです。例えば、上記のビューでは次のとおりです。
上記の例では、ターゲットテーブルに SummingMergeTree を使用しています。この場合は、元の集計クエリをそのまま使えます。AggregatingMergeTree を利用する、より複雑なユースケースでは、集計に
-State 関数を使用します。この例については、こちらのインテグレーションガイドを参照してください。pypi_downloads_per_day_v2) を作成してそこに insert し、生成されたパーティションを pypi_downloads_per_day に attach することで、このプロセスの耐障害性を高められます。
materialized view のクエリは、より複雑になりがちで (そうでなければ、そもそもユーザーは view を使わないことがほとんどです!) 、リソースも消費します。まれに、そのクエリが必要とするリソースがサーバーの能力を上回ることもあります。これは、ClickHouse の materialized view が持つ利点の 1 つをよく表しています。materialized view はインクリメンタルであり、データセット全体を一度に処理する必要がありません。
この場合、ユーザーにはいくつかの選択肢があります:
- クエリを変更して範囲をバックフィルします。たとえば
WHERE timestamp BETWEEN 2024-12-17 08:00:00 AND 2024-12-17 09:00:00、WHERE timestamp BETWEEN 2024-12-17 07:00:00 AND 2024-12-17 08:00:00などです。 - materialized view を埋めるには、Null table engine を使用します。これにより、materialized view の一般的なインクリメンタルな投入を再現でき、データのブロック (サイズは設定可能) ごとにそのクエリが実行されます。
materialized view へのデータ投入に Null table engine を使用する
/dev/null のようなものです) 。一見すると矛盾しているように思えますが、この table engine に挿入されたデータに対しても、materialized view は引き続き実行されます。これにより、元のデータを永続化せずに materialized view を構築できるため、I/O とそれに伴うストレージを回避できます。
重要なのは、この table engine に関連付けられた materialized view は、挿入時にデータのブロック単位で引き続き実行され、その結果をターゲットテーブルに送ることです。これらのブロックのサイズは設定可能です。ブロックが大きいほど効率が上がり (処理も速くなり) 得る一方で、消費するリソースも増えます (主にメモリ) 。この table engine を使えば、materialized view をインクリメンタルに、つまり 1 回に 1 ブロックずつ構築できるため、集約全体をメモリ上に保持する必要を避けられます。
次の例を考えてみましょう。
materialized view の構築に使用する行を受け取るために、Null テーブル pypi_v2, を作成します。必要なカラムだけにスキーマを絞っている点に注目してください。この materialized view は、このテーブルに挿入された行に対して集約を実行し (1 ブロックずつ) 、その結果をターゲットテーブル pypi_downloads_per_day に送ります。
ここではターゲットテーブルとして
pypi_downloads_per_day を使用しています。さらに耐障害性を高めるために、ユーザーは複製テーブル pypi_downloads_per_day_v2 を作成し、前の例で示したように、これをビューのターゲットテーブルとして使用できます。挿入の完了後、pypi_downloads_per_day_v2 内のパーティションを pypi_downloads_per_day. に移動できます。これにより、たとえばメモリの問題やサーバーの中断によって挿入が失敗した場合でも復旧できるようになります。つまり、pypi_downloads_per_day_v2 を単に TRUNCATE し、設定を調整して、再試行すればよいということです。materialized view を投入するには、pypi. からバックフィル対象の関連データを pypi_v2 に挿入するだけです。
639.47 MiB です。
パフォーマンスとリソースのチューニング
- 読み取り並列度 - 読み取りに使用するスレッド数。
max_threadsで制御されます。ClickHouse Cloud では、これはインスタンスサイズによって決まり、デフォルトでは vCPU 数が使用されます。この値を増やすと、メモリ使用量は増えますが、読み取り性能が向上する場合があります。 - 挿入の並列度 - 挿入処理に使用される挿入スレッド数です。
max_insert_threadsで制御します。注: この値の上限はmax_threadsであるため、実効的な挿入の並列度はmin(max_insert_threads, max_threads)になります。ClickHouse Cloud では、これはインスタンスのサイズによって決まり (2〜4の範囲) 、OSS では 1 に設定されています。この値を増やすと、メモリ使用量は増えますが、パフォーマンスが向上する場合があります。 - 挿入ブロックサイズ - データはループ内で取得・解析され、パーティションキーに基づいてメモリ内の挿入ブロックにまとめられます。これらのブロックはソート、最適化、圧縮された後、新しいデータパーツとしてストレージに書き込まれます。挿入ブロックのサイズは、設定
min_insert_block_size_rowsとmin_insert_block_size_bytes(非圧縮) で制御され、メモリ使用量とディスク I/O に影響します。ブロックが大きいほど使用メモリは増えますが、作成されるパーツは少なくなり、I/O とバックグラウンドマージが減少します。これらの設定は最小しきい値を表しており、いずれか一方に先に達するとフラッシュが実行されます。 - materialized view のブロックサイズ - メインの insert に関する前述の仕組みに加え、materialized view に insert する前にも、より効率よく処理できるようブロックがまとめられます。これらのブロックのサイズは、設定
min_insert_block_size_bytes_for_materialized_viewsおよびmin_insert_block_size_rows_for_materialized_viewsによって決まります。ブロックを大きくすると、メモリ使用量は増える一方で、処理効率は向上します。デフォルトでは、これらの設定はそれぞれ、ソーステーブルのテーブル設定min_insert_block_size_rowsおよびmin_insert_block_size_bytesの値に戻ります。
単純な INSERT SELECT クエリ向けのヒント: 複雑な変換を伴わないシンプルな
INSERT INTO t1 SELECT * FROM t2 クエリでは、optimize_trivial_insert_select=1 を有効にすることを検討してください。この設定 (バージョン 24.7 以降はデフォルトで無効) は、SELECT の並列度を max_insert_threads に自動的に合わせることで、リソース使用量と生成されるパーツ数を削減します。これは、テーブル間で大量のデータを移行する場合に特に有効です。min_insert_block_size_bytes_for_materialized_views および min_insert_block_size_rows_for_materialized_views を変更する必要はありません。これらを変更する場合は、min_insert_block_size_rows および min_insert_block_size_bytes に関して説明したベストプラクティスと同様の方法を使用してください。
メモリ使用量を最小化したい場合は、これらの設定を調整してみてください。ただし、パフォーマンスは必ず低下します。先ほどのクエリを使用した例を以下に示します。
max_insert_threads を1に下げることで、メモリのオーバーヘッドを削減できます。
max_threads の設定を1に減らすことで、メモリ使用量をさらに抑えることができます。
min_insert_block_size_rows を 0 に設定し (block サイズを決める要因から外し) 、min_insert_block_size_bytes を 10485760 (10MiB) に設定することで、メモリ使用量をさらに削減できます。
タイムスタンプまたは単調増加するカラムがない場合
- メインテーブルへの insert を一時停止します。
CREATE AS構文を使用して、メインのターゲットテーブルの複製を作成します。ALTER TABLE ATTACHを使用して、元のターゲットテーブルのパーティションを複製にアタッチします。注: この attach 操作は、前述の move とは異なります。ハードリンクを利用しますが、元のテーブルのデータは保持されます。- 新しい materialized view を作成します。
- insert を再開します。注: 更新されるのはターゲットテーブルのみで、複製は更新されません。複製は元のデータのみを参照します。
- タイムスタンプがあるデータに対して上で使用したのと同じプロセスを適用し、複製テーブルをソースとして materialized view をバックフィルします。
pypi_downloads_per_day を使った次の例を見てみましょう (ここではタイムスタンプは使えないと仮定します) 。
INSERT INTO SELECT アプローチを使って pypi_downloads_per_day をバックフィルします。これに加えて、上記で説明した Null テーブルのアプローチでさらに強化することもでき、必要に応じて duplicate table を使用すると耐障害性を高められます。
この操作では inserts を一時停止する必要がありますが、中間処理は通常すばやく完了するため、データの中断は最小限に抑えられます。