メインコンテンツへスキップ
ClickHouse Cloudをご利用の場合は、代わりにClickPipesの使用をおすすめします。ClickPipesは、プライベートネットワーク接続、インジェストとクラスターリソースの個別スケーリング、さらにKafkaデータをClickHouseにストリーミングするための包括的な監視を標準でサポートしています。
  • データフローをパブリッシュまたはサブスクライブする。
  • 耐障害性のあるストレージを構成する。
  • 利用可能になったストリームを処理する。

テーブルの作成

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [ALIAS expr1],
    name2 [type2] [ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_security_protocol = '',]
    [kafka_sasl_mechanism = '',]
    [kafka_sasl_username = '',]
    [kafka_sasl_password = '',]
    [kafka_autodetect_client_rack = '',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_client_id = '',]
    [kafka_poll_timeout_ms = 0,]
    [kafka_poll_max_batch_size = 0,]
    [kafka_flush_interval_ms = 0,]
    [kafka_consumer_reschedule_ms = 0,]
    [kafka_thread_per_consumer = 0,]
    [kafka_handle_error_mode = 'default',]
    [kafka_commit_on_select = false,]
    [kafka_consumer_acquire_timeout_ms = 30000,]
    [kafka_max_rows_per_message = 1,]
    [kafka_compression_codec = '',]
    [kafka_compression_level = -1];
パラメータ:
  • kafka_broker_list — ブローカーをカンマ区切りで指定したリスト (例: localhost:9092) 。
  • kafka_topic_list — Kafkaトピックのリスト。
  • kafka_group_name — Kafkaコンシューマーのグループ。読み取りオフセットは各グループごとに個別に追跡されます。クラスター内でメッセージが重複しないようにするには、すべてで同じグループ名を使用してください。
  • kafka_format — メッセージのフォーマット。JSONEachRow など、SQL の FORMAT 関数と同じ記法を使用します。詳細は フォーマット セクションを参照してください。
