Pular para o conteúdo principal
Este motor oferece integração com o ecossistema do Azure Blob Storage, permitindo importar dados em streaming.

Criar tabela

CREATE TABLE test (name String, value UInt32)
    ENGINE = AzureQueue(...)
    [SETTINGS]
    [mode = '',]
    [after_processing = 'keep',]
    [keeper_path = '',]
    ...
Parâmetros do motor Os parâmetros de AzureQueue são os mesmos compatíveis com o motor de tabela AzureBlobStorage. Consulte a seção de parâmetros aqui. Assim como no motor de tabela AzureBlobStorage, os usuários podem usar o emulador Azurite para desenvolvimento local com o Azure Storage. Mais detalhes aqui. Exemplo
CREATE TABLE azure_queue_engine_table
(
    `key` UInt64,
    `data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS mode = 'unordered'

Configurações

O conjunto de configurações compatíveis é praticamente o mesmo do motor de tabela S3Queue, mas sem o prefixo s3queue_. Consulte a lista completa de configurações. Para obter a lista de configurações definidas para a tabela, use a tabela system.azure_queue_settings. Disponível a partir da versão 24.10. Abaixo estão as configurações compatíveis apenas com o AzureQueue e não aplicáveis ao S3Queue.

after_processing_move_connection_string

String de conexão do Azure Blob Storage para a qual os arquivos processados com sucesso serão movidos, caso o destino seja outro contêiner do Azure. Valores possíveis:
  • String.
Valor padrão: string vazia.

after_processing_move_container

Nome do contêiner para o qual os arquivos processados com sucesso serão movidos, caso o destino seja outro contêiner do Azure. Valores possíveis:
  • String.
Valor padrão: string vazia. Exemplo:
CREATE TABLE azure_queue_engine_table
(
    `key` UInt64,
    `data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS
    mode = 'unordered',
    after_processing = 'move',
    after_processing_move_connection_string = 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;',
    after_processing_move_container = 'dst-container';

SELECT no motor de tabela AzureQueue

Consultas SELECT são proibidas por padrão em tabelas AzureQueue. Isso segue o padrão comum de fila, em que os dados são lidos uma vez e depois removidos da fila. O SELECT é proibido para evitar perda acidental de dados. No entanto, às vezes isso pode ser útil. Para isso, você precisa definir a configuração stream_like_engine_allow_direct_select como True. O motor AzureQueue tem uma configuração especial para consultas SELECT: commit_on_select. Defina-a como False para preservar os dados na fila após a leitura, ou True para removê-los.

Descrição

SELECT não é particularmente útil para importação em streaming (exceto para depuração), porque cada arquivo pode ser importado apenas uma vez. É mais prático criar fluxos em tempo real usando visões materializadas. Para fazer isso:
  1. Use o motor para criar uma tabela para consumir do caminho especificado no S3 e considere-a um fluxo de dados.
  2. Crie uma tabela com a estrutura desejada.
  3. Crie uma visão materializada que converta os dados do motor e os insira em uma tabela criada anteriormente.
Quando a MATERIALIZED VIEW é vinculada ao motor, ela começa a coletar dados em segundo plano. Exemplo:
CREATE TABLE azure_queue_engine_table (key UInt64, data String)
  ENGINE=AzureQueue('<endpoint>', 'CSV', 'gzip')
  SETTINGS
      mode = 'unordered';

CREATE TABLE stats (key UInt64, data String)
  ENGINE = MergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW consumer TO stats
  AS SELECT key, data FROM azure_queue_engine_table;

SELECT * FROM stats ORDER BY key;

Colunas virtuais

  • _path — Caminho do arquivo.
  • _file — Nome do arquivo.
Para mais informações sobre colunas virtuais, veja aqui.

Introspecção

Ative o logging da tabela usando a configuração de tabela enable_logging_to_queue_log=1. Os recursos de introspecção são os mesmos do motor de tabela S3Queue, com algumas diferenças específicas:
  1. Use system.azure_queue_metadata_cache para o estado em memória da fila em versões do servidor >= 25.1. Em versões anteriores, use system.s3queue_metadata_cache (ele também conterá informações para tabelas azure).
  2. Ative system.azure_queue_log por meio da configuração principal do ClickHouse, por exemplo.
  <azure_queue_log>
    <database>system</database>
    <table>azure_queue_log</table>
  </azure_queue_log>
Esta tabela persistente contém as mesmas informações que system.s3queue_metadata_cache, mas para arquivos processados e arquivos com falha. A tabela tem a seguinte estrutura:

CREATE TABLE system.azure_queue_log
(
    `hostname` LowCardinality(String) COMMENT 'Hostname',
    `event_date` Date COMMENT 'Event date of writing this log row',
    `event_time` DateTime COMMENT 'Event time of writing this log row',
    `database` String COMMENT 'The name of a database where current S3Queue table lives.',
    `table` String COMMENT 'The name of S3Queue table.',
    `uuid` String COMMENT 'The UUID of S3Queue table',
    `file_name` String COMMENT 'File name of the processing file',
    `rows_processed` UInt64 COMMENT 'Number of processed rows',
    `status` Enum8('Processed' = 0, 'Failed' = 1) COMMENT 'Status of the processing file',
    `processing_start_time` Nullable(DateTime) COMMENT 'Time of the start of processing the file',
    `processing_end_time` Nullable(DateTime) COMMENT 'Time of the end of processing the file',
    `exception` String COMMENT 'Exception message if happened'
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time)
COMMENT 'Contains logging entries with the information files processes by S3Queue engine.'

Exemplo:
SELECT *
FROM system.azure_queue_log
LIMIT 1
FORMAT Vertical

Row 1:
──────
hostname:              clickhouse
event_date:            2024-12-16
event_time:            2024-12-16 13:42:47
database:              default
table:                 azure_queue_engine_table
uuid:                  1bc52858-00c0-420d-8d03-ac3f189f27c8
file_name:             test_1.csv
rows_processed:        3
status:                Processed
processing_start_time: 2024-12-16 13:42:47
processing_end_time:   2024-12-16 13:42:47
exception:

1 row in set. Elapsed: 0.002 sec.

Última modificação em 10 de junho de 2026