- Publique ou inscreva-se em fluxos de dados.
- Organize o armazenamento tolerante a falhas.
- Processe streams à medida que estiverem disponíveis.
Criar uma tabela
kafka_broker_list— Uma lista de brokers separada por vírgulas (por exemplo,localhost:9092).kafka_topic_list— Uma lista de tópicos do Kafka.kafka_group_name— Um grupo de consumidores do Kafka. Os offsets de leitura são acompanhados separadamente para cada grupo. Se você não quiser que as mensagens sejam duplicadas no cluster, use o mesmo nome de grupo em todos os lugares.kafka_format— Formato da mensagem. Usa a mesma notação da função SQLFORMAT, comoJSONEachRow. Para mais informações, consulte a seção Formatos.
kafka_security_protocol- Protocolo usado para se comunicar com os brokers. Valores possíveis:plaintext,ssl,sasl_plaintext,sasl_ssl.kafka_sasl_mechanism- Mecanismo SASL a ser usado na autenticação. Valores possíveis:GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER.kafka_sasl_username- nome de usuário SASL para uso com os mecanismosPLAINeSASL-SCRAM-...kafka_sasl_password- senha SASL para uso com os mecanismosPLAINeSASL-SCRAM-...kafka_schema— Parâmetro que deve ser usado se o formato exigir uma definição de esquema. Por exemplo, Cap’n Proto exige o caminho para o arquivo de esquema e o nome do objeto raizschema.capnp:Message.kafka_schema_registry_skip_bytes— O número de bytes a ignorar no início de cada mensagem ao usar schema registry com cabeçalhos de envelope (por exemplo, o AWS Glue Schema Registry, que inclui um envelope de 19 bytes). Intervalo:[0, 255]. Padrão:0.kafka_num_consumers— O número de consumidores por tabela. Especifique mais consumidores se a taxa de transferência de um consumidor for insuficiente. O número total de consumidores não deve exceder o número de partições no tópico, já que apenas um consumidor pode ser atribuído por partição, e não deve ser maior que o número de núcleos físicos no servidor onde o ClickHouse está implantado. Padrão:1.kafka_max_block_size— O tamanho máximo do lote (em mensagens) na operação de poll. Padrão: max_insert_block_size.kafka_skip_broken_messages— tolerância do parser de mensagens do Kafka a mensagens incompatíveis com o schema por bloco. Sekafka_skip_broken_messages = N, então o engine ignora N mensagens do Kafka que não podem ser analisadas (uma mensagem equivale a uma linha de dados). Padrão:0.kafka_commit_every_batch— Faz commit de cada lote consumido e processado, em vez de fazer um único commit após gravar um bloco inteiro. Padrão:0.kafka_client_id— Identificador do cliente. Em branco por padrão.kafka_poll_timeout_ms— Tempo limite para uma única operação de poll do Kafka. Padrão: stream_poll_timeout_ms.kafka_poll_max_batch_size— Número máximo de mensagens obtidas em uma única operação de poll do Kafka. Padrão: max_block_size.kafka_flush_interval_ms— Tempo limite para fazer o flush dos dados do Kafka. Padrão: stream_flush_interval_ms.kafka_consumer_reschedule_ms— Intervalo de reagendamento quando o processamento de streams do Kafka é interrompido (por exemplo, quando não há mensagens disponíveis para consumo). Essa configuração controla o tempo de espera até o consumidor tentar fazer polling novamente. Não deve excederkafka_consumers_pool_ttl_ms. Padrão:500milissegundos.kafka_thread_per_consumer— Fornece uma thread independente para cada consumidor. Quando habilitado, cada consumidor faz o flush dos dados de forma independente, em paralelo (caso contrário, as linhas de vários consumidores são combinadas para formar um bloco). Padrão:0.kafka_handle_error_mode— Como lidar com erros no motor Kafka. Valores possíveis: default (uma exceção será gerada se não conseguirmos interpretar uma mensagem), stream (a mensagem da exceção e a mensagem bruta serão salvas nas colunas virtuais_errore_raw_message), dead_letter_queue (os dados relacionados ao erro serão salvos em system.dead_letter_queue).kafka_commit_on_select— Faz commit das mensagens quando uma consulta SELECT é executada. Padrão:false.kafka_consumer_acquire_timeout_ms— Tempo limite, em milissegundos, para obter um consumidor do Kafka durante consultasSELECTdiretas em uma tabelaKafka2(com armazenamento de offsets baseado em Keeper). Quando várias consultasSELECTdiretas e concorrentes são executadas na mesma tabela, cada uma precisa esperar até que os consumidores fiquem disponíveis. Esse tempo limite evita deadlocks quando as consultas retêm subconjuntos diferentes de consumidores. Padrão:30000.kafka_max_rows_per_message— O número máximo de linhas gravadas em uma mensagem Kafka para formatos baseados em linhas. Padrão:1.kafka_autodetect_client_rack— Configura automaticamente o parâmetroclient.rackdolibrdkafkapara dar preferência às réplicas do Kafka mais próximas. Fontes compatíveis:AWS_ZONE_IDpara o ID da zona de disponibilidade do AWS IMDSv2, por exemploeuc1-az1;AWS_ZONE_NAMEpara o nome da zona de disponibilidade do AWS IMDSv2, por exemploeu-central-1a;GCP_ZONEpara a zona do serviço de metadados do GCP, por exemploeurope-central2-a;CLICKHOUSEpara usar a detecção interna do ClickHouse, que pode depender de metadados da nuvem ou da configuração;AWS_ZONE_NAME_THEN_GCP_ZONEpara tentarAWS_ZONE_NAMEe depoisGCP_ZONE. Padrão: string vazia, desativado. Dica: ambientes diferentes usam formatos diferentes de zona de disponibilidade. O Amazon MSK normalmente usa IDs de zona, portanto prefiraAWS_ZONE_ID. O Confluent Cloud normalmente usa nomes de zona, portanto prefiraAWS_ZONE_NAME. Em caso de dúvida, useAWS_ZONE_NAME_THEN_GCP_ZONEou verifique o valor debroker.rackno seu cluster. Observação: os brokers do Kafka precisam estar configurados combroker.rackereplica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector.kafka_compression_codec— codec de compressão usado para produzir mensagens. Suportados: string vazia,none,gzip,snappy,lz4,zstd. No caso de string vazia, o codec de compressão não é definido pela tabela; portanto, serão usados os valores dos arquivos de configuração ou o valor padrão delibrdkafka. Padrão: string vazia.kafka_compression_level— Parâmetro do nível de compressão para o algoritmo selecionado por kafka_compression_codec. Valores mais altos resultarão em melhor compressão, ao custo de maior uso de CPU. O intervalo utilizável depende do algoritmo:[0-9]paragzip;[0-12]paralz4; apenas0parasnappy;[0-12]parazstd;-1= nível de compressão padrão dependente do codec. Padrão:-1.kafka_map_virtual_columns_on_write— Se ativado, colunas com nomes especiais_key,_timestamp,_headers.namee_headers.valueno esquema da tabela são mapeadas para os metadados correspondentes da mensagem Kafka emINSERTe excluídas do payload da mensagem. Consulte Mapeamento de colunas para metadados de mensagens Kafka. Padrão:false.
O motor de tabela Kafka não oferece suporte a colunas com valor padrão. Se você precisar de colunas com valor padrão, poderá adicioná-las na visão materializada (veja abaixo).
Descrição
SELECT não é particularmente útil para ler mensagens (exceto para depuração), porque cada mensagem só pode ser lida uma vez. É mais prático criar fluxos em tempo real usando visões materializadas. Para fazer isso:
- Use o mecanismo para criar um consumidor Kafka e trate-o como um fluxo de dados.
- Crie uma tabela com a estrutura desejada.
- Crie uma visão materializada que converta os dados do mecanismo e os insira em uma tabela criada anteriormente.
MATERIALIZED VIEW é associada ao mecanismo, ela começa a coletar dados em segundo plano. Isso permite que você receba continuamente mensagens do Kafka e as converta para o formato necessário usando SELECT.
Uma tabela Kafka pode ter quantas visões materializadas você quiser; elas não leem dados da tabela Kafka diretamente, mas recebem novos registros (em blocos). Dessa forma, você pode gravar em várias tabelas com diferentes níveis de detalhamento (com agrupamento - agregação e sem).
Exemplo:
ALTER, recomendamos desativar a visão materializada para evitar discrepâncias entre a tabela de destino e os dados da visão materializada.
Configuração
<kafka>) e no nível do tópico (em <kafka><kafka_topic>). A configuração global é aplicada primeiro e, em seguida, a configuração no nível do tópico é aplicada (se existir).
_) em vez do ponto na configuração do ClickHouse. Por exemplo, check.crcs=true será <check_crcs>true</check_crcs>.
Suporte a Kerberos
security_protocol com o valor sasl_plaintext. Isso é suficiente se o ticket de concessão de tickets do Kerberos for obtido e armazenado em cache pelo sistema operacional.
O ClickHouse consegue manter credenciais do Kerberos usando um arquivo keytab. Considere os elementos filhos sasl_kerberos_service_name, sasl_kerberos_keytab e sasl_kerberos_principal.
Exemplo:
Colunas virtuais
_topic— Tópico do Kafka. Tipo de dado:LowCardinality(String)._key— Chave da mensagem. Tipo de dado:String._offset— Offset da mensagem. Tipo de dado:UInt64._timestamp— Timestamp da mensagem. Tipo de dado:Nullable(DateTime)._timestamp_ms— Timestamp da mensagem em milissegundos. Tipo de dado:Nullable(DateTime64(3))._partition— Partição do tópico do Kafka. Tipo de dado:UInt64._headers.name— Array das chaves dos cabeçalhos da mensagem. Tipo de dado:Array(String)._headers.value— Array dos valores dos cabeçalhos da mensagem. Tipo de dado:Array(String).
kafka_handle_error_mode='stream':
_raw_message- Mensagem bruta que não pôde ser analisada com sucesso. Tipo de dado:String._error- Mensagem de exceção ocorrida durante uma falha na análise. Tipo de dado:String.
_raw_message e _error são preenchidas apenas em caso de exceção durante a análise; elas ficam sempre vazias quando a mensagem é analisada com sucesso.
Mapeamento de colunas para metadados de mensagens do Kafka
INSERT INTO, o motor Kafka sempre usa uma coluna chamada _key (do tipo String) como a chave da mensagem do Kafka e uma coluna chamada _timestamp (do tipo DateTime) como o timestamp da mensagem do Kafka — se essas colunas existirem na tabela. Por padrão, essas colunas também aparecem no payload da mensagem junto com as outras colunas.
Com kafka_map_virtual_columns_on_write = 1, o comportamento muda:
_key(tipoString) — mapeada para a chave da mensagem do Kafka._timestamp(tipoDateTime) — mapeada para o timestamp da mensagem do Kafka._headers.name(tipoArray(String)) e_headers.value(tipoArray(String)) — mapeados para os cabeçalhos da mensagem do Kafka. Cada par(_headers.name[i], _headers.value[i])se torna um cabeçalho do Kafka. Como_headers.namee_headers.valuecompartilham o prefixo Nested_headers, o ClickHouse exige que ambos os arrays tenham o mesmo tamanho em cada linha.
{"event_json":"{\"a\":1}"}, chave session-42, o timestamp atual e dois cabeçalhos source=api e trace_id=abc-123.
Suporte a formatos de dados
- Para formatos baseados em linhas, o número de linhas em uma mensagem do Kafka pode ser controlado pela configuração
kafka_max_rows_per_message. - Para formatos baseados em blocos, não é possível dividir um bloco em partes menores, mas o número de linhas em um bloco pode ser controlado pela configuração geral max_block_size.
Engine para armazenar offsets confirmados no ClickHouse Keeper
allow_experimental_kafka_offsets_storage_in_keeper estiver habilitada, mais duas configurações poderão ser especificadas para o motor de tabela Kafka:
kafka_keeper_pathespecifica o caminho da tabela no ClickHouse Keeperkafka_replica_nameespecifica o nome da réplica no ClickHouse Keeper
Limitações conhecidas
- Excluir e recriar a tabela rapidamente, ou especificar o mesmo caminho do ClickHouse Keeper para motores diferentes, pode causar problemas. Como prática recomendada, você pode usar
{uuid}emkafka_keeper_pathpara evitar conflitos de caminho. - Para garantir leituras repetíveis, as mensagens não podem ser consumidas de várias partições em uma única thread. Por outro lado, os consumidores do Kafka precisam ser consultados regularmente para continuarem ativos. Como resultado dessas duas exigências, decidimos permitir a criação de vários consumidores apenas se
kafka_thread_per_consumerestiver ativado; caso contrário, é muito complicado evitar problemas relacionados à consulta regular dos consumidores.