パラメータ:
  • kafka_security_protocol - ブローカーと通信する際に使用するプロトコル。設定可能な値: plaintext, ssl, sasl_plaintext, sasl_ssl
  • kafka_sasl_mechanism - 認証に使用する SASL メカニズム。設定可能な値: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER.
  • kafka_sasl_username - PLAIN および SASL-SCRAM-.. メカニズムで使用する SASL ユーザー名。
  • kafka_sasl_password - PLAIN および SASL-SCRAM-.. メカニズムで使用する SASL パスワード。
  • kafka_schema — フォーマットでスキーマ定義が必要な場合に指定しなければならないパラメータです。たとえば、Cap’n Proto では、スキーマファイルへのパスとルート schema.capnp:Message オブジェクト名が必要です。
  • kafka_schema_registry_skip_bytes — エンベロープヘッダー付きのスキーマレジストリを使用する際に、各メッセージの先頭からスキップするバイト数です (例: 19 バイトのエンベロープを含む AWS Glue Schema Registry) 。範囲: [0, 255]。デフォルト: 0
  • kafka_num_consumers — テーブルごとのコンシューマー数です。1 つのコンシューマーのスループットでは不十分な場合は、コンシューマー数を増やしてください。各パーティションには割り当てられるコンシューマーが 1 つだけのため、コンシューマーの総数はトピック内のパーティション数を超えてはなりません。また、ClickHouse がデプロイされているサーバーの物理コア数を超えないようにしてください。デフォルト: 1
  • kafka_max_block_size — poll での最大バッチサイズ (メッセージ数) 。デフォルト: max_insert_block_size
  • kafka_skip_broken_messages — Kafka メッセージパーサーが、ブロックごとにスキーマと互換性のないメッセージをどこまで許容するかを指定します。kafka_skip_broken_messages = N の場合、エンジンはパースできない Kafka メッセージを N 件スキップします (1 件のメッセージはデータの 1 行に相当します) 。既定値: 0
  • kafka_commit_every_batch — ブロック全体の書き込み後に一度だけコミットするのではなく、消費して処理した各バッチごとにコミットします。デフォルト: 0
  • kafka_client_id — クライアント識別子。デフォルトでは空です。
  • kafka_poll_timeout_ms — Kafka からの単一の poll に対するタイムアウト。デフォルト: stream_poll_timeout_ms
  • kafka_poll_max_batch_size — 1 回の Kafka poll で取得できるメッセージの最大数。デフォルト: max_block_size
  • kafka_flush_interval_ms — Kafka からデータをフラッシュする際のタイムアウト。デフォルト値: stream_flush_interval_ms
  • kafka_consumer_reschedule_ms — Kafka のストリーム処理が停止している場合 (たとえば、消費できるメッセージがない場合) の再スケジュール間隔です。この設定は、コンシューマーが poll を再試行するまでの待機時間を制御します。kafka_consumers_pool_ttl_ms を超えてはなりません。デフォルト: 500 ミリ秒。
  • kafka_thread_per_consumer — 各コンシューマーに独立したスレッドを割り当てます。有効にすると、各コンシューマーがデータをそれぞれ独立して並列にフラッシュします (無効の場合は、複数のコンシューマーからの行が1つのブロックになるようにまとめられます) 。デフォルト: 0
  • kafka_handle_error_mode — Kafka エンジンでのエラー処理方法。設定可能な値: default (メッセージの解析に失敗した場合に例外がスローされます) 、stream (例外メッセージと生のメッセージが仮想カラム _error_raw_message に保存されます) 、dead_letter_queue (エラー関連のデータが system.dead_letter_queue に保存されます) 。
  • kafka_commit_on_select — SELECTクエリが実行されたときにメッセージをコミットします。デフォルト: false
  • kafka_consumer_acquire_timeout_msKafka2 テーブル (Keeper ベースのオフセットストレージを使用) に対して直接 SELECT クエリを実行する際に、Kafka コンシューマーを取得するまでのタイムアウト時間 (ミリ秒) 。同じテーブルに対して複数の直接 SELECT クエリが同時実行される場合、各クエリはコンシューマーが利用可能になるまで待機する必要があります。このタイムアウトは、クエリがそれぞれ異なるコンシューマーのサブセットを保持している場合にデッドロックを防ぎます。デフォルト: 30000
  • kafka_max_rows_per_message — 行ベースのフォーマットで、1つの Kafka メッセージに書き込める行の最大数です。デフォルト: 1
  • kafka_autodetect_client_rack — 最も近い Kafka レプリカを優先するため、librdkafkaclient.rack パラメータを自動的に設定します。 サポートされるソース: AWS IMDSv2 のアベイラビリティーゾーン ID には AWS_ZONE_ID (例: euc1-az1) ; AWS IMDSv2 のアベイラビリティーゾーン名には AWS_ZONE_NAME (例: eu-central-1a) ; GCP メタデータサービスのゾーンには GCP_ZONE (例: europe-central2-a) ; ClickHouse の内部検出を使用するには CLICKHOUSE。これはクラウドメタデータまたは設定に依存する場合があります; AWS_ZONE_NAME を試し、次に GCP_ZONE を試すには AWS_ZONE_NAME_THEN_GCP_ZONE。 デフォルト: 空文字列 (無効) 。 ヒント: 環境によってアベイラビリティーゾーンのフォーマットは異なります。Amazon MSK は通常ゾーン ID を使用するため、AWS_ZONE_ID を優先してください。Confluent Cloud は通常ゾーン名を使用するため、AWS_ZONE_NAME を優先してください。不明な場合は、AWS_ZONE_NAME_THEN_GCP_ZONE を使用するか、クラスターの broker.rack の値を確認してください。 注: Kafka broker では、broker.rackreplica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector を設定する必要があります。
  • kafka_compression_codec — メッセージの生成に使用される圧縮コーデック。対応: 空文字列、nonegzipsnappylz4zstd。空文字列の場合、この圧縮コーデックはテーブルでは設定されないため、設定ファイルの値、または librdkafka のデフォルト値が使用されます。デフォルト: 空文字列。
  • kafka_compression_levelkafka_compression_codec で選択したアルゴリズムの圧縮レベルパラメーターです。値を大きくすると、CPU 使用量は増えますが、圧縮率は向上します。使用できる範囲はアルゴリズムによって異なります: gzip[0-9]; lz4[0-12]; snappy0 のみ; zstd[0-12]; -1 = コーデック依存のデフォルト圧縮レベル。デフォルト: -1
  • kafka_map_virtual_columns_on_write — 有効にすると、テーブルスキーマ内で _key_timestamp_headers.name_headers.value という特別な名前を持つカラムが、INSERT 時に対応する Kafka メッセージのメタデータにマッピングされ、メッセージのペイロードから除外されます。カラムを Kafka メッセージのメタデータにマッピングするを参照してください。デフォルト: false
