メインコンテンツへスキップ
これは、ClickHouse がサポートする公式の Apache Flink Sink Connector です。Flink の AsyncSinkBase と、公式の ClickHouse Java クライアント をベースに構築されています。 このコネクタは Apache Flink の DataStream API をサポートしています。Table API のサポートは、今後のリリースで追加される予定です

要件

  • Java 11 以上 (Flink 1.17 以降) または 17 以上 (Flink 2.0 以降)
  • Apache Flink 1.17 以上
このコネクタは、Flink 1.17+ と Flink 2.0+ の両方をサポートするため、2 つのアーティファクトに分かれています。利用する Flink のバージョンに対応するアーティファクトを選択してください。
Flink バージョンアーティファクトClickHouse Java クライアント バージョン必要な Java
最新flink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.1flink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.0flink-connector-clickhouse-2.0.00.9.5Java 17+
1.20.2flink-connector-clickhouse-1.170.9.5Java 11+
1.19.3flink-connector-clickhouse-1.170.9.5Java 11+
1.18.1flink-connector-clickhouse-1.170.9.5Java 11+
1.17.2flink-connector-clickhouse-1.170.9.5Java 11+
このコネクタは、Flink 1.17.2 より前のバージョンではテストされていません。

インストールと設定

依存関係として追加する

<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-2.0.0</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>
<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-1.17</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>

バイナリをダウンロード

バイナリJARの命名パターンは次のとおりです:
flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar
ここで、 利用可能なリリース済みの JAR ファイルはすべて、Maven Central Repository で確認できます。

DataStream API の使用

コードスニペット

ClickHouse に生の CSV データを insert する場合は、次のようになります。
public static void main(String[] args) {
    // ClickHouseClient を設定
    ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);

    // ElementConverter を作成
    ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);

    // sink を作成し、`setClickHouseFormat` でフォーマットを設定
    ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
            convertorString,
            MAX_BATCH_SIZE,
            MAX_IN_FLIGHT_REQUESTS,
            MAX_BUFFERED_REQUESTS,
            MAX_BATCH_SIZE_IN_BYTES,
            MAX_TIME_IN_BUFFER_MS,
            MAX_RECORD_SIZE_IN_BYTES,
            clickHouseClientConfig
    );

    csvSink.setClickHouseFormat(ClickHouseFormat.CSV);

    // 最後に、DataStream を sink に接続します。
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Path csvFilePath = new Path(fileFullName);
    FileSource<String> csvSource = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), csvFilePath)
            .build();

    env.fromSource(
            csvSource,
            WatermarkStrategy.noWatermarks(),
            "GzipCsvSource"
    ).sinkTo(csvSink);
}
その他の例やコードスニペットは、テストコードで確認できます。

クイックスタート例

ClickHouse Sink をすぐに使い始められるように、Maven ベースのサンプルを用意しています。 詳しい手順については、Example Guide を参照してください。

DataStream APIの接続オプション

ClickHouse クライアントオプション

ParametersDescriptionDefault ValueRequired
url完全修飾 ClickHouse URL該当なしはい
usernameClickHouse データベースのユーザー名該当なしはい
passwordClickHouse データベースのパスワード該当なしはい
databaseClickHouse のデータベース名該当なしはい
tableClickHouse テーブル名該当なしはい
optionsJava クライアントの設定オプションを格納する map空の mapいいえ
serverSettingsClickHouse サーバーのセッション設定を格納する map空の mapいいえ
enableJsonSupportAsStringJSON data type に対して JSON 形式の String を受け取ることを想定する ClickHouse サーバー設定trueいいえ
optionsserverSettings は、Map<String, String> としてクライアントに渡す必要があります。いずれかに空の map を指定すると、それぞれクライアントまたはサーバーのデフォルト値が使用されます。
利用可能なすべての Java クライアントオプションは、ClientConfigProperties.java および このドキュメントページ に記載されています。利用可能なすべてのサーバーセッション設定は、このドキュメントページ に記載されています。
たとえば、次のとおりです。
Map<String, String> javaClientOptions = Map.of(
    ClientConfigProperties.CA_CERTIFICATE.getKey(), "<my_CA_cert>",
    ClientConfigProperties.SSL_CERTIFICATE.getKey(), "<my_SSL_cert>",
    ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "30000",
    ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getKey(), "5"
);

