RabbitMQ permite:
- Publicar ou assinar fluxos de dados.
- Processar streams à medida que ficam disponíveis.
Criando uma tabela
rabbitmq_host_port– host:port (por exemplo,localhost:5672).rabbitmq_exchange_name– nome do exchange do RabbitMQ.rabbitmq_format– Formato da mensagem. Utiliza a mesma notação da função SQLFORMAT, comoJSONEachRow. Para mais informações, consulte a seção Formatos.
rabbitmq_exchange_type– O tipo de exchange do RabbitMQ:direct,fanout,topic,headers,consistent_hash. Padrão:fanout.rabbitmq_routing_key_list– Uma lista de chaves de roteamento separadas por vírgulas.rabbitmq_schema– Parâmetro que deve ser usado se o formato exigir uma definição de schema. Por exemplo, Cap’n Proto requer o caminho para o arquivo de schema e o nome do objeto raizschema.capnp:Message.rabbitmq_num_consumers– O número de consumidores por tabela. Especifique mais consumidores se o throughput de um consumidor for insuficiente. Padrão:1rabbitmq_num_queues– Número total de filas. Aumentar esse número pode melhorar significativamente o desempenho. Padrão:1.rabbitmq_queue_base- Especifique um prefixo para os nomes das filas. Os casos de uso dessa configuração são descritos abaixo.rabbitmq_persistent- Se definido como 1 (true), o modo de entrega da insert query será definido como 2 (marca as mensagens como ‘persistent’). Padrão:0.rabbitmq_skip_broken_messages– Tolerância do parser de mensagens do RabbitMQ a mensagens incompatíveis com o schema por block. Serabbitmq_skip_broken_messages = N, o motor ignora N mensagens do RabbitMQ que não podem ser analisadas (uma mensagem equivale a uma linha de dados). Padrão:0.rabbitmq_max_block_size- Número de linhas coletadas antes do flush dos dados do RabbitMQ. Padrão: max_insert_block_size.rabbitmq_flush_interval_ms- Timeout para flush dos dados do RabbitMQ. Padrão: stream_flush_interval_ms.rabbitmq_queue_settings_list- permite definir configurações do RabbitMQ ao criar uma fila. Configurações disponíveis:x-max-length,x-max-length-bytes,x-message-ttl,x-expires,x-priority,x-max-priority,x-overflow,x-dead-letter-exchange,x-queue-type. A configuraçãodurableé ativada automaticamente para a fila.rabbitmq_address- Endereço para conexão. Use esta configuração ourabbitmq_host_port.rabbitmq_vhost- vhost do RabbitMQ. Padrão:'\'.rabbitmq_queue_consume- Use filas definidas pelo usuário e não faça nenhuma configuração do RabbitMQ: declaração de exchanges, filas e bindings. Padrão:false.rabbitmq_username- Nome de usuário do RabbitMQ.rabbitmq_password- Senha do RabbitMQ.reject_unhandled_messages- Rejeita mensagens (envia confirmação negativa ao RabbitMQ) em caso de erro. Essa configuração é ativada automaticamente se houver umx-dead-letter-exchangedefinido emrabbitmq_queue_settings_list.rabbitmq_commit_on_select- Faz o commit das mensagens quando uma consulta select é executada. Padrão:false.rabbitmq_max_rows_per_message— O número máximo de linhas gravadas em uma mensagem do RabbitMQ para formatos baseados em linhas. Padrão:1.rabbitmq_empty_queue_backoff_start_ms— Ponto inicial de backoff para reprogramar a leitura se a fila do RabbitMQ estiver vazia.rabbitmq_empty_queue_backoff_end_ms— Ponto final de backoff para reprogramar a leitura se a fila do RabbitMQ estiver vazia.rabbitmq_empty_queue_backoff_step_ms— Passo de backoff para reprogramar a leitura se a fila do RabbitMQ estiver vazia.rabbitmq_handle_error_mode— Como tratar erros no motor RabbitMQ. Valores possíveis: default (a exceção será lançada se não conseguirmos analisar uma mensagem), stream (a mensagem de exceção e a mensagem bruta serão salvas nas colunas virtuais_errore_raw_message), dead_letter_queue (os dados relacionados ao erro serão salvos em system.dead_letter_queue).
Conexão SSL
rabbitmq_secure = 1 ou amqps no endereço da conexão: rabbitmq_address = 'amqps://guest:guest@localhost/vhost'.
O comportamento padrão da biblioteca usada é não verificar se a conexão TLS criada é segura o suficiente. Se o certificado estiver expirado, for autoassinado, estiver ausente ou for inválido, a conexão simplesmente é permitida. Verificações mais rigorosas dos certificados poderão ser implementadas no futuro.
Também é possível adicionar configurações de formato junto com as configurações relacionadas ao RabbitMQ.
Exemplo:
Descrição
SELECT não é particularmente útil para ler mensagens (exceto para depuração), porque cada mensagem só pode ser lida uma vez. É mais prático criar fluxos em tempo real usando visões materializadas. Para isso:
- Use o motor para criar um consumidor RabbitMQ e trate-o como um fluxo de dados.
- Crie uma tabela com a estrutura desejada.
- Crie uma visão materializada que converta os dados do motor e os grave em uma tabela criada anteriormente.
MATERIALIZED VIEW é conectada ao motor, ela começa a coletar dados em segundo plano. Isso permite que você receba continuamente mensagens do RabbitMQ e as converta para o formato necessário usando SELECT.
Uma tabela RabbitMQ pode ter quantas visões materializadas você quiser.
Os dados podem ser roteados com base em rabbitmq_exchange_type e na rabbitmq_routing_key_list especificada.
Não pode haver mais de um exchange por tabela. Um exchange pode ser compartilhado entre várias tabelas — isso permite rotear para várias tabelas ao mesmo tempo.
Opções de tipo de exchange:
direct- O roteamento se baseia na correspondência exata das chaves. Exemplo de lista de chaves da tabela:key1,key2,key3,key4,key5; a chave da mensagem pode ser igual a qualquer uma delas.fanout- Roteamento para todas as tabelas (em que o nome do exchange é o mesmo), independentemente das chaves.topic- O roteamento se baseia em padrões com chaves separadas por ponto. Exemplos:*.logs,records.*.*.2020,*.2018,*.2019,*.2020.headers- O roteamento se baseia em correspondênciaskey=valuecom a configuraçãox-match=alloux-match=any. Exemplo de lista de chaves da tabela:x-match=all,format=logs,type=report,year=2020.consistent_hash- Os dados são distribuídos uniformemente entre todas as tabelas vinculadas (em que o nome do exchange é o mesmo). Observe que esse tipo de exchange deve ser habilitado com o plugin do RabbitMQ:rabbitmq-plugins enable rabbitmq_consistent_hash_exchange.
rabbitmq_queue_base pode ser usada nos seguintes casos:
- para permitir que tabelas diferentes compartilhem filas, de modo que vários consumidores possam ser registrados nas mesmas filas, o que melhora o desempenho. Se você usar as configurações
rabbitmq_num_consumerse/ourabbitmq_num_queues, a correspondência exata das filas será obtida caso esses parâmetros sejam iguais. - para possibilitar a restauração da leitura a partir de determinadas filas duráveis quando nem todas as mensagens tiverem sido consumidas com sucesso. Para retomar o consumo de uma fila específica, defina seu nome na configuração
rabbitmq_queue_basee não especifiquerabbitmq_num_consumersnemrabbitmq_num_queues(o padrão é 1). Para retomar o consumo de todas as filas que foram declaradas para uma tabela específica, basta especificar as mesmas configurações:rabbitmq_queue_base,rabbitmq_num_consumers,rabbitmq_num_queues. Por padrão, os nomes das filas serão exclusivos para as tabelas. - para reutilizar filas, já que elas são declaradas como duráveis e não são excluídas automaticamente. (Podem ser excluídas por qualquer uma das ferramentas de CLI do RabbitMQ.)
rabbitmq_num_consumers e/ou rabbitmq_num_queues forem especificadas junto com rabbitmq_exchange_type, então:
- o plugin
rabbitmq-consistent-hash-exchangedeve estar habilitado. - a propriedade
message_iddas mensagens publicadas deve ser especificada (única para cada mensagem/lote).
messageID e a flag republished (true, se publicada mais de uma vez) — eles podem ser acessados por meio dos cabeçalhos da mensagem.
Não use a mesma tabela para inserts e visões materializadas.
Exemplo:
Colunas virtuais
_exchange_name- Nome do exchange do RabbitMQ. Tipo de dado:String._channel_id- ChannelID no qual o consumidor que recebeu a mensagem foi declarado. Tipo de dado:String._delivery_tag- DeliveryTag da mensagem recebida. Com escopo por canal. Tipo de dado:UInt64._redelivered- Flagredeliveredda mensagem. Tipo de dado:UInt8._message_id- messageID da mensagem recebida; não vazio se tiver sido definido quando a mensagem foi publicada. Tipo de dado:String._timestamp- timestamp da mensagem recebida; não vazio se tiver sido definido quando a mensagem foi publicada. Tipo de dado:UInt64.
rabbitmq_handle_error_mode='stream':
_raw_message- Mensagem bruta que não pôde ser analisada com sucesso. Tipo de dado:Nullable(String)._error- Mensagem de exceção gerada durante uma falha de análise. Tipo de dado:Nullable(String).
_raw_message e _error são preenchidas apenas em caso de exceção durante a análise; elas são sempre NULL quando a mensagem é analisada com sucesso.
Ressalvas
DEFAULT, MATERIALIZED e ALIAS) na definição da tabela, elas serão ignoradas. Em vez disso, as colunas serão preenchidas com os respectivos valores padrão de seus tipos.
Suporte a formatos de dados
- Para formatos baseados em linhas, o número de linhas em uma mensagem do RabbitMQ pode ser controlado pela configuração
rabbitmq_max_rows_per_message. - Para formatos baseados em blocos, não é possível dividir um bloco em partes menores, mas o número de linhas em um bloco pode ser controlado pela configuração geral max_block_size.