例:
  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  SELECT * FROM queue LIMIT 5;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group1',
                            kafka_format = 'JSONEachRow',
                            kafka_num_consumers = 4;

  CREATE TABLE queue3 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
              SETTINGS kafka_format = 'JSONEachRow',
                       kafka_num_consumers = 4;
Kafka table engine は、デフォルト値を持つカラムをサポートしていません。デフォルト値を持つカラムが必要な場合は、materialized view レベルで追加できます (以下を参照) 。

説明

配信済みメッセージは自動的に追跡されるため、グループ内の各メッセージは一度だけカウントされます。データを二重に取得したい場合は、別のグループ名でテーブルのコピーを作成してください。 グループは柔軟で、クラスター全体で同期されます。たとえば、10 個のトピックと、クラスター内に 5 個のテーブルのコピーがある場合、各コピーは 2 個のトピックを受け取ります。コピー数が変わると、トピックは各コピー間で自動的に再配分されます。詳細は http://kafka.apache.org/intro を参照してください。 各 Kafka トピックには専用のコンシューマグループを割り当て、特にトピックが動的に作成・削除される可能性のある環境 (たとえばテスト環境やステージング環境) では、トピックとグループが 1 対 1 で対応するようにすることを推奨します。 メッセージの読み取りに SELECT はあまり適していません (デバッグ用途を除く) 。各メッセージは一度しか読み取れないためです。より実用的なのは、materialized view を使ってリアルタイムのストリームを作成することです。これを行うには、次のようにします。
  1. engine を使用して Kafka コンシューマを作成し、それをデータストリームとして扱います。
  2. 必要な構造を持つテーブルを作成します。
  3. engine からデータを変換し、あらかじめ作成したテーブルに格納する materialized view を作成します。
MATERIALIZED VIEW が engine に接続されると、バックグラウンドでデータの収集を開始します。これにより、Kafka から継続的にメッセージを受信し、SELECT を使って必要なフォーマットに変換できます。 1 つの Kafka テーブルには、必要な数だけ materialized view を持たせることができます。これらは Kafka テーブルから直接データを読み取るのではなく、新しいレコードを (ブロック単位で) 受け取ります。これにより、異なる粒度の複数のテーブル (グループ化あり - aggregation あり、グループ化なし) に書き込むことができます。 例:
  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
  ) ENGINE = SummingMergeTree(day, (day, level), 8192);

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() AS total
    FROM queue GROUP BY day, level;

  SELECT level, sum(total) FROM daily GROUP BY level;
パフォーマンスを向上させるため、受信したメッセージは max_insert_block_size のサイズでブロックにまとめられます。stream_flush_interval_ms ミリ秒以内にブロックが形成されない場合は、ブロックが完全でなくても、データはテーブルにフラッシュされます。 トピック データの受信を停止するか、変換ロジックを変更するには、materialized view をデタッチします:
  DETACH TABLE consumer;
  ATTACH TABLE consumer;
ALTER を使用してターゲットテーブルを変更する場合は、ターゲットテーブルとビューのデータの不整合を避けるため、materialized view を無効にすることを推奨します。

設定

