RabbitMQ permite:
- Publicar o suscribirse a flujos de datos.
- Procesar flujos a medida que están disponibles.
Crear una tabla
rabbitmq_host_port– host:port (por ejemplo,localhost:5672).rabbitmq_exchange_name– Nombre del exchange de RabbitMQ.rabbitmq_format– Formato del mensaje. Utiliza la misma notación que la función SQLFORMAT, por ejemploJSONEachRow. Para más información, consulta la sección Formatos.
rabbitmq_exchange_type– El tipo de exchange de RabbitMQ:direct,fanout,topic,headers,consistent_hash. Predeterminado:fanout.rabbitmq_routing_key_list– Una lista de claves de enrutamiento separadas por comas.rabbitmq_schema– Parámetro que debe usarse si el formato requiere una definición de esquema. Por ejemplo, Cap’n Proto requiere la ruta al archivo de esquema y el nombre del objeto raízschema.capnp:Message.rabbitmq_num_consumers– El número de consumidores por tabla. Especifique más consumidores si el rendimiento de un consumidor no es suficiente. Predeterminado:1rabbitmq_num_queues– Número total de colas. Aumentar este número puede mejorar significativamente el rendimiento. Predeterminado:1.rabbitmq_queue_base- Especifique un prefijo para los nombres de las colas. Los casos de uso de esta configuración se describen a continuación.rabbitmq_persistent- Si se establece en 1 (true), el modo de entrega de la consulta de inserción se establecerá en 2 (marca los mensajes como ‘persistent’). Predeterminado:0.rabbitmq_skip_broken_messages– Tolerancia del analizador de mensajes de RabbitMQ a mensajes incompatibles con el esquema por bloque. Sirabbitmq_skip_broken_messages = N, el motor omite N mensajes de RabbitMQ que no se pueden analizar (un mensaje equivale a una fila de datos). Predeterminado:0.rabbitmq_max_block_size- Número de filas recopiladas antes de vaciar los datos desde RabbitMQ. Predeterminado: max_insert_block_size.rabbitmq_flush_interval_ms- Timeout para vaciar los datos desde RabbitMQ. Predeterminado: stream_flush_interval_ms.rabbitmq_queue_settings_list- Permite establecer configuraciones de RabbitMQ al crear una cola. Configuraciones disponibles:x-max-length,x-max-length-bytes,x-message-ttl,x-expires,x-priority,x-max-priority,x-overflow,x-dead-letter-exchange,x-queue-type. La configuracióndurablese habilita automáticamente para la cola.rabbitmq_address- Dirección de la conexión. Use esta configuración orabbitmq_host_port.rabbitmq_vhost- vhost de RabbitMQ. Predeterminado:'\'.rabbitmq_queue_consume- Use colas definidas por el usuario y no realice ninguna configuración de RabbitMQ: declaración de exchanges, colas y bindings. Predeterminado:false.rabbitmq_username- Nombre de usuario de RabbitMQ.rabbitmq_password- Contraseña de RabbitMQ.reject_unhandled_messages- Rechaza mensajes (envía un acuse negativo de RabbitMQ) en caso de error. Esta configuración se habilita automáticamente si hay unx-dead-letter-exchangedefinido enrabbitmq_queue_settings_list.rabbitmq_commit_on_select- Hace commit de los mensajes cuando se realiza una consulta SELECT. Predeterminado:false.rabbitmq_max_rows_per_message— El número máximo de filas escritas en un mensaje de RabbitMQ para formatos basados en filas. Predeterminado:1.rabbitmq_empty_queue_backoff_start_ms— Punto inicial de backoff para reprogramar la lectura si la cola de RabbitMQ está vacía.rabbitmq_empty_queue_backoff_end_ms— Punto final de backoff para reprogramar la lectura si la cola de RabbitMQ está vacía.rabbitmq_empty_queue_backoff_step_ms— Paso de backoff para reprogramar la lectura si la cola de RabbitMQ está vacía.rabbitmq_handle_error_mode— Cómo manejar los errores del motor RabbitMQ. Valores posibles: default (se lanzará una excepción si no se puede analizar un mensaje), stream (el mensaje de excepción y el mensaje sin procesar se guardarán en las columnas virtuales_errory_raw_message), dead_letter_queue (los datos relacionados con errores se guardarán en system.dead_letter_queue).
Conexión SSL
rabbitmq_secure = 1 o amqps en la dirección de conexión: rabbitmq_address = 'amqps://guest:guest@localhost/vhost'.
El comportamiento predeterminado de la biblioteca utilizada no comprueba si la conexión TLS creada es lo suficientemente segura. Tanto si el certificado está caducado, autofirmado, ausente o no es válido, la conexión simplemente se permite. Es posible que en el futuro se implemente una comprobación más estricta de los certificados.
También se pueden añadir ajustes de formato junto con la configuración relacionada con RabbitMQ.
Ejemplo:
Descripción
SELECT no es especialmente útil para leer mensajes (salvo para depuración), porque cada mensaje solo puede leerse una vez. Es más práctico crear hilos en tiempo real mediante vistas materializadas. Para ello:
- Use el motor para crear un consumidor de RabbitMQ y considérelo un stream de datos.
- Cree una tabla con la estructura deseada.
- Cree una vista materializada que convierta los datos del motor y los inserte en una tabla creada previamente.
MATERIALIZED VIEW se conecta al motor, empieza a recopilar datos en segundo plano. Esto le permite recibir continuamente mensajes de RabbitMQ y convertirlos al formato requerido mediante SELECT.
Una tabla de RabbitMQ puede tener tantas vistas materializadas como desee.
Los datos pueden encaminarse según rabbitmq_exchange_type y la rabbitmq_routing_key_list especificada.
No puede haber más de un exchange por tabla. Un exchange puede compartirse entre varias tablas; esto permite el enrutamiento a varias tablas al mismo tiempo.
Opciones de tipo de exchange:
direct- El enrutamiento se basa en la coincidencia exacta de keys. Ejemplo de lista de keys de la tabla:key1,key2,key3,key4,key5; la key del mensaje puede ser cualquiera de ellas.fanout- Enrutamiento a todas las tablas (donde el nombre del exchange es el mismo), independientemente de las keys.topic- El enrutamiento se basa en patterns con keys separadas por puntos. Ejemplos:*.logs,records.*.*.2020,*.2018,*.2019,*.2020.headers- El enrutamiento se basa en coincidencias dekey=valuecon la settingx-match=allox-match=any. Ejemplo de lista de keys de la tabla:x-match=all,format=logs,type=report,year=2020.consistent_hash- Los datos se distribuyen uniformemente entre todas las tablas vinculadas (donde el nombre del exchange es el mismo). Tenga en cuenta que este tipo de exchange debe habilitarse con el plugin de RabbitMQ:rabbitmq-plugins enable rabbitmq_consistent_hash_exchange.
rabbitmq_queue_base puede usarse en los siguientes casos:
- para permitir que distintas tablas compartan colas, de modo que puedan registrarse varios consumidores para las mismas colas, lo que mejora el rendimiento. Si se usan las settings
rabbitmq_num_consumersy/orabbitmq_num_queues, se logra una coincidencia exacta de las colas cuando estos parámetros son iguales. - para poder restaurar la lectura desde determinadas colas duraderas cuando no todos los mensajes se hayan consumido correctamente. Para reanudar el consumption desde una cola específica, establezca su nombre en la setting
rabbitmq_queue_basey no especifiquerabbitmq_num_consumersnirabbitmq_num_queues(el valor predeterminado es 1). Para reanudar el consumption desde todas las colas declaradas para una tabla específica, simplemente especifique las mismas settings:rabbitmq_queue_base,rabbitmq_num_consumers,rabbitmq_num_queues. De forma predeterminada, los nombres de las colas serán únicos para cada tabla. - para reutilizar colas, ya que se declaran como duraderas y no se eliminan automáticamente. (Pueden eliminarse con cualquiera de las herramientas de CLI de RabbitMQ).
rabbitmq_num_consumers y/o rabbitmq_num_queues se especifican junto con rabbitmq_exchange_type, entonces:
- Debe estar habilitado el plugin
rabbitmq-consistent-hash-exchange. - Debe especificarse la property
message_idde los mensajes publicados (única para cada mensaje/Batch).
messageID y la marca republished (true si se publicó más de una vez); se puede acceder a ellos mediante los headers del mensaje.
No use la misma tabla para inserts y vistas materializadas.
Ejemplo:
Columnas virtuales
_exchange_name- Nombre del exchange de RabbitMQ. Tipo de dato:String._channel_id- ChannelID en el que se declaró el consumidor que recibió el mensaje. Tipo de dato:String._delivery_tag- DeliveryTag del mensaje recibido. Su alcance es por canal. Tipo de dato:UInt64._redelivered- Indicadorredelivereddel mensaje. Tipo de dato:UInt8._message_id- messageID del mensaje recibido; no está vacío si se estableció al publicar el mensaje. Tipo de dato:String._timestamp- marca de tiempo del mensaje recibido; no está vacía si se estableció al publicar el mensaje. Tipo de dato:UInt64.
rabbitmq_handle_error_mode='stream':
_raw_message- Mensaje sin procesar que no pudo analizarse correctamente. Tipo de dato:Nullable(String)._error- Mensaje de excepción producido durante un error de análisis. Tipo de dato:Nullable(String).
_raw_message y _error se rellenan solo en caso de excepción durante el análisis; siempre son NULL cuando el mensaje se analiza correctamente.
Consideraciones
DEFAULT, MATERIALIZED, ALIAS) en la definición de la tabla, estas se ignorarán. En su lugar, las columnas se completarán con los valores predeterminados correspondientes a sus tipos.
Compatibilidad con los formatos de datos
- En los formatos basados en filas, el número de filas en un mensaje de RabbitMQ se puede controlar con la configuración
rabbitmq_max_rows_per_message. - En los formatos basados en bloques, no es posible dividir un bloque en partes más pequeñas, pero el número de filas de un bloque se puede controlar con la configuración general max_block_size.