メインコンテンツへスキップ
このエンジンは、アプリケーションのログファイルをレコードのストリームとして処理できます。 FileLog では、次のことができます。
  • ログファイルを監視する。
  • 監視対象のログファイルに追記された新しいレコードを処理する。

テーブルの作成

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = FileLog('path_to_logs', 'format_name') SETTINGS
    [poll_timeout_ms = 0,]
    [poll_max_batch_size = 0,]
    [max_block_size = 0,]
    [max_threads = 0,]
    [poll_directory_watch_events_backoff_init = 500,]
    [poll_directory_watch_events_backoff_max = 32000,]
    [poll_directory_watch_events_backoff_factor = 2,]
    [handle_error_mode = 'default']
エンジン引数:
  • path_to_logs – サブスクライブするログファイルのパス。ログファイルを含むディレクトリへのパス、または単一のログファイルへのパスを指定できます。なお、ClickHouse で指定できるのは user_files ディレクトリ内のパスのみです。
  • format_name - レコードのフォーマット。FileLog はファイル内の各行を個別のレコードとして処理するため、すべてのデータフォーマットがこれに適しているわけではありません。
パラメータ:
  • poll_timeout_ms - ログファイルから 1 回 poll する際のタイムアウト。デフォルト: stream_poll_timeout_ms
  • poll_max_batch_size — 1 回の poll で取得するレコードの最大数。デフォルト: max_block_size
  • max_block_sizepoll の最大バッチサイズ (レコード数) 。デフォルト: max_insert_block_size
  • max_threads - ファイルを解析する最大スレッド数。デフォルトは 0 で、この場合は max(1, physical_cpu_cores / 4) が使用されます。
  • poll_directory_watch_events_backoff_init - ディレクトリ監視スレッドの初期スリープ値。デフォルト: 500
  • poll_directory_watch_events_backoff_max - ディレクトリ監視スレッドの最大スリープ値。デフォルト: 32000
  • poll_directory_watch_events_backoff_factor - バックオフの進み方。デフォルトは指数関数的です。デフォルト: 2
  • handle_error_mode — FileLog engine のエラー処理方法。設定可能な値: default (メッセージの解析に失敗した場合は例外がスローされます) 、stream (例外メッセージと生のメッセージが仮想カラム _error および _raw_message に保存されます) 。

説明

配信されたレコードは自動的に追跡されるため、ログファイル内の各レコードは一度しかカウントされません。 SELECT はレコードの読み取りにはあまり適していません (デバッグ用途を除く) 。各レコードは一度しか読み取れないためです。より実用的なのは、materialized view を使ってリアルタイムのストリームを作成することです。そのためには、次のようにします。
  1. エンジンを使用して FileLog テーブルを作成し、それをデータストリームとして扱います。
  2. 必要な structure を持つテーブルを作成します。
  3. エンジンからのデータを変換し、あらかじめ作成したテーブルに格納する materialized view を作成します。
MATERIALIZED VIEW をエンジンに接続すると、バックグラウンドでデータの収集が開始されます。これにより、ログファイルからレコードを継続的に受け取り、SELECT を使って必要なフォーマットに変換できます。 1 つの FileLog テーブルには、必要な数だけ materialized view を持たせることができます。これらはテーブルから直接データを読み取るのではなく、新しいレコードを block 単位で受け取ります。そのため、詳細度の異なる複数のテーブルに書き込むことができます (グループ化と aggregation を行うもの/行わないもの) 。 例:
  CREATE TABLE logs (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = FileLog('user_files/my_app/app.log', 'JSONEachRow');

  CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
  ) ENGINE = SummingMergeTree(day, (day, level), 8192);

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() AS total
    FROM queue GROUP BY day, level;

  SELECT level, sum(total) FROM daily GROUP BY level;
ストリームのデータ受信を停止する、または変換ロジックを変更するには、materialized view をデタッチします:
  DETACH TABLE consumer;
  ATTACH TABLE consumer;
ALTER を使用してターゲットテーブルを変更する場合は、ターゲットテーブルとビューのデータに不整合が生じるのを避けるため、materialized view を無効にすることを推奨します。

仮想カラム

  • _filename - ログファイル名。データ型: LowCardinality(String).
  • _offset - ログファイル内のオフセット。データ型: UInt64.
handle_error_mode='stream' の場合に追加される仮想カラム:
  • _raw_record - 正常にパースできなかった生のレコード。データ型: Nullable(String).
  • _error - パース失敗時に発生した例外メッセージ。データ型: Nullable(String).
注: _raw_record_error の仮想カラムに値が入るのは、パース中に例外が発生した場合のみです。メッセージが正常にパースされた場合、これらは常に NULL になります。
最終更新日 2026年6月10日