GraphiteMergeTree と同様に、Kafka エンジンでは ClickHouse の設定ファイルを使って拡張設定を行えます。使用できる設定キーは 2 種類あり、グローバル (<kafka> 配下) とトピックレベル (<kafka><kafka_topic> 配下) です。最初にグローバル設定が適用され、その後、トピックレベルの設定が存在する場合はそれが適用されます。
  <kafka>
    <!-- Kafka エンジンタイプのすべてのテーブルに対するグローバル設定オプション -->
    <debug>cgrp</debug>
    <statistics_interval_ms>3000</statistics_interval_ms>

    <kafka_topic>
        <name>logs</name>
        <statistics_interval_ms>4000</statistics_interval_ms>
    </kafka_topic>

    <!-- コンシューマーの設定 -->
    <consumer>
        <auto_offset_reset>smallest</auto_offset_reset>
        <kafka_topic>
            <name>logs</name>
            <fetch_min_bytes>100000</fetch_min_bytes>
        </kafka_topic>

        <kafka_topic>
            <name>stats</name>
            <fetch_min_bytes>50000</fetch_min_bytes>
        </kafka_topic>
    </consumer>

    <!-- プロデューサーの設定 -->
    <producer>
        <kafka_topic>
            <name>logs</name>
            <retry_backoff_ms>250</retry_backoff_ms>
        </kafka_topic>

        <kafka_topic>
            <name>stats</name>
            <retry_backoff_ms>400</retry_backoff_ms>
        </kafka_topic>
    </producer>
  </kafka>
利用可能な設定オプションの一覧については、librdkafka の設定リファレンスを参照してください。ClickHouse の設定では、ドットの代わりにアンダースコア (_) を使用してください。たとえば、check.crcs=true<check_crcs>true</check_crcs> となります。

Kerberos サポート

Kerberos 対応の Kafka を扱うには、値に sasl_plaintext を指定した security_protocol 子要素を追加します。Kerberos のチケット授与チケットが OS の機能によって取得され、キャッシュされていれば十分です。 ClickHouse は、keytab ファイルを使用して Kerberos 認証情報を保持できます。sasl_kerberos_service_namesasl_kerberos_keytabsasl_kerberos_principal の各子要素も指定できます。 例:
<!-- Kerberos対応のKafka -->
<kafka>
  <security_protocol>SASL_PLAINTEXT</security_protocol>
  <sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
  <sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
</kafka>

仮想カラム

  • _topic — Kafkaトピック。データ型: LowCardinality(String).
  • _key — メッセージのキー。データ型: String.
  • _offset — メッセージのオフセット。データ型: UInt64.
  • _timestamp — メッセージのタイムスタンプ。データ型: Nullable(DateTime).
  • _timestamp_ms — メッセージのミリ秒単位のタイムスタンプ。データ型: Nullable(DateTime64(3)).
  • _partition — Kafkaトピックのパーティション。データ型: UInt64.
  • _headers.name — メッセージヘッダーのキーの Array。データ型: Array(String).
  • _headers.value — メッセージヘッダーの値の Array。データ型: Array(String).
kafka_handle_error_mode='stream' の場合に追加される仮想カラム:
  • _raw_message - 正常にパースできなかった生のメッセージ。データ型: String.
  • _error - パース失敗時に発生した例外メッセージ。データ型: String.
注: 仮想カラム _raw_message_error に値が入るのは、パース中に例外が発生した場合のみです。メッセージが正常にパースされた場合、これらは常に空です。

Kafka メッセージのメタデータへのカラムのマッピング

INSERT INTO でメッセージを生成するとき、Kafka エンジンは、テーブルにそれらのカラムが存在する場合、常に _key という名前のカラム (型は String) を Kafka メッセージキーとして、_timestamp という名前のカラム (型は DateTime) を Kafka メッセージの timestamp として使用します。デフォルトでは、これらのカラムは生成されるメッセージの payload にも、ほかのカラムとあわせて含まれます。 kafka_map_virtual_columns_on_write = 1 を使用すると、動作は次のように変わります。
  • _key (型 String) — Kafka メッセージキーにマッピングされます。
  • _timestamp (型 DateTime) — Kafka メッセージの timestamp にマッピングされます。
  • _headers.name (型 Array(String)) および _headers.value (型 Array(String)) — Kafka メッセージヘッダーにマッピングされます。各ペア (_headers.name[i], _headers.value[i]) は 1 つの Kafka ヘッダーになります。_headers.name_headers.value_headers という Nested プレフィックスを共有しているため、ClickHouse ではすべての行で両方の配列のサイズが同じでなければなりません。
