メインコンテンツへスキップ
Kinesis ClickPipes は、ClickPipes UI を使用して手動でデプロイおよび管理できるほか、OpenAPITerraform を使用してプログラムによってデプロイおよび管理することもできます。

前提条件

あらかじめ ClickPipes の概要を確認し、IAM 認証情報または IAM ロールを設定しておいてください。ClickHouse Cloud で使用するロールの設定方法については、Kinesis のロールベースアクセスガイドを参照してください。

最初の ClickPipe を作成する

  1. ClickHouse Cloud サービスの SQL Console にアクセスします。
  1. 左側のメニューで Data Sources ボタンを選択し、“Set up a ClickPipe” をクリックします。
  1. データソースを選択します。
  1. ClickPipe の名前、説明 (任意) 、IAM ロールまたは認証情報、そのほかの接続情報を入力します。
  1. Kinesis Stream と開始 OFFSET を選択します。UI には、選択したソース (Kafka topic など) のサンプルドキュメントが表示されます。また、Kinesis streams では Enhanced Fan-out を有効にして、ClickPipe のパフォーマンスと安定性を向上させることもできます (Enhanced Fan-out の詳細はこちらをご覧ください) 。
  1. 次のステップでは、データを新しい ClickHouse table に取り込むか、既存のものを再利用するかを選択できます。画面の指示に従って、table 名、スキーマ、設定を変更してください。上部のサンプルテーブルで、変更内容をリアルタイムにプレビューできます。
用意されているコントロールを使って詳細設定をカスタマイズすることもできます。
  1. または、既存の ClickHouse table にデータを取り込むこともできます。その場合、UI ではソースのフィールドを、選択した宛先テーブルの ClickHouse フィールドにマッピングできます。
  1. 最後に、内部 ClickPipes user の権限を設定できます。
Permissions: ClickPipes は、宛先テーブルにデータを書き込むための専用 user を作成します。カスタムロールまたは事前定義されたロールを使用して、この内部 user に割り当てるロールを選択できます。
  • Full access: クラスターへのフルアクセスを付与します。宛先テーブルで materialized view や Dictionary を使用する場合に役立つことがあります。
    • Only destination table: 宛先テーブルに対する INSERT 権限のみを付与します。
  1. “Complete Setup” をクリックすると、システムに ClickPipe が登録され、概要テーブルに一覧表示されます。
概要テーブルには、ソースまたは ClickHouse 内の宛先テーブルのサンプルデータを表示するコントロールがあります。 また、ClickPipe を削除したり、インジェスト job の概要を表示したりするためのコントロールもあります。
  1. おめでとうございます! 最初の ClickPipe のセットアップが完了しました。これが streaming ClickPipe の場合は継続的に稼働し、リモートのデータソースからデータをリアルタイムで取り込み続けます。そうでない場合は、Batch を取り込んで完了します。

対応データフォーマット

対応フォーマットは以下のとおりです。

圧縮

ClickPipes for Kinesis は、圧縮されたレコードを自動的に検出して解凍します。Kafka ではクライアントライブラリが透過的に解凍を処理しますが、Kinesis は生のバイト列をそのまま配信するため、これは ClickPipes が設定不要で処理します。 サポートされている圧縮コーデックは次のとおりです。
  • gzip
  • zstd
  • lz4
  • snappy (フレーム形式)
圧縮は、各レコード内のマジックバイトによって自動的に検出されます。既知の圧縮シグネチャが見つからない場合、そのレコードは非圧縮として扱われます。検出された圧縮タイプはスキーマ推論時にも表示されるため、UI のサンプルデータプレビューでも解凍後のデータが正しく表示されます。
この自動検出は、JSON や CSV のようなテキストベースのフォーマットでも安全です。印字可能な ASCII 文字が圧縮のマジックバイトと衝突することはないためです。

サポートされているデータ型

標準型のサポート

現在、ClickPipes では以下の ClickHouse データ型がサポートされています。
  • 基本的な数値型 - [U]Int8/16/32/64、Float32/64、BFloat16
  • 大きな整数型 - [U]Int128/256
  • Decimal 型
  • Boolean
  • String
  • FixedString
  • Date、Date32
  • DateTime、DateTime64 (UTC タイムゾーンのみ)
  • Enum8/Enum16
  • UUID
  • IPv4
  • IPv6
  • すべての ClickHouse LowCardinality 型
  • 上記のいずれかの型をキーと値に使用する Map (Nullable を含む)
  • 上記のいずれかの型を要素に使用する Tuple および Array (Nullable を含む、深さは 1 レベルのみ)
  • SimpleAggregateFunction 型 (AggregatingMergeTree または SummingMergeTree の宛先向け)

Variant 型のサポート

ソースデータストリーム内の任意の JSON フィールドに対して、Variant 型 (Variant(String, Int64, DateTime) など) を手動で指定できます。 ClickPipes では使用する適切な Variant 型を判定する仕組み上、Variant の定義で使用できる整数型または datetime 型は 1 つだけです。たとえば、Variant(Int64, UInt32) はサポートされていません。

