メインコンテンツへスキップ
ClickHouse を初めて使う場合でも、既存のデプロイメントを担当している場合でも、テーブルに履歴データをバックフィルする必要は必ず出てきます。これが比較的簡単に済む場合もありますが、materialized view へのデータ投入も必要になると、複雑さが増すことがあります。このガイドでは、この作業を進めるためのいくつかの手順を説明します。これらは、それぞれのユースケースに応じて適用できます。
このガイドでは、読者がすでに インクリメンタルmaterialized view の概念と、s3 や gcs などのテーブル関数を使ったデータロード に慣れていることを前提としています。また、オブジェクトストレージからの insert パフォーマンスの最適化 に関するガイドを読むこともお勧めします。そこで紹介しているアドバイスは、このガイド全体を通して登場する insert にも適用できます。

サンプルデータセット

このガイドでは、PyPI のデータセットを使用します。このデータセットの各行は、pip などのツールを使った Python パッケージのダウンロード 1 件を表しています。 たとえば、このサブセットには 2024-12-17 の 1 日分のデータが含まれており、https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/ で公開されています。次のようにクエリできます。
SELECT count()
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/*.parquet')
┌────count()─┐
│ 2039988137 │ -- 約20.4億
└────────────┘

1 row in set. Elapsed: 32.726 sec. Processed 2.04 billion rows, 170.05 KB (62.34 million rows/s., 5.20 KB/s.)
Peak memory usage: 239.50 MiB.
このバケットの完全なデータセットには、320 GB を超える Parquet ファイルが含まれています。以下の例では、glob パターンを使って意図的にその一部のみを対象にしています。 ユーザーは、この日付以降のデータを、たとえば Kafka やオブジェクトストレージからストリームとして取り込んでいるものとします。このデータのスキーマを以下に示します。
DESCRIBE TABLE s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/*.parquet')
FORMAT PrettyCompactNoEscapesMonoBlock
SETTINGS describe_compact_output = 1
┌─name───────────────┬─type────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ timestamp │ Nullable(DateTime64(6))                                                                                                                 │
│ country_code       │ Nullable(String)                                                                                                                        │
│ url │ Nullable(String)                                                                                                                        │
│ project            │ Nullable(String)                                                                                                                        │
│ file │ Tuple(filename Nullable(String), project Nullable(String), version Nullable(String), type Nullable(String))                             │
│ installer          │ Tuple(name Nullable(String), version Nullable(String))                                                                                  │
│ python             │ Nullable(String)                                                                                                                        │
│ implementation     │ Tuple(name Nullable(String), version Nullable(String))                                                                                  │
│ distro             │ Tuple(name Nullable(String), version Nullable(String), id Nullable(String), libc Tuple(lib Nullable(String), version Nullable(String))) │
│ system │ Tuple(name Nullable(String), release Nullable(String))                                                                                  │
│ cpu                │ Nullable(String)                                                                                                                        │
│ openssl_version    │ Nullable(String)                                                                                                                        │
│ setuptools_version │ Nullable(String)                                                                                                                        │
│ rustc_version      │ Nullable(String)                                                                                                                        │
│ tls_protocol       │ Nullable(String)                                                                                                                        │
│ tls_cipher         │ Nullable(String)                                                                                                                        │
└────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
1 兆行を超える完全な PyPI データセットは、公開デモ環境 clickpy.clickhouse.com で利用できます。このデータセットの詳細 (パフォーマンス向上のためにデモで materialized view をどのように活用しているかや、データが毎日どのように投入されているかを含む) については、こちらを参照してください。

バックフィルのシナリオ

バックフィルは通常、ある時点以降のデータストリームを取り込んでいる場合に必要になります。このデータは、挿入時にブロック単位でトリガーされるインクリメンタルmaterialized viewによって、ClickHouseテーブルに挿入されています。これらのビューは、挿入前にデータを変換したり、集計を計算して、その結果を後続のアプリケーションで利用するためにターゲットテーブルへ送ったりすることがあります。 ここでは、次のシナリオを扱います。
  1. 既存のデータインジェストがある状態でのデータのバックフィル - 新しいデータはすでに読み込まれており、履歴データをバックフィルする必要があります。この履歴データはすでに特定されています。
  2. 既存のテーブルへのmaterialized viewの追加 - 履歴データがすでに投入され、データのストリーミングも進行している構成に、新しいmaterialized viewを追加する必要があります。
データはオブジェクトストレージからバックフィルすることを前提としています。いずれの場合も、データの挿入を停止しないことを目指します。 履歴データはオブジェクトストレージからバックフィルすることを推奨します。可能であれば、最適な読み取り性能と圧縮 (ネットワーク転送量の削減) のため、データはParquetにエクスポートしてください。通常は約150MBのファイルサイズが推奨されますが、ClickHouseは70を超えるファイルフォーマットをサポートしており、あらゆるサイズのファイルを処理できます。

重複テーブルとビューの使用

すべてのシナリオにおいて、「duplicate tables and views」という概念を活用します。これらのテーブルとビューは、ライブストリーミングデータに使用されるものの複製であり、障害発生時に容易にリカバリできる手段を確保しながら、バックフィルを独立して実行できるようにします。例として、以下のメインの pypi テーブルとmaterialized viewを示します。これはPythonプロジェクトごとのダウンロード数を計算するものです。
CREATE TABLE pypi
(
    `timestamp` DateTime,
    `country_code` LowCardinality(String),
    `project` String,
    `type` LowCardinality(String),
    `installer` LowCardinality(String),
    `python_minor` LowCardinality(String),
    `system` LowCardinality(String),
    `on` String
)
ENGINE = MergeTree
ORDER BY (project, timestamp)

CREATE TABLE pypi_downloads
(
    `project` String,
    `count` Int64
)
ENGINE = SummingMergeTree
ORDER BY project

CREATE MATERIALIZED VIEW pypi_downloads_mv TO pypi_downloads
AS SELECT
 project,
    count() AS count
FROM pypi
GROUP BY project
メインテーブルと関連ビューに、データのサブセットを投入します。
INSERT INTO pypi SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-000000000{000..100}.parquet')
0 rows in set. Elapsed: 15.702 sec. Processed 41.23 million rows, 3.94 GB (2.63 million rows/s., 251.01 MB/s.)
Peak memory usage: 977.49 MiB.
SELECT count() FROM pypi
┌──count()─┐
│ 20612750 │ -- 2061万
└──────────┘

1 row in set. Elapsed: 0.004 sec.
SELECT sum(count)
FROM pypi_downloads
┌─sum(count)─┐
│   20612750 │ -- 2061万
└────────────┘

1 row in set. Elapsed: 0.006 sec. Processed 96.15 thousand rows, 769.23 KB (16.53 million rows/s., 132.26 MB/s.)
Peak memory usage: 682.38 KiB.
別のサブセット {101..200} をロードする場合を考えます。pypi に直接insertすることもできますが、duplicate tableを作成することで、このバックフィルを他の処理から切り離して実行できます。 バックフィルが失敗した場合でも、メインテーブルへの影響はなく、重複テーブルをtruncateして処理をやり直すだけで済みます。 これらのビューの新しいコピーを作成するには、接尾辞 _v2 を付けた CREATE TABLE AS 句を使用します。
CREATE TABLE pypi_v2 AS pypi

CREATE TABLE pypi_downloads_v2 AS pypi_downloads

CREATE MATERIALIZED VIEW pypi_downloads_mv_v2 TO pypi_downloads_v2
AS SELECT
 project,
    count() AS count
FROM pypi_v2
GROUP BY project
これに2番目のほぼ同じサイズのサブセットを投入し、正常にロードされたことを確認します。
INSERT INTO pypi_v2 SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-000000000{101..200}.parquet')
0 rows in set. Elapsed: 17.545 sec. Processed 40.80 million rows, 3.90 GB (2.33 million rows/s., 222.29 MB/s.)
Peak memory usage: 991.50 MiB.
SELECT count()
FROM pypi_v2
┌──count()─┐
│ 20400020 │ -- 2040万
└──────────┘

1 row in set. Elapsed: 0.004 sec.
SELECT sum(count)
FROM pypi_downloads_v2
┌─sum(count)─┐
│   20400020 │ -- 2,040万
└────────────┘

1 row in set. Elapsed: 0.006 sec. Processed 95.49 thousand rows, 763.90 KB (14.81 million rows/s., 118.45 MB/s.)
Peak memory usage: 688.77 KiB.
この2回目のロード中にどこかの時点で障害が発生しても、pypi_v2pypi_downloads_v2truncate して、データロードをやり直せば済みます。 データロードが完了したら、ALTER TABLE MOVE PARTITION 句を使って、複製テーブルからメインテーブルへデータを移動できます。
ALTER TABLE pypi_v2 MOVE PARTITION () TO pypi
0 rows in set. Elapsed: 1.401 sec.
ALTER TABLE pypi_downloads_v2 MOVE PARTITION () TO pypi_downloads
0 行が返されました。経過時間: 0.389 秒。
パーティション名上記の MOVE PARTITION 呼び出しでは、パーティション名 () を使用しています。これは、このテーブル (パーティション化されていないテーブル) に存在する唯一のパーティションを表します。パーティション化されたテーブルの場合は、各パーティションに対して 1 回ずつ、複数回 MOVE PARTITION を呼び出す必要があります。現在のパーティション名は、system.parts テーブルから確認できます。たとえば、SELECT DISTINCT partition FROM system.parts WHERE (table = 'pypi_v2') を実行します。
これで、pypipypi_downloads に完全なデータが含まれていることを確認できます。pypi_downloads_v2pypi_v2 は安全に削除できます。
SELECT count()
FROM pypi
┌──count()─┐
│ 41012770 │ -- 4101万
└──────────┘

1行セット。経過時間: 0.003秒。
SELECT sum(count)
FROM pypi_downloads
┌─sum(count)─┐
│   41012770 │ -- 4101万
└────────────┘

1 row in set. Elapsed: 0.007 sec. Processed 191.64 thousand rows, 1.53 MB (27.34 million rows/s., 218.74 MB/s.)
SELECT count()
FROM pypi_v2
重要なのは、MOVE PARTITION 操作は軽量であり (ハードリンクを利用) 、しかもアトミックであるという点です。つまり、中間状態を挟まず、失敗するか成功するかのどちらかになります。 以下のバックフィルのシナリオでは、このプロセスを多用します。 このプロセスでは、ユーザーが各挿入操作のサイズを選ぶ必要がある点に注意してください。 挿入サイズが大きい、つまり行数が多いほど、必要な MOVE PARTITION 操作の回数は少なくなります。ただし、これは挿入が失敗した場合 (たとえばネットワークの中断など) に復旧するコストとのバランスを取る必要があります。リスクを減らすために、ファイルのバッチ化をこのプロセスに組み合わせることもできます。これは、WHERE timestamp BETWEEN 2024-12-17 09:00:00 AND 2024-12-17 10:00:00 のような範囲クエリ、または glob パターンのいずれでも実行できます。たとえば、
INSERT INTO pypi_v2 SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-000000000{101..200}.parquet')
INSERT INTO pypi_v2 SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-000000000{201..300}.parquet')
INSERT INTO pypi_v2 SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-000000000{301..400}.parquet')
--すべてのファイルの読み込みが完了するか、MOVE PARTITION が実行されるまで繰り返す
ClickPipes では、オブジェクトストレージ からデータをロードする際にこの方法が使われ、ターゲットテーブルとその materialized view の複製が自動的に作成されるため、ユーザーが上記の手順を実行する必要はありません。さらに、異なるサブセットをそれぞれ処理する複数のワーカースレッドを使用し (glob パターンを使用) 、各スレッドが独自の複製テーブルを持つことで、データを exactly-once セマンティクスで高速にロードできます。詳しくは、このブログをご覧ください。

シナリオ 1: 既存のデータインジェストを維持したままデータをバックフィルする

このシナリオでは、バックフィル対象のデータが分離されたバケット内にないため、フィルタリングが必要になると想定します。すでにデータの挿入は進行しており、履歴データをバックフィルすべき起点となるタイムスタンプ、または単調増加するカラムを特定できます。 このプロセスは、次の手順で進めます。
  1. チェックポイントを特定します。これは、履歴データを復元する必要がある起点のタイムスタンプ、またはカラム値です。
  2. メインテーブルと、materialized view 用のターゲットテーブルの複製を作成します。
  3. 手順 (2) で作成したターゲットテーブルを参照する materialized view のコピーを作成します。
  4. 手順 (2) で作成した複製メインテーブルに挿入します。
  5. 複製テーブルからすべてのパーティションを元のテーブルに移動し、複製テーブルを削除します。
たとえば、PyPI データですでにデータがロードされているとします。この場合、最小のタイムスタンプを特定できるため、それを「チェックポイント」とできます。
SELECT min(timestamp)
FROM pypi
┌──────min(timestamp)─┐
│ 2024-12-17 09:00:00 │
└─────────────────────┘

1 row in set. Elapsed: 0.163 sec. Processed 1.34 billion rows, 5.37 GB (8.24 billion rows/s., 32.96 GB/s.)
Peak memory usage: 227.84 MiB.
上記から、2024-12-17 09:00:00 より前のデータをロードする必要があることがわかります。先ほどの手順と同様に、複製したテーブルとビューを作成し、タイムスタンプでフィルタして必要な部分集合をロードします。
CREATE TABLE pypi_v2 AS pypi

CREATE TABLE pypi_downloads_v2 AS pypi_downloads

CREATE MATERIALIZED VIEW pypi_downloads_mv_v2 TO pypi_downloads_v2
AS SELECT project, count() AS count
FROM pypi_v2
GROUP BY project

INSERT INTO pypi_v2 SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-*.parquet')
WHERE timestamp < '2024-12-17 09:00:00'
0 rows in set. Elapsed: 500.152 sec. Processed 2.74 billion rows, 364.40 GB (5.47 million rows/s., 728.59 MB/s.)
Parquet の timestamp カラムに対するフィルタリングは非常に効率的です。ClickHouse は、読み込むデータ範囲全体を特定するために timestamp カラムのみを読み取るため、ネットワークトラフィックを最小限に抑えられます。min-max などの Parquet インデックスも、ClickHouse のクエリエンジンで活用できます。
この挿入処理が完了したら、関連するパーティションを移動できます。
ALTER TABLE pypi_v2 MOVE PARTITION () TO pypi

ALTER TABLE pypi_downloads_v2 MOVE PARTITION () TO pypi_downloads
履歴データが独立したバケットにある場合、上記の時間フィルターは不要です。時間カラムまたは単調増加するカラムを利用できない場合は、履歴データを分離してください。
ClickHouse Cloud では ClickPipes を使用してくださいClickHouse Cloud を使用している場合、データを専用のバケットに分離できるなら (つまりフィルターが不要なら) 、履歴バックアップの復元には ClickPipes を使用してください。ClickPipes は、複数のワーカーでロードを並列化して読み込み時間を短縮できるだけでなく、上記のプロセスを自動化し、メインテーブルと materialized view の両方に対して複製テーブルを作成します。

シナリオ 2: 既存のテーブルへの materialized view の追加

すでに大量のデータが格納され、なおかつデータの挿入が継続している環境に、新しい materialized view を追加する必要が生じることは珍しくありません。この場合、ストリーム内のある時点を特定できる timestamp や単調増加するカラムがあると便利で、データのインジェストを停止せずに済みます。以下の例では、インジェストの停止を避ける方法を優先し、両方のケースを想定しています。
POPULATE は避けてくださいPOPULATE コマンドを使って materialized view をバックフィルすることは、インジェストを停止した小規模なデータセットを除き、推奨していません。この演算子では、populate ハッシュの完了後に materialized view が作成されるため、ソーステーブルに挿入された行を取りこぼす可能性があります。さらに、この populate はすべてのデータを対象に実行されるため、大規模なデータセットでは中断やメモリ制限の影響を受けやすくなります。

タイムスタンプまたは単調増加するカラムを利用できる場合

この場合、新しい materialized view には、将来の任意の時点より後の行だけに制限するフィルターを含めることを推奨します。その後、この materialized view は、その時点を起点にメインテーブルの履歴データからバックフィルできます。バックフィルの方法は、データサイズと関連するクエリの複雑さによって異なります。 最もシンプルな方法は、次の手順です。
  1. 近い将来の任意の時刻より後の行だけを対象とするフィルターを付けて、materialized view を作成します。
  2. INSERT INTO SELECT クエリを実行し、ビューの集計クエリを使ってソーステーブルから読み取り、その結果を materialized view のターゲットテーブルに挿入します。
さらに、これを拡張して手順 (2) でデータの一部だけを対象にしたり、materialized view 用に複製したターゲットテーブルを使用したりすることもできます (挿入完了後に元のテーブルへパーティションを Attach します) 。これにより、障害発生後の復旧が容易になります。 以下の materialized view を考えてみましょう。これは、1 時間ごとに最も人気のあるプロジェクトを計算します。
CREATE TABLE pypi_downloads_per_day
(
    `hour` DateTime,
    `project` String,
    `count` Int64
)
ENGINE = SummingMergeTree
ORDER BY (project, hour)

CREATE MATERIALIZED VIEW pypi_downloads_per_day_mv TO pypi_downloads_per_day
AS SELECT
 toStartOfHour(timestamp) as hour,
 project,
    count() AS count
FROM pypi
GROUP BY
    hour,
 project
ターゲットテーブルは追加できますが、materialized view を追加する前に、その SELECT 句を変更し、近い将来の任意の時刻より後の行のみを対象とするフィルターを含めます。この例では、2024-12-17 09:00:00 が数分後の時刻であると仮定しています。
CREATE MATERIALIZED VIEW pypi_downloads_per_day_mv TO pypi_downloads_per_day
AS SELECT
 toStartOfHour(timestamp) AS hour,
 project, count() AS count
FROM pypi WHERE timestamp >= '2024-12-17 09:00:00'
GROUP BY hour, project
このビューを追加したら、このデータより前の期間について、materialized view のデータをすべて バックフィルできます。 これを行う最も簡単な方法は、最近追加されたデータを無視する filter を指定してメイン table に対し materialized view のクエリをそのまま実行し、INSERT INTO SELECT で結果をビューのターゲットテーブルに insert することです。例えば、上記のビューでは次のとおりです。
INSERT INTO pypi_downloads_per_day SELECT
 toStartOfHour(timestamp) AS hour,
 project,
    count() AS count
FROM pypi
WHERE timestamp < '2024-12-17 09:00:00'
GROUP BY
    hour,
 project
Ok.

0 rows in set. Elapsed: 2.830 sec. Processed 798.89 million rows, 17.40 GB (282.28 million rows/s., 6.15 GB/s.)
Peak memory usage: 543.71 MiB.
上記の例では、ターゲットテーブルに SummingMergeTree を使用しています。この場合は、元の集計クエリをそのまま使えます。AggregatingMergeTree を利用する、より複雑なユースケースでは、集計に -State 関数を使用します。この例については、こちらのインテグレーションガイドを参照してください。
このケースでは、比較的軽量な集計であり、3 秒未満で完了し、使用メモリも 600MiB 未満です。より複雑な集計や長時間実行される集計では、前述の複製テーブルのアプローチ、つまりシャドウのターゲットテーブル (たとえば pypi_downloads_per_day_v2) を作成してそこに insert し、生成されたパーティションを pypi_downloads_per_day に attach することで、このプロセスの耐障害性を高められます。 materialized view のクエリは、より複雑になりがちで (そうでなければ、そもそもユーザーは view を使わないことがほとんどです!) 、リソースも消費します。まれに、そのクエリが必要とするリソースがサーバーの能力を上回ることもあります。これは、ClickHouse の materialized view が持つ利点の 1 つをよく表しています。materialized view はインクリメンタルであり、データセット全体を一度に処理する必要がありません。 この場合、ユーザーにはいくつかの選択肢があります:
  1. クエリを変更して範囲をバックフィルします。たとえば WHERE timestamp BETWEEN 2024-12-17 08:00:00 AND 2024-12-17 09:00:00WHERE timestamp BETWEEN 2024-12-17 07:00:00 AND 2024-12-17 08:00:00 などです。
  2. materialized view を埋めるには、Null table engine を使用します。これにより、materialized view の一般的なインクリメンタルな投入を再現でき、データのブロック (サイズは設定可能) ごとにそのクエリが実行されます。
(1) は最も単純な方法であり、多くの場合これで十分です。簡潔にするため、例は省略します。 以下では (2) をさらに詳しく説明します。

materialized view へのデータ投入に Null table engine を使用する

Null table engine は、データを永続化しないストレージエンジンを提供します (table engine の世界における /dev/null のようなものです) 。一見すると矛盾しているように思えますが、この table engine に挿入されたデータに対しても、materialized view は引き続き実行されます。これにより、元のデータを永続化せずに materialized view を構築できるため、I/O とそれに伴うストレージを回避できます。 重要なのは、この table engine に関連付けられた materialized view は、挿入時にデータのブロック単位で引き続き実行され、その結果をターゲットテーブルに送ることです。これらのブロックのサイズは設定可能です。ブロックが大きいほど効率が上がり (処理も速くなり) 得る一方で、消費するリソースも増えます (主にメモリ) 。この table engine を使えば、materialized view をインクリメンタルに、つまり 1 回に 1 ブロックずつ構築できるため、集約全体をメモリ上に保持する必要を避けられます。
次の例を考えてみましょう。
CREATE TABLE pypi_v2
(
    `timestamp` DateTime,
    `project` String
)
ENGINE = Null

CREATE MATERIALIZED VIEW pypi_downloads_per_day_mv_v2 TO pypi_downloads_per_day
AS SELECT
 toStartOfHour(timestamp) as hour,
 project,
    count() AS count
FROM pypi_v2
GROUP BY
    hour,
 project
ここでは、materialized view の構築に使用する行を受け取るために、Null テーブル pypi_v2, を作成します。必要なカラムだけにスキーマを絞っている点に注目してください。この materialized view は、このテーブルに挿入された行に対して集約を実行し (1 ブロックずつ) 、その結果をターゲットテーブル pypi_downloads_per_day に送ります。
ここではターゲットテーブルとして pypi_downloads_per_day を使用しています。さらに耐障害性を高めるために、ユーザーは複製テーブル pypi_downloads_per_day_v2 を作成し、前の例で示したように、これをビューのターゲットテーブルとして使用できます。挿入の完了後、pypi_downloads_per_day_v2 内のパーティションを pypi_downloads_per_day. に移動できます。これにより、たとえばメモリの問題やサーバーの中断によって挿入が失敗した場合でも復旧できるようになります。つまり、pypi_downloads_per_day_v2 を単に TRUNCATE し、設定を調整して、再試行すればよいということです。
この materialized view を投入するには、pypi. からバックフィル対象の関連データを pypi_v2 に挿入するだけです。
INSERT INTO pypi_v2 SELECT timestamp, project FROM pypi WHERE timestamp < '2024-12-17 09:00:00'
0 rows in set. Elapsed: 27.325 sec. Processed 1.50 billion rows, 33.48 GB (54.73 million rows/s., 1.23 GB/s.)
Peak memory usage: 639.47 MiB.
ここでのメモリ使用量は 639.47 MiB です。
パフォーマンスとリソースのチューニング
上記のシナリオにおけるパフォーマンスとリソース使用量は、いくつかの要因によって決まります。チューニングを行う前に、S3のインサートおよび読み取りパフォーマンス最適化ガイド読み取りへのスレッドの使用セクションに詳しく記載されているインサートの仕組みを理解しておくことを推奨します。概要は以下のとおりです。
  • 読み取り並列度 - 読み取りに使用するスレッド数。max_threads で制御されます。ClickHouse Cloud では、これはインスタンスサイズによって決まり、デフォルトでは vCPU 数が使用されます。この値を増やすと、メモリ使用量は増えますが、読み取り性能が向上する場合があります。
  • 挿入の並列度 - 挿入処理に使用される挿入スレッド数です。max_insert_threads で制御します。: この値の上限は max_threads であるため、実効的な挿入の並列度は min(max_insert_threads, max_threads) になります。ClickHouse Cloud では、これはインスタンスのサイズによって決まり (2〜4の範囲) 、OSS では 1 に設定されています。この値を増やすと、メモリ使用量は増えますが、パフォーマンスが向上する場合があります。
  • 挿入ブロックサイズ - データはループ内で取得・解析され、パーティションキーに基づいてメモリ内の挿入ブロックにまとめられます。これらのブロックはソート、最適化、圧縮された後、新しいデータパーツとしてストレージに書き込まれます。挿入ブロックのサイズは、設定 min_insert_block_size_rowsmin_insert_block_size_bytes (非圧縮) で制御され、メモリ使用量とディスク I/O に影響します。ブロックが大きいほど使用メモリは増えますが、作成されるパーツは少なくなり、I/O とバックグラウンドマージが減少します。これらの設定は最小しきい値を表しており、いずれか一方に先に達するとフラッシュが実行されます。
  • materialized view のブロックサイズ - メインの insert に関する前述の仕組みに加え、materialized view に insert する前にも、より効率よく処理できるようブロックがまとめられます。これらのブロックのサイズは、設定 min_insert_block_size_bytes_for_materialized_views および min_insert_block_size_rows_for_materialized_views によって決まります。ブロックを大きくすると、メモリ使用量は増える一方で、処理効率は向上します。デフォルトでは、これらの設定はそれぞれ、ソーステーブルのテーブル設定 min_insert_block_size_rows および min_insert_block_size_bytes の値に戻ります。
単純な INSERT SELECT クエリ向けのヒント: 複雑な変換を伴わないシンプルな INSERT INTO t1 SELECT * FROM t2 クエリでは、optimize_trivial_insert_select=1 を有効にすることを検討してください。この設定 (バージョン 24.7 以降はデフォルトで無効) は、SELECT の並列度を max_insert_threads に自動的に合わせることで、リソース使用量と生成されるパーツ数を削減します。これは、テーブル間で大量のデータを移行する場合に特に有効です。
パフォーマンスを向上させるには、Optimizing for S3 Insert and Read Performance guideTuning Threads and Block Size for Inserts セクションに記載されているガイドラインに従ってください。ほとんどの場合、パフォーマンス向上のために min_insert_block_size_bytes_for_materialized_views および min_insert_block_size_rows_for_materialized_views を変更する必要はありません。これらを変更する場合は、min_insert_block_size_rows および min_insert_block_size_bytes に関して説明したベストプラクティスと同様の方法を使用してください。 メモリ使用量を最小化したい場合は、これらの設定を調整してみてください。ただし、パフォーマンスは必ず低下します。先ほどのクエリを使用した例を以下に示します。 max_insert_threads を1に下げることで、メモリのオーバーヘッドを削減できます。
INSERT INTO pypi_v2
SELECT
    timestamp,
 project
FROM pypi
WHERE timestamp < '2024-12-17 09:00:00'
SETTINGS max_insert_threads = 1
0 rows in set. Elapsed: 27.752 sec. Processed 1.50 billion rows, 33.48 GB (53.89 million rows/s., 1.21 GB/s.)
Peak memory usage: 506.78 MiB.
max_threads の設定を1に減らすことで、メモリ使用量をさらに抑えることができます。
INSERT INTO pypi_v2
SELECT timestamp, project
FROM pypi
WHERE timestamp < '2024-12-17 09:00:00'
SETTINGS max_insert_threads = 1, max_threads = 1
Ok.

0 rows in set. Elapsed: 43.907 sec. Processed 1.50 billion rows, 33.48 GB (34.06 million rows/s., 762.54 MB/s.)
Peak memory usage: 272.53 MiB.
最後に、min_insert_block_size_rows を 0 に設定し (block サイズを決める要因から外し) 、min_insert_block_size_bytes を 10485760 (10MiB) に設定することで、メモリ使用量をさらに削減できます。
INSERT INTO pypi_v2
SELECT
    timestamp,
 project
FROM pypi
WHERE timestamp < '2024-12-17 09:00:00'
SETTINGS max_insert_threads = 1, max_threads = 1, min_insert_block_size_rows = 0, min_insert_block_size_bytes = 10485760
0 rows in set. Elapsed: 43.293 sec. Processed 1.50 billion rows, 33.48 GB (34.54 million rows/s., 773.36 MB/s.)
Peak memory usage: 218.64 MiB.
最後に、ブロックサイズを小さくすると、作成されるパーツが増え、マージ負荷も高くなる点に注意してください。こちらで説明しているように、これらの設定は慎重に変更する必要があります。

タイムスタンプまたは単調増加するカラムがない場合

上記のプロセスは、ユーザーがタイムスタンプまたは単調増加するカラムを持っていることを前提としています。しかし、これらを利用できない場合もあります。その場合は、前述の手順の多くを活用しつつ、データの取り込みを一時停止する必要がある次のプロセスを推奨します。
  1. メインテーブルへの insert を一時停止します。
  2. CREATE AS構文を使用して、メインのターゲットテーブルの複製を作成します。
  3. ALTER TABLE ATTACH を使用して、元のターゲットテーブルのパーティションを複製にアタッチします。注: この attach 操作は、前述の move とは異なります。ハードリンクを利用しますが、元のテーブルのデータは保持されます。
  4. 新しい materialized view を作成します。
  5. insert を再開します。注: 更新されるのはターゲットテーブルのみで、複製は更新されません。複製は元のデータのみを参照します。
  6. タイムスタンプがあるデータに対して上で使用したのと同じプロセスを適用し、複製テーブルをソースとして materialized view をバックフィルします。
PyPI と、前述の新しい materialized view pypi_downloads_per_day を使った次の例を見てみましょう (ここではタイムスタンプは使えないと仮定します) 。
SELECT count() FROM pypi
┌────count()─┐
│ 2039988137 │ -- 20.4億
└────────────┘

1 行 (set内). 経過時間: 0.003 秒.
-- (1) インサートを一時停止する
-- (2) ターゲットテーブルの複製を作成する

CREATE TABLE pypi_v2 AS pypi

SELECT count() FROM pypi_v2
┌────count()─┐
│ 2039988137 │ -- 20.4億
└────────────┘

1 row in set. Elapsed: 0.004 sec.
-- (3) 元のターゲットテーブルから複製テーブルにパーティションをアタッチする。

ALTER TABLE pypi_v2
 (ATTACH PARTITION tuple() FROM pypi)

-- (4) 新しい materialized view を作成する。

CREATE TABLE pypi_downloads_per_day
(
    `hour` DateTime,
    `project` String,
    `count` Int64
)
ENGINE = SummingMergeTree
ORDER BY (project, hour)

CREATE MATERIALIZED VIEW pypi_downloads_per_day_mv TO pypi_downloads_per_day
AS SELECT
 toStartOfHour(timestamp) as hour,
 project,
    count() AS count
FROM pypi
GROUP BY
    hour,
 project

-- (4) インサートを再開する。ここでは1行を挿入して動作を確認する。

INSERT INTO pypi SELECT *
FROM pypi
LIMIT 1

SELECT count() FROM pypi
┌────count()─┐
│ 2039988138 │ -- 20.4億
└────────────┘

1 row in set. Elapsed: 0.003 sec.
-- pypi_v2 が以前と同じ行数を保持していることに注目

SELECT count() FROM pypi_v2
┌────count()─┐
│ 2039988137 │ -- 20億4000万
└────────────┘
-- (5) バックアップ pypi_v2 を使用してビューをバックフィルする

INSERT INTO pypi_downloads_per_day SELECT
 toStartOfHour(timestamp) as hour,
 project,
    count() AS count
FROM pypi_v2
GROUP BY
    hour,
 project
0 rows in set. Elapsed: 3.719 sec. Processed 2.04 billion rows, 47.15 GB (548.57 million rows/s., 12.68 GB/s.)
DROP TABLE pypi_v2;
最後から 2 番目の手順では、前述したシンプルな INSERT INTO SELECT アプローチを使って pypi_downloads_per_day をバックフィルします。これに加えて、上記で説明した Null テーブルのアプローチでさらに強化することもでき、必要に応じて duplicate table を使用すると耐障害性を高められます。 この操作では inserts を一時停止する必要がありますが、中間処理は通常すばやく完了するため、データの中断は最小限に抑えられます。
最終更新日 2026年6月10日