メインコンテンツへスキップ
このエンジンは Azure Blob Storage エコシステムと連携し、ストリーミングデータをインポートできます。

テーブルの作成

CREATE TABLE test (name String, value UInt32)
    ENGINE = AzureQueue(...)
    [SETTINGS]
    [mode = '',]
    [after_processing = 'keep',]
    [keeper_path = '',]
    ...
エンジンパラメータ AzureQueue のパラメータは、AzureBlobStorage テーブルエンジンでサポートされているものと同じです。パラメータについては、こちらを参照してください。 AzureBlobStorage テーブルエンジンと同様に、ローカルで Azure Storage を開発する際には Azurite エミュレータを使用できます。詳細はこちらを参照してください。
CREATE TABLE azure_queue_engine_table
(
    `key` UInt64,
    `data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS mode = 'unordered'

設定

サポートされている設定のほとんどは S3Queue テーブルエンジンと同じですが、s3queue_ プレフィックスは付きません。設定の完全な一覧を参照してください。 テーブルに対して設定されている項目の一覧を取得するには、system.azure_queue_settings テーブルを使用します。24.10 から利用できます。 以下は、AzureQueue でのみ使用でき、S3Queue には適用されない設定です。

after_processing_move_connection_string

宛先が別の Azure コンテナーである場合に、正常に処理されたファイルの移動先として使用する Azure Blob Storage の接続文字列。 設定可能な値:
  • String。
デフォルト値: 空文字列。

after_processing_move_container

宛先が別の Azure コンテナーである場合に、正常に処理されたファイルの移動先となるコンテナーの名前。 設定可能な値:
  • String.
デフォルト値: 空文字列。 例:
CREATE TABLE azure_queue_engine_table
(
    `key` UInt64,
    `data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS
    mode = 'unordered',
    after_processing = 'move',
    after_processing_move_connection_string = 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;',
    after_processing_move_container = 'dst-container';

AzureQueue テーブルエンジン での SELECT

AzureQueue テーブルに対する SELECT クエリは、デフォルトで禁止されています。これは、データは一度読み取るとキューから削除されるという、一般的なキューの動作に従うためです。SELECT が禁止されているのは、意図しないデータ損失を防ぐためです。 ただし、場合によっては直接 SELECT したいこともあります。その場合は、設定 stream_like_engine_allow_direct_selectTrue にする必要があります。 AzureQueue engine には、SELECT クエリ向けの特別な設定 commit_on_select があります。読み取り後もキュー内のデータを保持するには False に設定し、削除するには True に設定します。

説明

SELECT はストリーミングインポートにはあまり適していません (デバッグ用途を除く) 。各ファイルは一度しかインポートできないためです。代わりに、materialized view を使ってリアルタイムの処理パイプラインを作成するほうが実用的です。手順は次のとおりです。
  1. エンジンを使用して、S3 内の指定したパスからデータを読み込むためのテーブルを作成し、それをデータストリームとして扱います。
  2. 必要な構造を持つテーブルを作成します。
  3. エンジンからのデータを変換し、あらかじめ作成したテーブルに書き込む materialized view を作成します。
MATERIALIZED VIEW でそのエンジンを参照すると、バックグラウンドでデータの収集が開始されます。 例:
CREATE TABLE azure_queue_engine_table (key UInt64, data String)
  ENGINE=AzureQueue('<endpoint>', 'CSV', 'gzip')
  SETTINGS
      mode = 'unordered';

CREATE TABLE stats (key UInt64, data String)
  ENGINE = MergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW consumer TO stats
  AS SELECT key, data FROM azure_queue_engine_table;

SELECT * FROM stats ORDER BY key;

仮想カラム

  • _path — ファイルのパス。
  • _file — ファイル名。
仮想カラムの詳細については、こちらを参照してください。

イントロスペクション

テーブル設定 enable_logging_to_queue_log=1 で、このテーブルのログ記録を有効にします。 イントロスペクション機能は S3Queue テーブルエンジン と同じですが、いくつか異なる点があります。
  1. server バージョンが >= 25.1 の場合、queue のインメモリ状態には system.azure_queue_metadata_cache を使用します。古いバージョンでは system.s3queue_metadata_cache を使用します (これには azure テーブルの情報も含まれます) 。
  2. メインの ClickHouse 設定で system.azure_queue_log を有効にします。例:
  <azure_queue_log>
    <database>system</database>
    <table>azure_queue_log</table>
  </azure_queue_log>
この永続テーブルには、system.s3queue_metadata_cache と同じ情報が含まれていますが、対象は処理済みおよび失敗したファイルです。 このテーブルの構造は次のとおりです。

CREATE TABLE system.azure_queue_log
(
    `hostname` LowCardinality(String) COMMENT 'ホスト名',
    `event_date` Date COMMENT 'このログ行が書き込まれたイベントの日付',
    `event_time` DateTime COMMENT 'このログ行が書き込まれたイベントの時刻',
    `database` String COMMENT '現在のS3Queueテーブルが存在するデータベースの名前。',
    `table` String COMMENT 'S3Queueテーブルの名前。',
    `uuid` String COMMENT 'S3QueueテーブルのUUID',
    `file_name` String COMMENT '処理対象ファイルのファイル名',
    `rows_processed` UInt64 COMMENT '処理済みの行数',
    `status` Enum8('Processed' = 0, 'Failed' = 1) COMMENT '処理対象ファイルのステータス',
    `processing_start_time` Nullable(DateTime) COMMENT 'ファイル処理の開始時刻',
    `processing_end_time` Nullable(DateTime) COMMENT 'ファイル処理の終了時刻',
    `exception` String COMMENT '例外が発生した場合の例外メッセージ'
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time)
COMMENT 'S3Queueエンジンによって処理されたファイルの情報を含むログエントリが格納されています。'

例:
SELECT *
FROM system.azure_queue_log
LIMIT 1
FORMAT Vertical

Row 1:
──────
hostname:              clickhouse
event_date:            2024-12-16
event_time:            2024-12-16 13:42:47
database:              default
table:                 azure_queue_engine_table
uuid:                  1bc52858-00c0-420d-8d03-ac3f189f27c8
file_name:             test_1.csv
rows_processed:        3
status:                Processed
processing_start_time: 2024-12-16 13:42:47
processing_end_time:   2024-12-16 13:42:47
exception:

1 row in set. Elapsed: 0.002 sec.

最終更新日 2026年6月10日