JSON 型のサポート

常に JSON オブジェクトである JSON フィールドは、JSON の宛先カラムに割り当てることができます。固定パスやスキップするパスも含め、宛先カラムを目的の JSON 型に手動で変更する必要があります。

Kinesis 仮想カラム

Kinesis ストリームでは、以下の仮想カラムをサポートしています。新しい宛先テーブルを作成する際は、Add Column ボタンを使って仮想カラムを追加できます。
NameDescriptionRecommended Data Type
_keyKinesis Partition KeyString
_timestampKinesis のおおよその到着タイムスタンプ (ミリ秒精度)DateTime64(3)
_streamKinesis Stream NameString
_sequence_numberKinesis Sequence NumberString
_raw_messageKinesis メッセージ全体String
_raw_message フィールドは、Kinesis の JSON レコード全体だけが必要な場合に使用できます (たとえば、ClickHouse の JsonExtract* 関数を使用して、下流の materialized view ビューにデータを投入する場合) 。このようなパイプでは、「仮想カラムではない」カラムをすべて削除することで、ClickPipes のパフォーマンスが向上する場合があります。

制限事項

  • DEFAULT はサポートされていません。
  • 最小の (XS) レプリカサイズで実行する場合、個々のメッセージの上限はデフォルトで 8MB (非圧縮) 、より大きいレプリカでは 16MB (非圧縮) です。この上限を超えるメッセージは error となり、拒否されます。より大きいメッセージが必要な場合は、サポートにお問い合わせください。

パフォーマンス

バッチ処理

ClickPipes はデータをバッチ単位で ClickHouse に挿入します。これは、データベース内に過剰な数のパーツが作成されるのを防ぎ、クラスターのパフォーマンス低下を避けるためです。 バッチは、次のいずれかの条件を満たした時点で挿入されます。
  • バッチサイズが上限に達した場合 (100,000 行、またはレプリカメモリ 1GB あたり 32MB)
  • バッチの保持時間が上限に達した場合 (5 秒)

レイテンシ

レイテンシ (Kinesis のメッセージがストリームに送信されてから、そのメッセージを ClickHouse で利用できるようになるまでの時間) は、さまざまな要因 (例: Kinesis のレイテンシ、ネットワークレイテンシ、メッセージのサイズ/フォーマット) に左右されます。上のセクションで説明したバッチ処理も、レイテンシに影響します。実際にどの程度のレイテンシになるかを把握するため、必ずご利用のユースケースに即してテストすることをお勧めします。 低レイテンシに関する特定の要件がある場合は、お問い合わせください

アクティブな分片

同時にアクティブにする分片数は、必要なスループットに合わせて制限することを強く推奨します。“On Demand” の Kinesis ストリームでは、AWS がスループットに応じた数の分片を自動的に割り当てますが、 “Provisioned” ストリームでは、後述のとおり、分片を過剰にプロビジョニングすると遅延の原因になるうえ、Kinesis の料金はこの種のストリームでは「分片ごと」に課金されるため、コストも増加します。 プロデューサーアプリケーションが多数のアクティブな分片に継続的に書き込んでいる場合、各分片を効率よく処理できるほどパイプが十分にスケールしていないと、遅延が発生する可能性があります。Kinesis のスループット制限に基づき、 ClickPipes は分片データを読み取るため、レプリカごとに一定数の “workers” を割り当てます。たとえば最小サイズでは、ClickPipes のレプリカにはこのワーカースレッドが 4 つあります。プロデューサーが 同時に 4 つを超える分片に書き込んでいる場合、ワーカースレッドが空くまで、それを超える分片のデータは処理されません。特に、パイプが “enhanced fanout” を使用している場合、各ワーカースレッドは 5 分間 1 つの分片を購読し、その間はほかの分片を読み取れません。その結果、5 分単位で遅延スパイクが発生することがあります。

スケーリング

ClickPipes for Kinesis は、水平方向と垂直方向の両方にスケールできるよう設計されています。デフォルトでは、1 つのコンシューマーを含むコンシューマグループが作成されます。これは ClickPipe の作成時、または作成後の任意のタイミングで Settings -> Advanced Settings -> Scaling から設定できます。 ClickPipes は、アベイラビリティゾーンに分散されたアーキテクチャによって高可用性を実現します。 そのため、少なくとも 2 つのコンシューマーまでスケールする必要があります。 実行中のコンシューマー数にかかわらず、耐障害性は設計上確保されています。 コンシューマーまたはその基盤となるインフラストラクチャで障害が発生した場合、 ClickPipe は自動的にコンシューマーを再起動し、メッセージの処理を継続します。

認証

Amazon Kinesis ストリームにアクセスするには、IAM 認証情報 または IAM ロール を使用できます。IAM ロールの設定方法の詳細については、ClickHouse Cloud で使用できるロールの設定方法を説明したこちらのガイドを参照してください。
最終更新日 2026年6月10日