- データフローをパブリッシュまたはサブスクライブする。
- 耐障害性のあるストレージを構成する。
- 利用可能になったストリームを処理する。
テーブルの作成
kafka_broker_list— ブローカーをカンマ区切りで指定したリスト (例:localhost:9092) 。kafka_topic_list— Kafkaトピックのリスト。kafka_group_name— Kafkaコンシューマーのグループ。読み取りオフセットは各グループごとに個別に追跡されます。クラスター内でメッセージが重複しないようにするには、すべてで同じグループ名を使用してください。kafka_format— メッセージのフォーマット。JSONEachRowなど、SQL のFORMAT関数と同じ記法を使用します。詳細は フォーマット セクションを参照してください。
kafka_security_protocol- ブローカーと通信する際に使用するプロトコル。設定可能な値:plaintext,ssl,sasl_plaintext,sasl_ssl。kafka_sasl_mechanism- 認証に使用する SASL メカニズム。設定可能な値:GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER.kafka_sasl_username-PLAINおよびSASL-SCRAM-..メカニズムで使用する SASL ユーザー名。kafka_sasl_password-PLAINおよびSASL-SCRAM-..メカニズムで使用する SASL パスワード。kafka_schema— フォーマットでスキーマ定義が必要な場合に指定しなければならないパラメータです。たとえば、Cap’n Proto では、スキーマファイルへのパスとルートschema.capnp:Messageオブジェクト名が必要です。kafka_schema_registry_skip_bytes— エンベロープヘッダー付きのスキーマレジストリを使用する際に、各メッセージの先頭からスキップするバイト数です (例: 19 バイトのエンベロープを含む AWS Glue Schema Registry) 。範囲:[0, 255]。デフォルト:0。kafka_num_consumers— テーブルごとのコンシューマー数です。1 つのコンシューマーのスループットでは不十分な場合は、コンシューマー数を増やしてください。各パーティションには割り当てられるコンシューマーが 1 つだけのため、コンシューマーの総数はトピック内のパーティション数を超えてはなりません。また、ClickHouse がデプロイされているサーバーの物理コア数を超えないようにしてください。デフォルト:1。kafka_max_block_size— poll での最大バッチサイズ (メッセージ数) 。デフォルト: max_insert_block_size。kafka_skip_broken_messages— Kafka メッセージパーサーが、ブロックごとにスキーマと互換性のないメッセージをどこまで許容するかを指定します。kafka_skip_broken_messages = Nの場合、エンジンはパースできない Kafka メッセージを N 件スキップします (1 件のメッセージはデータの 1 行に相当します) 。既定値:0。kafka_commit_every_batch— ブロック全体の書き込み後に一度だけコミットするのではなく、消費して処理した各バッチごとにコミットします。デフォルト:0。kafka_client_id— クライアント識別子。デフォルトでは空です。kafka_poll_timeout_ms— Kafka からの単一の poll に対するタイムアウト。デフォルト: stream_poll_timeout_ms。kafka_poll_max_batch_size— 1 回の Kafka poll で取得できるメッセージの最大数。デフォルト: max_block_size。kafka_flush_interval_ms— Kafka からデータをフラッシュする際のタイムアウト。デフォルト値: stream_flush_interval_ms。kafka_consumer_reschedule_ms— Kafka のストリーム処理が停止している場合 (たとえば、消費できるメッセージがない場合) の再スケジュール間隔です。この設定は、コンシューマーが poll を再試行するまでの待機時間を制御します。kafka_consumers_pool_ttl_msを超えてはなりません。デフォルト:500ミリ秒。kafka_thread_per_consumer— 各コンシューマーに独立したスレッドを割り当てます。有効にすると、各コンシューマーがデータをそれぞれ独立して並列にフラッシュします (無効の場合は、複数のコンシューマーからの行が1つのブロックになるようにまとめられます) 。デフォルト:0。kafka_handle_error_mode— Kafka エンジンでのエラー処理方法。設定可能な値: default (メッセージの解析に失敗した場合に例外がスローされます) 、stream (例外メッセージと生のメッセージが仮想カラム_errorと_raw_messageに保存されます) 、dead_letter_queue (エラー関連のデータが system.dead_letter_queue に保存されます) 。kafka_commit_on_select— SELECTクエリが実行されたときにメッセージをコミットします。デフォルト:false。kafka_consumer_acquire_timeout_ms—Kafka2テーブル (Keeper ベースのオフセットストレージを使用) に対して直接SELECTクエリを実行する際に、Kafka コンシューマーを取得するまでのタイムアウト時間 (ミリ秒) 。同じテーブルに対して複数の直接SELECTクエリが同時実行される場合、各クエリはコンシューマーが利用可能になるまで待機する必要があります。このタイムアウトは、クエリがそれぞれ異なるコンシューマーのサブセットを保持している場合にデッドロックを防ぎます。デフォルト:30000。kafka_max_rows_per_message— 行ベースのフォーマットで、1つの Kafka メッセージに書き込める行の最大数です。デフォルト:1。kafka_autodetect_client_rack— 最も近い Kafka レプリカを優先するため、librdkafkaのclient.rackパラメータを自動的に設定します。 サポートされるソース: AWS IMDSv2 のアベイラビリティーゾーン ID にはAWS_ZONE_ID(例:euc1-az1) ; AWS IMDSv2 のアベイラビリティーゾーン名にはAWS_ZONE_NAME(例:eu-central-1a) ; GCP メタデータサービスのゾーンにはGCP_ZONE(例:europe-central2-a) ; ClickHouse の内部検出を使用するにはCLICKHOUSE。これはクラウドメタデータまたは設定に依存する場合があります;AWS_ZONE_NAMEを試し、次にGCP_ZONEを試すにはAWS_ZONE_NAME_THEN_GCP_ZONE。 デフォルト: 空文字列 (無効) 。 ヒント: 環境によってアベイラビリティーゾーンのフォーマットは異なります。Amazon MSK は通常ゾーン ID を使用するため、AWS_ZONE_IDを優先してください。Confluent Cloud は通常ゾーン名を使用するため、AWS_ZONE_NAMEを優先してください。不明な場合は、AWS_ZONE_NAME_THEN_GCP_ZONEを使用するか、クラスターのbroker.rackの値を確認してください。 注: Kafka broker では、broker.rackとreplica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelectorを設定する必要があります。kafka_compression_codec— メッセージの生成に使用される圧縮コーデック。対応: 空文字列、none、gzip、snappy、lz4、zstd。空文字列の場合、この圧縮コーデックはテーブルでは設定されないため、設定ファイルの値、またはlibrdkafkaのデフォルト値が使用されます。デフォルト: 空文字列。kafka_compression_level—kafka_compression_codecで選択したアルゴリズムの圧縮レベルパラメーターです。値を大きくすると、CPU 使用量は増えますが、圧縮率は向上します。使用できる範囲はアルゴリズムによって異なります:gzipは[0-9];lz4は[0-12];snappyは0のみ;zstdは[0-12];-1= コーデック依存のデフォルト圧縮レベル。デフォルト:-1。kafka_map_virtual_columns_on_write— 有効にすると、テーブルスキーマ内で_key、_timestamp、_headers.name、_headers.valueという特別な名前を持つカラムが、INSERT時に対応する Kafka メッセージのメタデータにマッピングされ、メッセージのペイロードから除外されます。カラムを Kafka メッセージのメタデータにマッピングするを参照してください。デフォルト:false。
Kafka table engine は、デフォルト値を持つカラムをサポートしていません。デフォルト値を持つカラムが必要な場合は、materialized view レベルで追加できます (以下を参照) 。
説明
SELECT はあまり適していません (デバッグ用途を除く) 。各メッセージは一度しか読み取れないためです。より実用的なのは、materialized view を使ってリアルタイムのストリームを作成することです。これを行うには、次のようにします。
- engine を使用して Kafka コンシューマを作成し、それをデータストリームとして扱います。
- 必要な構造を持つテーブルを作成します。
- engine からデータを変換し、あらかじめ作成したテーブルに格納する materialized view を作成します。
MATERIALIZED VIEW が engine に接続されると、バックグラウンドでデータの収集を開始します。これにより、Kafka から継続的にメッセージを受信し、SELECT を使って必要なフォーマットに変換できます。
1 つの Kafka テーブルには、必要な数だけ materialized view を持たせることができます。これらは Kafka テーブルから直接データを読み取るのではなく、新しいレコードを (ブロック単位で) 受け取ります。これにより、異なる粒度の複数のテーブル (グループ化あり - aggregation あり、グループ化なし) に書き込むことができます。
例:
ALTER を使用してターゲットテーブルを変更する場合は、ターゲットテーブルとビューのデータの不整合を避けるため、materialized view を無効にすることを推奨します。
設定
<kafka> 配下) とトピックレベル (<kafka><kafka_topic> 配下) です。最初にグローバル設定が適用され、その後、トピックレベルの設定が存在する場合はそれが適用されます。
_) を使用してください。たとえば、check.crcs=true は <check_crcs>true</check_crcs> となります。
Kerberos サポート
sasl_plaintext を指定した security_protocol 子要素を追加します。Kerberos のチケット授与チケットが OS の機能によって取得され、キャッシュされていれば十分です。
ClickHouse は、keytab ファイルを使用して Kerberos 認証情報を保持できます。sasl_kerberos_service_name、sasl_kerberos_keytab、sasl_kerberos_principal の各子要素も指定できます。
例:
仮想カラム
_topic— Kafkaトピック。データ型:LowCardinality(String)._key— メッセージのキー。データ型:String._offset— メッセージのオフセット。データ型:UInt64._timestamp— メッセージのタイムスタンプ。データ型:Nullable(DateTime)._timestamp_ms— メッセージのミリ秒単位のタイムスタンプ。データ型:Nullable(DateTime64(3))._partition— Kafkaトピックのパーティション。データ型:UInt64._headers.name— メッセージヘッダーのキーの Array。データ型:Array(String)._headers.value— メッセージヘッダーの値の Array。データ型:Array(String).
kafka_handle_error_mode='stream' の場合に追加される仮想カラム:
_raw_message- 正常にパースできなかった生のメッセージ。データ型:String._error- パース失敗時に発生した例外メッセージ。データ型:String.
_raw_message と _error に値が入るのは、パース中に例外が発生した場合のみです。メッセージが正常にパースされた場合、これらは常に空です。
Kafka メッセージのメタデータへのカラムのマッピング
INSERT INTO でメッセージを生成するとき、Kafka エンジンは、テーブルにそれらのカラムが存在する場合、常に _key という名前のカラム (型は String) を Kafka メッセージキーとして、_timestamp という名前のカラム (型は DateTime) を Kafka メッセージの timestamp として使用します。デフォルトでは、これらのカラムは生成されるメッセージの payload にも、ほかのカラムとあわせて含まれます。
kafka_map_virtual_columns_on_write = 1 を使用すると、動作は次のように変わります。
_key(型String) — Kafka メッセージキーにマッピングされます。_timestamp(型DateTime) — Kafka メッセージの timestamp にマッピングされます。_headers.name(型Array(String)) および_headers.value(型Array(String)) — Kafka メッセージヘッダーにマッピングされます。各ペア(_headers.name[i], _headers.value[i])は 1 つの Kafka ヘッダーになります。_headers.nameと_headers.valueは_headersという Nested プレフィックスを共有しているため、ClickHouse ではすべての行で両方の配列のサイズが同じでなければなりません。
{"event_json":"{\"a\":1}"}、キー session-42、現在のタイムスタンプ、および 2 つのヘッダー source=api と trace_id=abc-123 が含まれます。
データフォーマットのサポート
- 行ベースのフォーマットでは、1 つの Kafka メッセージに含める行数を
kafka_max_rows_per_messageの設定で制御できます。 - ブロックベースのフォーマットでは、ブロックをさらに小さな部分に分割することはできませんが、1 つのブロックに含まれる行数は一般設定の max_block_size で制御できます。
ClickHouse Keeper にコミット済みオフセットを保存するエンジン
allow_experimental_kafka_offsets_storage_in_keeper が有効な場合、Kafka テーブルエンジンにはさらに 2 つの設定を指定できます。
kafka_keeper_pathは ClickHouse Keeper 内のテーブルへのパスを指定しますkafka_replica_nameは ClickHouse Keeper 内のレプリカ名を指定します
既知の制限事項
- テーブルを短時間で削除して再作成したり、同じ ClickHouse Keeper パスを異なるエンジンに指定したりすると、問題が発生する可能性があります。ベストプラクティスとして、パスの衝突を避けるために
kafka_keeper_pathで{uuid}を使用できます。 - 再現可能な読み取りを実現するには、1 つのスレッドで複数のパーティションからメッセージを消費することはできません。一方で、Kafka コンシューマーを維持するには、定期的にポーリングする必要があります。これら 2 つの要件があるため、複数のコンシューマーを作成できるのは
kafka_thread_per_consumerが有効な場合のみにしています。そうでない場合、コンシューマーを定期的にポーリングしながら問題を避けるのが複雑になりすぎるためです。