メインコンテンツへスキップ
こちらからプライベートプレビューのウェイトリストに登録できます。
Pub/Sub ClickPipes は、ClickPipes UI を使用して手動でデプロイおよび管理できるほか、OpenAPITerraform を使用してプログラムでデプロイおよび管理することもできます。

前提条件

ClickPipes の概要を一通り確認し、取り込み元とする トピック を含む GCP プロジェクトにアクセスでき、適切な Pub/Sub 権限を持つサービスアカウントを作成している必要があります。ClickPipes に必要な権限の正確な一覧については、Pub/Sub IAM 権限ガイドを参照してください。

最初の ClickPipe を作成する

  1. ClickHouse Cloud サービスの SQL Console にアクセスします。
  1. 左側のメニューで Data Sources ボタンを選択し、「Set up a ClickPipe」をクリックします。
  1. データソースとして GCP Pub/Sub を選択します。
  1. ClickPipe 名、GCP Project ID、および Pub/Sub へのアクセス権が付与されたサービスアカウントの サービスアカウント JSON ファイル を指定して、フォームに入力します。Project ID は 6~30 文字で、小文字、数字、ハイフンを使用でき、英字で始まり、ハイフンで終わることはできません。
  1. 取り込む Pub/Sub トピック を選択します。認証情報の検証が完了すると、GCP プロジェクト内のトピックがドロップダウンに自動的に表示され (アルファベット順にソート) 、選択できるようになります。
    • データフォーマット。 トピック を選択すると、ClickPipes は Pub/Sub スキーマレジストリを照会します。トピック にネイティブの Avro または Protobuf スキーマが関連付けられている場合、Data format と Schema は自動検出され、セレクタはその トピック 上の最新スキーマに固定されます。ネイティブスキーマのない トピック では、既定で JSONEachRow が使用されます。
    • 開始オフセット。 読み取りを開始する位置を選択します。利用可能なオプションは Latest (新しいメッセージのみ) 、Earliest (保持されている最も古いメッセージ) 、Seek to Timestamp (UTC の日時ピッカー付き) です。
    • フィルタ式 (任意) 。 メッセージ属性に対する Pub/Sub の subscription filter です。たとえば attributes.type = "telemetry" のように指定します。フィルタは payload ではなくメッセージ属性にのみ適用され、パイプの作成後に変更することはできません (変更するにはパイプを再作成する必要があります) 。
    • UI には選択した トピック のサンプルメッセージが表示され、Flatten object トグルを使って、ネストされた JSON が宛先側でどのようにフラット化されるかをプレビューできます。
  1. 次のステップでは、新しい ClickHouse table にデータを取り込むか、既存のものを再利用するかを選択できます。画面の指示に従って、table 名、スキーマ、設定を変更してください。上部のサンプル table で、変更内容をリアルタイムにプレビューできます。
用意されているコントロールを使って、詳細設定をカスタマイズすることもできます。
  1. あるいは、既存の ClickHouse table にデータを取り込むこともできます。その場合、UI ではソースのフィールドを、選択した宛先 table の ClickHouse フィールドにマッピングできます。
  1. 最後に、内部 ClickPipes ユーザーの権限を設定できます。
Permissions: ClickPipes は、宛先 table にデータを書き込むための専用ユーザーを作成します。カスタムロール、または事前定義されたロールのいずれかを使用して、この内部ユーザーに割り当てるロールを選択できます。
  • Full access: クラスターへのフルアクセス権です。宛先 table で materialized view や Dictionary を使用する場合に役立つことがあります。
    • Only destination table: 宛先 table に対する INSERT 権限のみです。
  1. 「Complete Setup」をクリックすると、システムに ClickPipe が登録され、概要 table に一覧表示されるようになります。
サマリーテーブルには、ClickHouse 内のソースまたは宛先テーブルのサンプルデータを表示するためのコントロールがあります また、ClickPipe を削除したり、インジェストジョブの概要を表示したりするためのコントロールもあります。
  1. おめでとうございます! 最初の Pub/Sub ClickPipe の設定が正常に完了しました。今後は継続的に実行され、Pub/Sub トピック から ClickHouse Cloud service にデータがリアルタイムで取り込まれます。

管理対象サブスクリプション

Pub/Sub メッセージはトピックから直接ではなく、サブスクリプション経由で消費されます。ClickPipes は各パイプ専用のサブスクリプションを作成・管理するため、ユーザーが選択するのは常にトピックだけです。
  • 管理対象サブスクリプションの名前は clickpipes-{pipeID} で、パイプの起動時にそのトピック上に作成されます。
  • このサブスクリプションには、ack deadline 60 秒、メッセージ保持期間 7 日、メッセージ順序指定 enabled が設定されます。
  • サブスクリプションの作成は冪等です。パイプの再起動やレプリカの再スケジュール時には、設定されたトピックを参照する既存のサブスクリプションがあればそれが再利用されます。
  • トピック discovery とメッセージ sampling の際には、ClickPipes は短命な ephemeral サブスクリプション (clickpipes-discovery-{uuid}) も作成し、sampling が完了するとすぐに削除します。
  • パイプが削除されると、ClickPipes はクリーンアップの一環として管理対象サブスクリプションを削除します。