Map<String, String> serverSettings = Map.of(
    "insert_deduplicate", "1"
);

ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(
    url,
    username,
    password,
    database,
    tableName,
    javaClientOptions,
    serverSettings,
    false // enableJsonSupportAsString
);

シンクオプション

以下のオプションは、Flink の AsyncSinkBase に直接由来しています。
パラメータ説明デフォルト値必須
maxBatchSize1 回のバッチで挿入されるレコードの最大数N/Aはい
maxInFlightRequestsシンクがバックプレッシャーを適用するまでに許可される、処理中のリクエストの最大数N/Aはい
maxBufferedRequestsバックプレッシャーが適用されるまでに、シンク内でバッファできるレコードの最大数N/Aはい
maxBatchSizeInBytesバッチの最大サイズ (バイト単位) 。送信されるすべてのバッチは、このサイズ以下になりますN/Aはい
maxTimeInBufferMSレコードがフラッシュされるまでにシンク内に保持される最大時間N/Aはい
maxRecordSizeInBytesシンクが受け入れるレコードの最大サイズ。これを超えるレコードは自動的に拒否されますN/Aはい

サポートされているデータ型

以下の表は、Flink から ClickHouse にデータを挿入する際のデータ型変換の早見表です。
Java 型ClickHouse 型対応可否シリアライゼーション方式
byte/ByteInt8DataWriter.writeInt8
short/ShortInt16DataWriter.writeInt16
int/IntegerInt32DataWriter.writeInt32
long/LongInt64DataWriter.writeInt64
BigIntegerInt128DataWriter.writeInt128
BigIntegerInt256DataWriter.writeInt256
short/ShortUInt8DataWriter.writeUInt8
int/IntegerUInt8DataWriter.writeUInt8
int/IntegerUInt16DataWriter.writeUInt16
long/LongUInt32DataWriter.writeUInt32
long/LongUInt64DataWriter.writeUInt64
BigIntegerUInt64DataWriter.writeUInt64
BigIntegerUInt128DataWriter.writeUInt128
BigIntegerUInt256DataWriter.writeUInt256
BigDecimalDecimalDataWriter.writeDecimal
BigDecimalDecimal32DataWriter.writeDecimal
BigDecimalDecimal64DataWriter.writeDecimal
BigDecimalDecimal128DataWriter.writeDecimal
BigDecimalDecimal256DataWriter.writeDecimal
float/FloatFloatDataWriter.writeFloat32
double/DoubleDoubleDataWriter.writeFloat64
boolean/BooleanBooleanDataWriter.writeBoolean
StringStringDataWriter.writeString
StringFixedStringDataWriter.writeFixedString
LocalDateDateDataWriter.writeDate
LocalDateDate32DataWriter.writeDate32
LocalDateTimeDateTimeDataWriter.writeDateTime
ZonedDateTimeDateTimeDataWriter.writeDateTime
LocalDateTimeDateTime64DataWriter.writeDateTime64
ZonedDateTimeDateTime64DataWriter.writeDateTime64
int/IntegerTimeN/A
long/LongTime64N/A
byte/ByteEnum8DataWriter.writeInt8
int/IntegerEnum16DataWriter.writeInt16
java.util.UUIDUUIDDataWriter.writeIntUUID
StringJSONDataWriter.writeJSON
Array<Type>Array<Type>DataWriter.writeArray
Map<K,V>Map<K,V>DataWriter.writeMap
Tuple<Type,..>Tuple<T1,T2,..>DataWriter.writeTuple
ObjectVariantN/A
注記:
  • 日付操作を行う際は、ZoneId を指定する必要があります。
  • 10 進数の操作を行う際は、精度とスケールを指定する必要があります。
  • ClickHouse が Java の String を JSON として parse できるようにするには、ClickHouseClientConfigenableJsonSupportAsString を有効にする必要があります。
  • コネクタでは、入力 DataStream 内の要素を ClickHouse の ペイロード にマッピングするために ElementConvertor が必要です。そのため、コネクタには ClickHouseConvertorPOJOConvertor が用意されており、これらを使用して、前述の DataWriter のシリアライゼーションメソッドでこのマッピングを実装できます。