これらの名前を持つカラムは、型が上記のものと一致する場合にのみ メッセージの payload から除外されます。それ以外の場合は payload に残るため、たまたまこれらの名前を無関係なデータに使っているスキーマでも引き続き動作します。 例:
CREATE TABLE kafka_out
(
    event_json String,
    `_key` String,
    `_timestamp` DateTime,
    `_headers.name` Array(String),
    `_headers.value` Array(String)
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'broker:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'events-producer',
    kafka_format = 'JSONEachRow',
    kafka_map_virtual_columns_on_write = 1;

INSERT INTO kafka_out VALUES
    ('{"a":1}', 'session-42', now(), ['source', 'trace_id'], ['api', 'abc-123']);
生成された Kafka メッセージには、ペイロード {"event_json":"{\"a\":1}"}、キー session-42、現在のタイムスタンプ、および 2 つのヘッダー source=apitrace_id=abc-123 が含まれます。

データフォーマットのサポート

Kafka エンジンは、ClickHouse でサポートされているすべてのフォーマットに対応しています。 1 つの Kafka メッセージに含まれる行数は、そのフォーマットが行ベースかブロックベースかによって異なります。
  • 行ベースのフォーマットでは、1 つの Kafka メッセージに含める行数を kafka_max_rows_per_message の設定で制御できます。
  • ブロックベースのフォーマットでは、ブロックをさらに小さな部分に分割することはできませんが、1 つのブロックに含まれる行数は一般設定の max_block_size で制御できます。

ClickHouse Keeper にコミット済みオフセットを保存するエンジン

allow_experimental_kafka_offsets_storage_in_keeper が有効な場合、Kafka テーブルエンジンにはさらに 2 つの設定を指定できます。
  • kafka_keeper_path は ClickHouse Keeper 内のテーブルへのパスを指定します
  • kafka_replica_name は ClickHouse Keeper 内のレプリカ名を指定します
これらの設定は、両方を指定するか、どちらも指定しないようにする必要があります。両方を指定すると、新しい実験的な Kafka エンジンが使用されます。この新しいエンジンは、コミット済みオフセットを Kafka に保存することに依存せず、ClickHouse Keeper に保存します。引き続き Kafka へのオフセットの commit は試みますが、それらのオフセットに依存するのはテーブルの作成時だけです。それ以外の状況 (テーブルの再起動時や、何らかの error からの復旧後など) では、ClickHouse Keeper に保存されたオフセットが、メッセージ消費を継続するためのオフセットとして使用されます。コミット済みオフセットに加えて、直前のバッチで何件のメッセージを消費したかも保存されるため、insert が失敗した場合でも同じ件数のメッセージが消費され、必要に応じて重複排除を行えます。 例:
CREATE TABLE experimental_kafka (key UInt64, value UInt64)
ENGINE = Kafka('localhost:19092', 'my-topic', 'my-consumer', 'JSONEachRow')
SETTINGS
  kafka_keeper_path = '/clickhouse/{database}/{uuid}',
  kafka_replica_name = '{replica}'
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1;

既知の制限事項

この新しいエンジンは実験的機能のため、まだ本番環境での利用には適していません。実装には、いくつか既知の制限があります。
  • テーブルを短時間で削除して再作成したり、同じ ClickHouse Keeper パスを異なるエンジンに指定したりすると、問題が発生する可能性があります。ベストプラクティスとして、パスの衝突を避けるために kafka_keeper_path{uuid} を使用できます。
  • 再現可能な読み取りを実現するには、1 つのスレッドで複数のパーティションからメッセージを消費することはできません。一方で、Kafka コンシューマーを維持するには、定期的にポーリングする必要があります。これら 2 つの要件があるため、複数のコンシューマーを作成できるのは kafka_thread_per_consumer が有効な場合のみにしています。そうでない場合、コンシューマーを定期的にポーリングしながら問題を避けるのが複雑になりすぎるためです。
関連項目
最終更新日 2026年6月10日