そのため、指定するサービスアカウントには、サブスクリプションからの消費に加えて、プロジェクト内でサブスクリプションを作成・削除する permission も必要です。完全な一覧については、Pub/Sub IAM 権限ガイドを参照してください。

対応データフォーマット

対応フォーマットは次のとおりです。
  • JSON
  • Avro — Pub/Sub ネイティブスキーマ経由 (BINARY エンコーディング)
  • Protobuf — Pub/Sub ネイティブスキーマ経由 (BINARY エンコーディング)
Avro と Protobuf では、スキーマはトピックに関連付けられた Pub/Sub スキーマレジストリから取得されます。パイプは常にトピックのスキーマの最新リビジョンを使用します。UI のスキーマセレクタは設計上、読み取り専用です。

圧縮

Pub/Sub 向け ClickPipes は、圧縮されたメッセージを自動的に検出して解凍します。Pub/Sub クライアントは生のバイト列をそのまま配信し、解凍は ClickPipes が追加設定なしで処理します。 サポートされている圧縮コーデックは次のとおりです。
  • gzip
  • zstd
  • lz4
  • snappy (framed format)
圧縮は、各メッセージ内のマジックバイトによって自動的に検出されます。既知の圧縮シグネチャが見つからない場合、そのメッセージは非圧縮として扱われます。検出された圧縮タイプはスキーマ推論時にも反映されるため、UI のサンプルデータプレビューには解凍後のペイロードが正しく表示されます。
JSON のようなテキストベースのフォーマットでは、自動検出は安全です。これは、印字可能な ASCII 文字が圧縮のマジックバイトと衝突することがないためです。解凍後のペイロードは 64MB に制限されます。

サポートされているデータ型

標準型のサポート

現在、ClickPipes では以下の ClickHouse データ型がサポートされています。
  • 基本数値型 - [U]Int8/16/32/64、Float32/64、BFloat16
  • 大きな整数型 - [U]Int128/256
  • Decimal 型
  • Boolean
  • String
  • FixedString
  • Date、Date32
  • DateTime、DateTime64 (UTC タイムゾーンのみ)
  • Enum8/Enum16
  • UUID
  • IPv4
  • IPv6
  • すべての ClickHouse LowCardinality 型
  • キーと値に上記いずれかの型を使用する Map (Nullable を含む)
  • 要素に上記いずれかの型を使用する Tuple および Array (Nullable を含む、深さは 1 レベルのみ)
  • SimpleAggregateFunction 型 (AggregatingMergeTree または SummingMergeTree の宛先用)

Variant 型のサポート

ソースデータストリーム内の任意の JSON フィールドに対して、Variant 型 (Variant(String, Int64, DateTime) など) を手動で指定できます。 ClickPipes では使用する適切な Variant のサブタイプを判定する仕組み上、Variant の定義で使用できる整数型または datetime 型は 1 つだけです。たとえば、Variant(Int64, UInt32) はサポートされていません。

JSON 型のサポート

常に JSON オブジェクトである JSON フィールドは、JSON の宛先カラムに割り当てることができます。固定またはスキップするパスを含め、宛先 カラムは目的の JSON 型に手動で変更する必要があります。

Pub/Sub 仮想カラム

以下の仮想カラムは、Pub/Sub トピックでサポートされています。新しい宛先テーブルを作成する際は、Add Column ボタンを使用して仮想カラムを追加できます。
NameDescriptionRecommended Data Type
_message_idブローカーによって割り当てられる Pub/Sub メッセージ IDString
_publish_timePub/Sub の公開 timestamp (ミリ秒精度、UTC)DateTime64(3)
_ordering_keyPub/Sub の ordering key (メッセージに key が設定されていない場合は空文字列)String
_attributesユーザー定義の Pub/Sub メッセージ attributeMap(String, String)
_raw_messagePub/Sub メッセージの完全な payload (デフォルトでは無効)String
_raw_message フィールドは、Pub/Sub メッセージの完全な payload のみが必要な場合に使用できます (たとえば、ClickHouse の JsonExtract* 関数を使用して下流の materialized view を生成する場合など) 。このようなパイプでは、「仮想」ではないカラムをすべて削除することで、ClickPipes のパフォーマンスが向上する場合があります。