サポートされている入力フォーマット

利用可能な ClickHouse の入力フォーマットの一覧は、このドキュメントページClickHouseFormat.java で確認できます。 コネクタが DataStream を ClickHouse のペイロードにシリアライズする際に使用するフォーマットを指定するには、setClickHouseFormat 関数を使用します。たとえば、次のように指定します。
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
        convertorString,
        MAX_BATCH_SIZE,
        MAX_IN_FLIGHT_REQUESTS,
        MAX_BUFFERED_REQUESTS,
        MAX_BATCH_SIZE_IN_BYTES,
        MAX_TIME_IN_BUFFER_MS,
        MAX_RECORD_SIZE_IN_BYTES,
        clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
デフォルトでは、ClickHouseClientConfigsetSupportDefault が明示的に true または false に設定されている場合、コネクタはそれぞれ RowBinaryWithDefaults または RowBinary を使用します。

メトリクス

このコネクタは、Flink の既存のメトリクスに加えて、以下の追加メトリクスも公開します。
MetricDescriptionTypeStatus
numBytesSendリクエストのペイロードで ClickHouse に送信された合計バイト数。注: このメトリクスは、ネットワーク経由で送信されたシリアライズ済みデータのサイズを測定するものです。そのため、処理後にストレージへ実際に書き込まれたバイト数を反映する system.query_log 内の ClickHouse の written_bytes とは異なる場合がありますカウンター
numRecordSendClickHouse に送信されたレコードの合計数カウンター
numRequestSubmitted送信されたリクエストの合計数 (実際に実行されたフラッシュ回数)カウンター
numOfDroppedBatches再試行できない障害により破棄されたバッチの合計数カウンター
numOfDroppedRecords再試行できない障害により破棄されたレコードの合計数カウンター
totalBatchRetries再試行可能な障害によるバッチの再試行の合計回数カウンター
writeLatencyHistogram書き込み成功時のレイテンシ分布のヒストグラム (ms)ヒストグラム
writeFailureLatencyHistogram書き込み失敗時のレイテンシ分布のヒストグラム (ms)ヒストグラム
triggeredByMaxBatchSizeCountermaxBatchSize に達したことでトリガーされたフラッシュの合計回数カウンター
triggeredByMaxBatchSizeInBytesCountermaxBatchSizeInBytes に達したことでトリガーされたフラッシュの合計回数カウンター
triggeredByMaxTimeInBufferMSCountermaxTimeInBufferMS に達したことでトリガーされたフラッシュの合計回数カウンター
actualRecordsPerBatch実際のバッチサイズ分布のヒストグラムヒストグラム
actualBytesPerBatch実際のバッチごとのバイト数分布のヒストグラムヒストグラム

制限事項

  • このシンクは現在、少なくとも 1 回の配信を保証します。exactly-once セマンティクスへの対応状況はこちらで追跡されています。
  • このシンクはまだ、処理できないレコードをバッファリングするための dead-letter キュー (DLQ) をサポートしていません。それまでの間、コネクタは失敗したレコードの再挿入を試み、成功しない場合はそれらを破棄します。この機能の対応状況はこちらで追跡されています。
  • このシンクはまだ、Flink の Table API または Flink SQL 経由での作成をサポートしていません。この機能の対応状況はこちらで追跡されています。

