これは、ClickHouse がサポートする公式の Apache Flink Sink Connector です。Flink の AsyncSinkBase と、公式の ClickHouse Java クライアント をベースに構築されています。
このコネクタは Apache Flink の DataStream API をサポートしています。Table API のサポートは、今後のリリースで追加される予定です。
- Java 11 以上 (Flink 1.17 以降) または 17 以上 (Flink 2.0 以降)
- Apache Flink 1.17 以上
このコネクタは、Flink 1.17+ と Flink 2.0+ の両方をサポートするため、2 つのアーティファクトに分かれています。利用する Flink のバージョンに対応するアーティファクトを選択してください。
| Flink バージョン | アーティファクト | ClickHouse Java クライアント バージョン | 必要な Java |
|---|
| 最新 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.1 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.0 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 1.20.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.19.3 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.18.1 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.17.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
このコネクタは、Flink 1.17.2 より前のバージョンではテストされていません。
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-2.0.0</artifactId>
<version>{{ stable_version }}</version>
<classifier>all</classifier>
</dependency>
dependencies {
implementation("com.clickhouse.flink:flink-connector-clickhouse-2.0.0:{{ stable_version }}")
}
libraryDependencies += "com.clickhouse.flink" % "flink-connector-clickhouse-2.0.0" % {{ stable_version }} classifier "all"
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-1.17</artifactId>
<version>{{ stable_version }}</version>
<classifier>all</classifier>
</dependency>
dependencies {
implementation("com.clickhouse.flink:flink-connector-clickhouse-1.17:{{ stable_version }}")
}
libraryDependencies += "com.clickhouse.flink" % "flink-connector-clickhouse-1.17" % {{ stable_version }} classifier "all"
バイナリJARの命名パターンは次のとおりです:
flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar
ここで、
利用可能なリリース済みの JAR ファイルはすべて、Maven Central Repository で確認できます。
ClickHouse に生の CSV データを insert する場合は、次のようになります。
public static void main(String[] args) {
// ClickHouseClient を設定
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);
// ElementConverter を作成
ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);
// sink を作成し、`setClickHouseFormat` でフォーマットを設定
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
convertorString,
MAX_BATCH_SIZE,
MAX_IN_FLIGHT_REQUESTS,
MAX_BUFFERED_REQUESTS,
MAX_BATCH_SIZE_IN_BYTES,
MAX_TIME_IN_BUFFER_MS,
MAX_RECORD_SIZE_IN_BYTES,
clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
// 最後に、DataStream を sink に接続します。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Path csvFilePath = new Path(fileFullName);
FileSource<String> csvSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), csvFilePath)
.build();
env.fromSource(
csvSource,
WatermarkStrategy.noWatermarks(),
"GzipCsvSource"
).sinkTo(csvSink);
}
その他の例やコードスニペットは、テストコードで確認できます。
ClickHouse Sink をすぐに使い始められるように、Maven ベースのサンプルを用意しています。
詳しい手順については、Example Guide を参照してください。
| Parameters | Description | Default Value | Required |
|---|
url | 完全修飾 ClickHouse URL | 該当なし | はい |
username | ClickHouse データベースのユーザー名 | 該当なし | はい |
password | ClickHouse データベースのパスワード | 該当なし | はい |
database | ClickHouse のデータベース名 | 該当なし | はい |
table | ClickHouse テーブル名 | 該当なし | はい |
options | Java クライアントの設定オプションを格納する map | 空の map | いいえ |
serverSettings | ClickHouse サーバーのセッション設定を格納する map | 空の map | いいえ |
enableJsonSupportAsString | JSON data type に対して JSON 形式の String を受け取ることを想定する ClickHouse サーバー設定 | true | いいえ |
options と serverSettings は、Map<String, String> としてクライアントに渡す必要があります。いずれかに空の map を指定すると、それぞれクライアントまたはサーバーのデフォルト値が使用されます。
たとえば、次のとおりです。
Map<String, String> javaClientOptions = Map.of(
ClientConfigProperties.CA_CERTIFICATE.getKey(), "<my_CA_cert>",
ClientConfigProperties.SSL_CERTIFICATE.getKey(), "<my_SSL_cert>",
ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "30000",
ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getKey(), "5"
);
Map<String, String> serverSettings = Map.of(
"insert_deduplicate", "1"
);
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(
url,
username,
password,
database,
tableName,
javaClientOptions,
serverSettings,
false // enableJsonSupportAsString
);
以下のオプションは、Flink の AsyncSinkBase に直接由来しています。
| パラメータ | 説明 | デフォルト値 | 必須 |
|---|
maxBatchSize | 1 回のバッチで挿入されるレコードの最大数 | N/A | はい |
maxInFlightRequests | シンクがバックプレッシャーを適用するまでに許可される、処理中のリクエストの最大数 | N/A | はい |
maxBufferedRequests | バックプレッシャーが適用されるまでに、シンク内でバッファできるレコードの最大数 | N/A | はい |
maxBatchSizeInBytes | バッチの最大サイズ (バイト単位) 。送信されるすべてのバッチは、このサイズ以下になります | N/A | はい |
maxTimeInBufferMS | レコードがフラッシュされるまでにシンク内に保持される最大時間 | N/A | はい |
maxRecordSizeInBytes | シンクが受け入れるレコードの最大サイズ。これを超えるレコードは自動的に拒否されます | N/A | はい |
以下の表は、Flink から ClickHouse にデータを挿入する際のデータ型変換の早見表です。
Flink から ClickHouse にデータを挿入する
| Java 型 | ClickHouse 型 | 対応可否 | シリアライゼーション方式 |
|---|
byte/Byte | Int8 | ✅ | DataWriter.writeInt8 |
short/Short | Int16 | ✅ | DataWriter.writeInt16 |
int/Integer | Int32 | ✅ | DataWriter.writeInt32 |
long/Long | Int64 | ✅ | DataWriter.writeInt64 |
BigInteger | Int128 | ✅ | DataWriter.writeInt128 |
BigInteger | Int256 | ✅ | DataWriter.writeInt256 |
short/Short | UInt8 | ✅ | DataWriter.writeUInt8 |
int/Integer | UInt8 | ✅ | DataWriter.writeUInt8 |
int/Integer | UInt16 | ✅ | DataWriter.writeUInt16 |
long/Long | UInt32 | ✅ | DataWriter.writeUInt32 |
long/Long | UInt64 | ✅ | DataWriter.writeUInt64 |
BigInteger | UInt64 | ✅ | DataWriter.writeUInt64 |
BigInteger | UInt128 | ✅ | DataWriter.writeUInt128 |
BigInteger | UInt256 | ✅ | DataWriter.writeUInt256 |
BigDecimal | Decimal | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal32 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal64 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal128 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal256 | ✅ | DataWriter.writeDecimal |
float/Float | Float | ✅ | DataWriter.writeFloat32 |
double/Double | Double | ✅ | DataWriter.writeFloat64 |
boolean/Boolean | Boolean | ✅ | DataWriter.writeBoolean |
String | String | ✅ | DataWriter.writeString |
String | FixedString | ✅ | DataWriter.writeFixedString |
LocalDate | Date | ✅ | DataWriter.writeDate |
LocalDate | Date32 | ✅ | DataWriter.writeDate32 |
LocalDateTime | DateTime | ✅ | DataWriter.writeDateTime |
ZonedDateTime | DateTime | ✅ | DataWriter.writeDateTime |
LocalDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
ZonedDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
int/Integer | Time | ❌ | N/A |
long/Long | Time64 | ❌ | N/A |
byte/Byte | Enum8 | ✅ | DataWriter.writeInt8 |
int/Integer | Enum16 | ✅ | DataWriter.writeInt16 |
java.util.UUID | UUID | ✅ | DataWriter.writeIntUUID |
String | JSON | ✅ | DataWriter.writeJSON |
Array<Type> | Array<Type> | ✅ | DataWriter.writeArray |
Map<K,V> | Map<K,V> | ✅ | DataWriter.writeMap |
Tuple<Type,..> | Tuple<T1,T2,..> | ✅ | DataWriter.writeTuple |
Object | Variant | ❌ | N/A |
注記:
- 日付操作を行う際は、
ZoneId を指定する必要があります。
- 10 進数の操作を行う際は、精度とスケールを指定する必要があります。
- ClickHouse が Java の
String を JSON として parse できるようにするには、ClickHouseClientConfig で enableJsonSupportAsString を有効にする必要があります。
- コネクタでは、入力 DataStream 内の要素を ClickHouse の ペイロード にマッピングするために
ElementConvertor が必要です。そのため、コネクタには ClickHouseConvertor と POJOConvertor が用意されており、これらを使用して、前述の DataWriter のシリアライゼーションメソッドでこのマッピングを実装できます。
利用可能な ClickHouse の入力フォーマットの一覧は、このドキュメントページ と ClickHouseFormat.java で確認できます。
コネクタが DataStream を ClickHouse のペイロードにシリアライズする際に使用するフォーマットを指定するには、setClickHouseFormat 関数を使用します。たとえば、次のように指定します。
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
convertorString,
MAX_BATCH_SIZE,
MAX_IN_FLIGHT_REQUESTS,
MAX_BUFFERED_REQUESTS,
MAX_BATCH_SIZE_IN_BYTES,
MAX_TIME_IN_BUFFER_MS,
MAX_RECORD_SIZE_IN_BYTES,
clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
このコネクタは、Flink の既存のメトリクスに加えて、以下の追加メトリクスも公開します。
| Metric | Description | Type | Status |
|---|
numBytesSend | リクエストのペイロードで ClickHouse に送信された合計バイト数。注: このメトリクスは、ネットワーク経由で送信されたシリアライズ済みデータのサイズを測定するものです。そのため、処理後にストレージへ実際に書き込まれたバイト数を反映する system.query_log 内の ClickHouse の written_bytes とは異なる場合があります | カウンター | ✅ |
numRecordSend | ClickHouse に送信されたレコードの合計数 | カウンター | ✅ |
numRequestSubmitted | 送信されたリクエストの合計数 (実際に実行されたフラッシュ回数) | カウンター | ✅ |
numOfDroppedBatches | 再試行できない障害により破棄されたバッチの合計数 | カウンター | ✅ |
numOfDroppedRecords | 再試行できない障害により破棄されたレコードの合計数 | カウンター | ✅ |
totalBatchRetries | 再試行可能な障害によるバッチの再試行の合計回数 | カウンター | ✅ |
writeLatencyHistogram | 書き込み成功時のレイテンシ分布のヒストグラム (ms) | ヒストグラム | ✅ |
writeFailureLatencyHistogram | 書き込み失敗時のレイテンシ分布のヒストグラム (ms) | ヒストグラム | ✅ |
triggeredByMaxBatchSizeCounter | maxBatchSize に達したことでトリガーされたフラッシュの合計回数 | カウンター | ✅ |
triggeredByMaxBatchSizeInBytesCounter | maxBatchSizeInBytes に達したことでトリガーされたフラッシュの合計回数 | カウンター | ✅ |
triggeredByMaxTimeInBufferMSCounter | maxTimeInBufferMS に達したことでトリガーされたフラッシュの合計回数 | カウンター | ✅ |
actualRecordsPerBatch | 実際のバッチサイズ分布のヒストグラム | ヒストグラム | ✅ |
actualBytesPerBatch | 実際のバッチごとのバイト数分布のヒストグラム | ヒストグラム | ✅ |
- このシンクは現在、少なくとも 1 回の配信を保証します。exactly-once セマンティクスへの対応状況はこちらで追跡されています。
- このシンクはまだ、処理できないレコードをバッファリングするための dead-letter キュー (DLQ) をサポートしていません。それまでの間、コネクタは失敗したレコードの再挿入を試み、成功しない場合はそれらを破棄します。この機能の対応状況はこちらで追跡されています。
- このシンクはまだ、Flink の Table API または Flink SQL 経由での作成をサポートしていません。この機能の対応状況はこちらで追跡されています。
ClickHouse バージョンの互換性とセキュリティ
- このコネクタは、最新版や head を含む最近の複数の ClickHouse バージョンに対して、日次の CI ワークフローでテストされています。テスト対象のバージョンは、新しい ClickHouse リリースが有効になるたびに定期的に更新されます。コネクタが日次でテストしているバージョンについては、こちらを参照してください。
- 既知のセキュリティ脆弱性や脆弱性の報告方法については、ClickHouse security policyを参照してください。
- セキュリティ修正や新機能の改善を取りこぼさないよう、コネクタは継続的にアップグレードすることを推奨します。
- 移行で問題が発生した場合は、GitHub の issue を作成してください。こちらで対応します。
- 最適なパフォーマンスを得るには、DataStream の element type が Generic 型ではないことを確認してください。詳しくは、Flink の型の区別についてはこちらを参照してください。Generic ではない要素を使用すると、Kryo によるシリアライゼーションのオーバーヘッドを回避でき、ClickHouse へのスループットが向上します。
maxBatchSize は少なくとも 1000、理想的には 10,000〜100,000 に設定することを推奨します。詳しくは、一括 insert に関するガイドを参照してください。
- ClickHouse に対して OLTP スタイルの 重複排除 や upsert を行う場合は、こちらのドキュメントを参照してください。注: これは、再試行時に発生するバッチ単位の 重複排除 とは異なります。
次のようなエラーが発生することがあります。
com.clickhouse.client.api.ServerException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 9205. Bytes expected: 1100022.: (at row 9) : While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)
原因: CANNOT_READ_ALL_DATA エラーの最も一般的な原因は、ClickHouse テーブルのスキーマと Flink レコードのスキーマが一致しなくなっていることです。これは、どちらか一方、または両方に後方互換性のない変更が加えられた場合に発生することがあります。
解決策: 互換性が保たれるように、ClickHouse テーブルまたはコネクタの入力データ型、あるいはその両方のスキーマを更新してください。必要に応じて、Java 型を ClickHouse の型にどのように対応付けるかについては、型マッピングを参照してください。注: まだ処理中のレコードがある場合は、コネクタの再起動時に Flink のステートをリセットする必要があります。
ClickHouse への書き込み時に、コネクタのスループットがジョブの並列度 (Flink の task 数) に応じて伸びないことがあります。
原因: ClickHouse のバックグラウンドのパーツマージ処理によって、挿入が遅くなっている可能性があります。これは、設定された バッチ サイズが小さすぎる場合、コネクタの flush 頻度が高すぎる場合、またはその両方が重なった場合に発生することがあります。
解決策: numRequestSubmitted と actualRecordsPerBatch のメトリクスを監視し、バッチ サイズ (maxBatchSize) や flush の頻度をどのように調整すべきか判断してください。あわせて、バッチ サイズに関する推奨事項については 高度な使用方法と推奨事項 も参照してください。
ClickHouse テーブルの行が不足しています
原因: バッチが、再試行不可能な障害により破棄されたか、設定された再試行回数内に挿入できませんでした (ClickHouseClientConfig.setNumberOfRetries() で設定可能) 。注: デフォルトでは、コネクタはバッチを破棄する前に最大 3 回まで再挿入を試みます。
解決策: 根本原因を特定するため、TaskManager のログやスタックトレースを確認してください。
このプロジェクトへのコントリビューションや問題の報告をご希望の場合は、ぜひお知らせください。
issue の作成、改善提案、またはプルリクエストの送信は、GitHubリポジトリから行えます。
コントリビューションを歓迎します。開始する前に、リポジトリ内のコントリビューションガイドをご確認ください。
ClickHouse Flink connector の改善へのご協力ありがとうございます。