制限事項

  • DEFAULT はサポートされていません。
  • 最小の (XS) レプリカサイズで実行している場合、個々のメッセージの上限はデフォルトで 8MB (非圧縮) で、より大きいレプリカでは 16MB (非圧縮) です。この上限を超えるメッセージはエラーとなり拒否されます。より大きなメッセージが必要な場合は、サポートまでお問い合わせください。
  • Pub/Sub サブスクリプションのフィルタは変更できません。フィルタ式を変更するには、パイプを再作成する必要があります。
  • フィルタはメッセージ属性にのみ適用され、メッセージのペイロードには適用されません。

パフォーマンス

バッチ処理

ClickPipes は、データをバッチ単位で ClickHouse に挿入します。これは、データベース内に過剰な数のパーツが作成されるのを防ぎ、クラスターのパフォーマンス低下を避けるためです。 次のいずれかの条件を満たすと、バッチが挿入されます。
  • バッチサイズが上限に達した場合 (100,000 行、またはレプリカのメモリ 1GB あたり 32MB)
  • バッチの保持時間が上限に達した場合 (5 秒)

レイテンシ

レイテンシ (Pub/Sub メッセージが公開されてから、そのメッセージを ClickHouse で利用可能になるまでの時間として定義) は、いくつかの要因 (パブリッシャーのレイテンシ、ネットワークのレイテンシ、メッセージのサイズ/フォーマット) に左右されます。上のセクションで説明したバッチ処理も、レイテンシに影響します。想定されるレイテンシを把握するため、実際のユースケースに即してテストすることを常に推奨しています。 低レイテンシに関する具体的な要件がある場合は、お問い合わせください

順序キー

Pub/Sub は、同じ 順序キー を持つメッセージが、単一のサブスクライバーへ公開順に配信されることを保証します。ClickPipes では、管理対象サブスクリプションでこの順序付けがデフォルトで有効です。メッセージに順序キーが設定されている場合、サブスクライバーはそれらを順番どおりに受信します。順序キーが設定されていない場合の動作は変わりません。 プロデューサーがすべてのメッセージを少数の順序キー (または 1 つのキー) だけで公開している場合、Pub/Sub はそれらのメッセージを少数のサブスクライバーに集約するため、水平方向のスループットが制限されることがあります。順序付けが不要であれば順序キーを付けないか、カーディナリティの高い順序キーを使用することを推奨します。

スケーリング

Pub/Sub 向け ClickPipes は、水平方向・垂直方向の両方にスケールできるよう設計されています。各パイプは単一の管理対象 Pub/Sub サブスクリプションを使用し、これは設定できません。デフォルトでは、1 つのコンシューマーがそのサブスクリプションからメッセージを取得します。コンシューマー数は、ClickPipe の作成時、または 設定 -> 高度な設定 -> スケーリング からいつでも増やせます。ClickPipes は、サブスクリプションからのメッセージを実行中のコンシューマーへ自動的に分散するため、追加の協調処理は不要です。 ClickPipes は、アベイラビリティゾーンに分散したアーキテクチャにより高可用性を実現します。これには、少なくとも 2 つのコンシューマーまでスケーリングする必要があります。 実行中のコンシューマー数にかかわらず、耐障害性は設計上確保されています。コンシューマーまたはその基盤となるインフラストラクチャで障害が発生した場合、ClickPipes は自動的にそのコンシューマーを再起動し、メッセージ処理を継続します。

配信セマンティクス

Pub/Sub 向け ClickPipes は at-least-once 配信を提供します。Pub/Sub メッセージが確認応答されるのは、対応する行が ClickHouse に挿入された後 (または不正なレコードの場合は error table に書き込まれた後) のみです。無限に再配信されるのを防ぐため、error table に振り分けられた不正なレコードも含め、処理されたメッセージはすべて確認応答されます。レプリカが挿入後、ack が Pub/Sub に届く前にクラッシュすると、メッセージは ack deadline 経過後に再配信され、再度挿入されるため、下流のコンシューマーは重複を許容する必要があります。exactly-once セマンティクスが必要な場合は、_message_id 仮想カラムを使って下流で重複排除を行ってください (各 Pub/Sub メッセージ ID はトピック内で一意です) 。

認証

Pub/Sub 向け ClickPipes は、サービスアカウントの JSON キーを使用して GCP に認証します。パイプの作成時にキーファイルをアップロードすると、ClickPipes はそのファイルを保存時に暗号化し、実行時にはメッセージの消費と管理対象サブスクリプションのライフサイクル管理に使用します。 必要な IAM 権限の正確な一覧と、推奨されるカスタムロールの定義については、Pub/Sub IAM 権限ガイド を参照してください。
最終更新日 2026年6月10日