메인 콘텐츠로 건너뛰기
이 엔진은 Azure Blob Storage 생태계와 통합되어 스트리밍 데이터 가져오기를 지원합니다.

테이블 생성

CREATE TABLE test (name String, value UInt32)
    ENGINE = AzureQueue(...)
    [SETTINGS]
    [mode = '',]
    [after_processing = 'keep',]
    [keeper_path = '',]
    ...
엔진 매개변수 AzureQueue의 매개변수는 AzureBlobStorage 테이블 엔진에서 지원하는 매개변수와 동일합니다. 매개변수 섹션은 여기를 참조하십시오. AzureBlobStorage 테이블 엔진과 마찬가지로 로컬 Azure Storage 개발에는 Azurite 에뮬레이터를 사용할 수 있습니다. 자세한 내용은 여기를 참조하십시오. 예시
CREATE TABLE azure_queue_engine_table
(
    `key` UInt64,
    `data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS mode = 'unordered'

설정

지원하는 설정은 대부분 S3Queue 테이블 엔진과 동일하지만, s3queue_ 접두사는 사용하지 않습니다. 전체 설정 목록을 참조하십시오. 테이블에 구성된 설정 목록은 system.azure_queue_settings 테이블에서 확인할 수 있습니다. 이 기능은 24.10부터 사용할 수 있습니다. 아래 설정은 AzureQueue에서만 지원되며 S3Queue에는 적용되지 않습니다.

after_processing_move_connection_string

대상이 다른 Azure 컨테이너인 경우, 성공적으로 처리된 파일을 이동할 Azure Blob Storage 연결 문자열입니다. 가능한 값:
  • String.
기본값: 빈 문자열입니다.

after_processing_move_container

대상이 다른 Azure 컨테이너인 경우, 성공적으로 처리한 파일을 이동할 컨테이너 이름입니다. 가능한 값:
  • String.
기본값: 빈 문자열. 예시:
CREATE TABLE azure_queue_engine_table
(
    `key` UInt64,
    `data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS
    mode = 'unordered',
    after_processing = 'move',
    after_processing_move_connection_string = 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;',
    after_processing_move_container = 'dst-container';

AzureQueue 테이블 엔진에서 SELECT

기본적으로 AzureQueue 테이블에서는 SELECT 쿼리가 허용되지 않습니다. 이는 데이터를 한 번 읽은 뒤 큐에서 제거하는 일반적인 큐 패턴을 따릅니다. SELECT를 금지하는 이유는 실수로 데이터가 손실되는 것을 방지하기 위해서입니다. 하지만 경우에 따라 SELECT가 유용할 수 있습니다. 이렇게 하려면 stream_like_engine_allow_direct_select 설정을 True로 지정해야 합니다. AzureQueue 엔진에는 SELECT 쿼리용 특별 설정인 commit_on_select가 있습니다. 읽은 뒤에도 큐에 데이터를 유지하려면 False로 설정하고, 제거하려면 True로 설정하십시오.

설명

SELECT는 스트리밍 가져오기에는 그다지 유용하지 않습니다(디버깅은 예외). 각 파일은 한 번만 가져올 수 있기 때문입니다. 더 실용적인 방법은 materialized view를 사용해 실시간 스레드를 만드는 것입니다. 이렇게 하려면 다음 단계를 수행하십시오.
  1. 엔진을 사용해 S3의 지정된 경로에서 데이터를 소비하는 테이블을 만들고, 이를 데이터 스트림으로 간주합니다.
  2. 원하는 구조의 테이블을 생성합니다.
  3. 엔진의 데이터를 변환해 앞서 생성한 테이블에 넣는 materialized view를 생성합니다.
MATERIALIZED VIEW가 엔진에 연결되면 백그라운드에서 데이터 수집을 시작합니다. 예시:
CREATE TABLE azure_queue_engine_table (key UInt64, data String)
  ENGINE=AzureQueue('<endpoint>', 'CSV', 'gzip')
  SETTINGS
      mode = 'unordered';

CREATE TABLE stats (key UInt64, data String)
  ENGINE = MergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW consumer TO stats
  AS SELECT key, data FROM azure_queue_engine_table;

SELECT * FROM stats ORDER BY key;

가상 컬럼

  • _path — 파일 경로입니다.
  • _file — 파일 이름입니다.
가상 컬럼에 대한 자세한 내용은 여기를 참조하십시오.

내부 검사

테이블 설정 enable_logging_to_queue_log=1로 해당 테이블의 로깅을 활성화합니다. 내부 검사 기능은 S3Queue 테이블 엔진과 동일하지만, 몇 가지 분명한 차이점이 있습니다:
  1. 서버 버전이 >= 25.1이면 큐의 인메모리 상태에 system.azure_queue_metadata_cache를 사용합니다. 이전 버전에서는 system.s3queue_metadata_cache를 사용합니다(azure 테이블에 대한 정보도 포함됨).
  2. 예를 들어, 기본 ClickHouse 구성에서 system.azure_queue_log를 활성화합니다.
  <azure_queue_log>
    <database>system</database>
    <table>azure_queue_log</table>
  </azure_queue_log>
이 영속 테이블에는 system.s3queue_metadata_cache와 동일한 정보가 저장되지만, 처리된 파일과 실패한 파일에 대한 내용입니다. 이 테이블의 구조는 다음과 같습니다:

CREATE TABLE system.azure_queue_log
(
    `hostname` LowCardinality(String) COMMENT '호스트명',
    `event_date` Date COMMENT '이 로그 행이 기록된 이벤트 날짜',
    `event_time` DateTime COMMENT '이 로그 행이 기록된 이벤트 시간',
    `database` String COMMENT '현재 S3Queue 테이블이 속한 데이터베이스 이름.',
    `table` String COMMENT 'S3Queue 테이블의 이름.',
    `uuid` String COMMENT 'S3Queue 테이블의 UUID',
    `file_name` String COMMENT '처리 중인 파일의 파일명',
    `rows_processed` UInt64 COMMENT '처리된 행 수',
    `status` Enum8('Processed' = 0, 'Failed' = 1) COMMENT '처리 파일의 상태',
    `processing_start_time` Nullable(DateTime) COMMENT '파일 처리 시작 시간',
    `processing_end_time` Nullable(DateTime) COMMENT '파일 처리 종료 시간',
    `exception` String COMMENT '예외 발생 시 예외 메시지'
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time)
COMMENT 'S3Queue 엔진이 처리한 파일 정보를 담은 로깅 항목을 포함합니다.'

예시:
SELECT *
FROM system.azure_queue_log
LIMIT 1
FORMAT Vertical

Row 1:
──────
hostname:              clickhouse
event_date:            2024-12-16
event_time:            2024-12-16 13:42:47
database:              default
table:                 azure_queue_engine_table
uuid:                  1bc52858-00c0-420d-8d03-ac3f189f27c8
file_name:             test_1.csv
rows_processed:        3
status:                Processed
processing_start_time: 2024-12-16 13:42:47
processing_end_time:   2024-12-16 13:42:47
exception:

1 row in set. Elapsed: 0.002 sec.

마지막 수정일 2026년 6월 10일