ClickHouse バージョンの互換性とセキュリティ

  • このコネクタは、最新版や head を含む最近の複数の ClickHouse バージョンに対して、日次の CI ワークフローでテストされています。テスト対象のバージョンは、新しい ClickHouse リリースが有効になるたびに定期的に更新されます。コネクタが日次でテストしているバージョンについては、こちらを参照してください。
  • 既知のセキュリティ脆弱性や脆弱性の報告方法については、ClickHouse security policyを参照してください。
  • セキュリティ修正や新機能の改善を取りこぼさないよう、コネクタは継続的にアップグレードすることを推奨します。
  • 移行で問題が発生した場合は、GitHub の issue を作成してください。こちらで対応します。
  • 最適なパフォーマンスを得るには、DataStream の element type が Generic 型ではないことを確認してください。詳しくは、Flink の型の区別についてはこちらを参照してください。Generic ではない要素を使用すると、Kryo によるシリアライゼーションのオーバーヘッドを回避でき、ClickHouse へのスループットが向上します。
  • maxBatchSize は少なくとも 1000、理想的には 10,000〜100,000 に設定することを推奨します。詳しくは、一括 insert に関するガイドを参照してください。
  • ClickHouse に対して OLTP スタイルの 重複排除 や upsert を行う場合は、こちらのドキュメントを参照してください。注: これは、再試行時に発生するバッチ単位の 重複排除 とは異なります。

トラブルシューティング

CANNOT_READ_ALL_DATA

次のようなエラーが発生することがあります。
com.clickhouse.client.api.ServerException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 9205. Bytes expected: 1100022.: (at row 9) : While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)
原因: CANNOT_READ_ALL_DATA エラーの最も一般的な原因は、ClickHouse テーブルのスキーマと Flink レコードのスキーマが一致しなくなっていることです。これは、どちらか一方、または両方に後方互換性のない変更が加えられた場合に発生することがあります。 解決策: 互換性が保たれるように、ClickHouse テーブルまたはコネクタの入力データ型、あるいはその両方のスキーマを更新してください。必要に応じて、Java 型を ClickHouse の型にどのように対応付けるかについては、型マッピングを参照してください。注: まだ処理中のレコードがある場合は、コネクタの再起動時に Flink のステートをリセットする必要があります。

スループットが低い

ClickHouse への書き込み時に、コネクタのスループットがジョブの並列度 (Flink の task 数) に応じて伸びないことがあります。 原因: ClickHouse のバックグラウンドのパーツマージ処理によって、挿入が遅くなっている可能性があります。これは、設定された バッチ サイズが小さすぎる場合、コネクタの flush 頻度が高すぎる場合、またはその両方が重なった場合に発生することがあります。 解決策: numRequestSubmittedactualRecordsPerBatch のメトリクスを監視し、バッチ サイズ (maxBatchSize) や flush の頻度をどのように調整すべきか判断してください。あわせて、バッチ サイズに関する推奨事項については 高度な使用方法と推奨事項 も参照してください。

ClickHouse テーブルの行が不足しています

原因: バッチが、再試行不可能な障害により破棄されたか、設定された再試行回数内に挿入できませんでした (ClickHouseClientConfig.setNumberOfRetries() で設定可能) 。注: デフォルトでは、コネクタはバッチを破棄する前に最大 3 回まで再挿入を試みます。 解決策: 根本原因を特定するため、TaskManager のログやスタックトレースを確認してください。

コントリビューションとサポート

このプロジェクトへのコントリビューションや問題の報告をご希望の場合は、ぜひお知らせください。 issue の作成、改善提案、またはプルリクエストの送信は、GitHubリポジトリから行えます。 コントリビューションを歓迎します。開始する前に、リポジトリ内のコントリビューションガイドをご確認ください。 ClickHouse Flink connector の改善へのご協力ありがとうございます。
最終更新日 2026年6月10日