前提条件
最初の ClickPipe を作成する
- ClickHouse Cloud サービスの SQL Console にアクセスします。
- 左側のメニューで
Data Sourcesボタンを選択し、“Set up a ClickPipe” をクリックします。
- データソースを選択します。
- ClickPipe の名前、説明 (任意) 、IAM ロールまたは認証情報、そのほかの接続情報を入力します。
- Kinesis Stream と開始 OFFSET を選択します。UI には、選択したソース (Kafka topic など) のサンプルドキュメントが表示されます。また、Kinesis streams では Enhanced Fan-out を有効にして、ClickPipe のパフォーマンスと安定性を向上させることもできます (Enhanced Fan-out の詳細はこちらをご覧ください) 。
- 次のステップでは、データを新しい ClickHouse table に取り込むか、既存のものを再利用するかを選択できます。画面の指示に従って、table 名、スキーマ、設定を変更してください。上部のサンプルテーブルで、変更内容をリアルタイムにプレビューできます。
- または、既存の ClickHouse table にデータを取り込むこともできます。その場合、UI ではソースのフィールドを、選択した宛先テーブルの ClickHouse フィールドにマッピングできます。
- 最後に、内部 ClickPipes user の権限を設定できます。
Full access: クラスターへのフルアクセスを付与します。宛先テーブルで materialized view や Dictionary を使用する場合に役立つことがあります。Only destination table: 宛先テーブルに対するINSERT権限のみを付与します。
- “Complete Setup” をクリックすると、システムに ClickPipe が登録され、概要テーブルに一覧表示されます。
- おめでとうございます! 最初の ClickPipe のセットアップが完了しました。これが streaming ClickPipe の場合は継続的に稼働し、リモートのデータソースからデータをリアルタイムで取り込み続けます。そうでない場合は、Batch を取り込んで完了します。
対応データフォーマット
圧縮
- gzip
- zstd
- lz4
- snappy (フレーム形式)
この自動検出は、JSON や CSV のようなテキストベースのフォーマットでも安全です。印字可能な ASCII 文字が圧縮のマジックバイトと衝突することはないためです。
サポートされているデータ型
標準型のサポート
- 基本的な数値型 - [U]Int8/16/32/64、Float32/64、BFloat16
- 大きな整数型 - [U]Int128/256
- Decimal 型
- Boolean
- String
- FixedString
- Date、Date32
- DateTime、DateTime64 (UTC タイムゾーンのみ)
- Enum8/Enum16
- UUID
- IPv4
- IPv6
- すべての ClickHouse LowCardinality 型
- 上記のいずれかの型をキーと値に使用する Map (Nullable を含む)
- 上記のいずれかの型を要素に使用する Tuple および Array (Nullable を含む、深さは 1 レベルのみ)
- SimpleAggregateFunction 型 (AggregatingMergeTree または SummingMergeTree の宛先向け)
Variant 型のサポート
Variant(String, Int64, DateTime) など) を手動で指定できます。
ClickPipes では使用する適切な Variant 型を判定する仕組み上、Variant の定義で使用できる整数型または datetime
型は 1 つだけです。たとえば、Variant(Int64, UInt32) はサポートされていません。
JSON 型のサポート
Kinesis 仮想カラム
Add Column ボタンを使って仮想カラムを追加できます。
| Name | Description | Recommended Data Type |
|---|---|---|
| _key | Kinesis Partition Key | String |
| _timestamp | Kinesis のおおよその到着タイムスタンプ (ミリ秒精度) | DateTime64(3) |
| _stream | Kinesis Stream Name | String |
| _sequence_number | Kinesis Sequence Number | String |
| _raw_message | Kinesis メッセージ全体 | String |
_raw_message フィールドは、Kinesis の JSON レコード全体だけが必要な場合に使用できます (たとえば、ClickHouse の JsonExtract* 関数を使用して、下流の materialized view
ビューにデータを投入する場合) 。このようなパイプでは、「仮想カラムではない」カラムをすべて削除することで、ClickPipes のパフォーマンスが向上する場合があります。
制限事項
- DEFAULT はサポートされていません。
- 最小の (XS) レプリカサイズで実行する場合、個々のメッセージの上限はデフォルトで 8MB (非圧縮) 、より大きいレプリカでは 16MB (非圧縮) です。この上限を超えるメッセージは error となり、拒否されます。より大きいメッセージが必要な場合は、サポートにお問い合わせください。
パフォーマンス
バッチ処理
- バッチサイズが上限に達した場合 (100,000 行、またはレプリカメモリ 1GB あたり 32MB)
- バッチの保持時間が上限に達した場合 (5 秒)