メインコンテンツへスキップ
入力出力エイリアス

説明

Apache Avro は、効率的なデータ処理のためにバイナリエンコーディングを使用する、行指向のシリアライゼーションフォーマットです。AvroConfluent フォーマットは、Confluent スキーマレジストリ (または API 互換のサービス) を使用した、Avro エンコード済みメッセージの読み書きをサポートします。 各メッセージは Confluent のワイヤ形式に従います。これは、マジックバイト (0x00) に続いて 4 バイトのビッグエンディアンのスキーマ ID が配置され、その後に Avro のバイナリデータが続く形式です。読み取り時には、ClickHouse はレジストリに問い合わせてスキーマ ID を解決します。書き込み時には、ClickHouse は出力カラムから導出したスキーマを登録し、生成された ID を各行の先頭に付加します。最適なパフォーマンスを得るため、スキーマはキャッシュされます。

データ型マッピング

次の表は、Apache Avro フォーマットでサポートされるすべてのデータ型と、INSERT および SELECT クエリでそれぞれに対応する ClickHouse のデータ型を示しています。
Avro data type INSERTClickHouse data typeAvro data type SELECT
boolean, int, long, float, doubleInt(8\16\32), UInt(8\16\32)int
boolean, int, long, float, doubleInt64, UInt64long
boolean, int, long, float, doubleFloat32float
boolean, int, long, float, doubleFloat64double
bytes, string, fixed, enumStringbytes または string *
bytes, string, fixedFixedString(N)fixed(N)
enumEnum(8\16)enum
array(T)Array(T)array(T)
map(V, K)Map(V, K)map(string, K)
union(null, T), union(T, null)Nullable(T)union(null, T)
union(T1, T2, …) **Variant(T1, T2, …)union(T1, T2, …) **
nullNullable(Nothing)null
int (date) ***Date, Date32int (date) ***
long (timestamp-millis) ***DateTime64(3)long (timestamp-millis) ***
long (timestamp-micros) ***DateTime64(6)long (timestamp-micros) ***
bytes (decimal) ***DateTime64(N)bytes (decimal) ***
intIPv4int
fixed(16)IPv6fixed(16)
bytes (decimal) ***Decimal(P, S)bytes (decimal) ***
string (uuid) ***UUIDstring (uuid) ***
fixed(16)Int128/UInt128fixed(16)
fixed(32)Int256/UInt256fixed(32)
recordTuplerecord
** Variant 型null をフィールド値として暗黙的に受け入れるため、たとえば Avro の union(T1, T2, null)Variant(T1, T2) に変換されます。 そのため、ClickHouse から Avro を生成する際には、Avro の union 型の候補に常に null 型を含める必要があります。これは、スキーマ推論の時点では、実際に null となる値があるかどうか分からないためです。 *** Avro 論理型 サポートされていない Avro 論理データ型:
  • time-millis
  • time-micros
  • duration

フォーマット設定

SettingDescriptionDefault
input_format_avro_allow_missing_fieldsフィールドがスキーマ内に見つからない場合に、エラーを発生させる代わりにデフォルト値を使用するかどうか。0
input_format_avro_null_as_defaultnull 値を非 Nullable のカラムに挿入する際に、エラーを発生させる代わりにデフォルト値を使用するかどうか。0
format_avro_schema_registry_urlConfluent スキーマレジストリ の URL。基本認証では、URL エンコードされた認証情報を URL パスに直接含めることができます。
format_avro_schema_registry_connection_timeoutスキーマレジストリ HTTP クライアントの接続タイムアウト (スキーマ取得と登録の両方で使用) の秒数。0 より大きく、600 (10 分) 未満である必要があります。1
format_avro_schema_registry_send_timeoutスキーマレジストリ HTTP クライアントの送信タイムアウトの秒数。0 より大きく、600 (10 分) 未満である必要があります。1
format_avro_schema_registry_receive_timeoutスキーマレジストリ HTTP クライアントの受信タイムアウトの秒数。0 より大きく、600 (10 分) 未満である必要があります。1
output_format_avro_confluent_subject出力用: スキーマレジストリでスキーマを登録する subject 名。書き込み時に必須です。
output_format_avro_string_column_pattern出力用: Avro string としてシリアライズする String カラムの正規表現 (デフォルトは bytes) 。

Kafka からの読み込み

Kafka table engine を使用して Avro でエンコードされた Kafka トピックを読み込むには、format_avro_schema_registry_url 設定でスキーマレジストリの URL を指定します。
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;

Kafka への書き込み

AvroConfluent メッセージを Kafka トピック に書き込むには、スキーマレジストリの URL と subject 名の両方を設定します。スキーマは最初の書き込み時に、自動的にレジストリへ登録されます。
CREATE TABLE topic1_sink
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';

INSERT INTO topic1_sink VALUES ('hello', 'world');

Basic認証を使用する

スキーマレジストリでBasic認証が必要な場合 (例: Confluent Cloud を使用している場合) 、format_avro_schema_registry_url 設定に URL エンコードされた認証情報を指定できます。
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';

トラブルシューティング

インジェストの進行状況を監視し、Kafkaコンシューマーのエラーをデバッグするには、system.kafka_consumers システムテーブルをクエリします。デプロイ環境に複数のレプリカがある場合 (例: ClickHouse Cloud) は、clusterAllReplicas テーブル関数を使用する必要があります。
SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;
スキーマ解決で問題が発生した場合は、kafkacatclickhouse-local を使ってトラブルシューティングできます。
$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c
最終更新日 2026年6月10日